Source code for data_juicer.ops.mapper.video_trajectory_overlay_mapper

import os

import cv2
import numpy as np
from loguru import logger

from data_juicer.utils.constant import Fields, MetaKeys

from ..base_op import OPERATORS, Mapper

OP_NAME = "video_trajectory_overlay_mapper"


[docs] @OPERATORS.register_module(OP_NAME) class VideoTrajectoryOverlayMapper(Mapper): """Prepare VLM-ready frames by sampling and overlaying hand trajectories. Implements the visualization step from paper https://arxiv.org/pdf/2510.21571: "From each segment, we evenly sample 8 frames and highlight hand trajectories on each frame by projecting the world-space trajectory of the hand palm from the current frame to the end of the clip." For each atomic action segment (output of ``VideoAtomicActionSegmentMapper``), this operator: 1. Evenly samples ``n_sample_frames`` frames from the segment. 2. For each sampled frame, projects the **future** world-space wrist trajectory (from the current frame to the end of the segment) onto the image using camera intrinsics and cam_c2w. 3. Draws the trajectory as a colored line with a dot at the current wrist position. 4. Saves the overlay images and stores their paths in the segment. The output is written back into each segment dict under ``"overlay_frames"``, ready to be consumed by the VLM captioning operator. """ # MANO joint index for palm center (middle finger MCP). # Paper §3.3: "trajectory of the hand palm" PALM_JOINT_INDEX = 9
[docs] def __init__( self, segment_field: str = "atomic_action_segments", camera_pose_field: str = MetaKeys.video_camera_pose_tags, moge_field: str = MetaKeys.camera_calibration_moge_tags, frame_field: str = MetaKeys.video_frames, save_dir: str = None, n_sample_frames: int = 8, palm_joint_index: int = 9, dot_radius: int = 10, line_thickness: int = 4, trajectory_alpha: float = 0.7, *args, **kwargs, ): """ Initialization method. :param segment_field: Meta field storing atomic action segments. :param camera_pose_field: Meta field storing camera pose (cam_c2w). :param moge_field: Meta field storing MoGe calibration (for fov_x). :param frame_field: Field storing frame image paths. :param save_dir: Directory to save overlay images. If None, uses a temp directory derived from the first frame path. :param n_sample_frames: Number of frames to evenly sample from each segment. :param palm_joint_index: MANO joint index for the palm position. Default 9 = middle finger MCP (palm center proxy). Joint 0 = wrist root. :param dot_radius: Radius of the dot at the current wrist position. :param line_thickness: Thickness of the trajectory line. :param trajectory_alpha: Alpha blending for the trajectory overlay. """ super().__init__(*args, **kwargs) self.segment_field = segment_field self.camera_pose_field = camera_pose_field self.moge_field = moge_field self.frame_field = frame_field self.save_dir = save_dir self.n_sample_frames = n_sample_frames self.palm_joint_index = palm_joint_index self.dot_radius = dot_radius self.line_thickness = line_thickness self.trajectory_alpha = trajectory_alpha
# ------------------------------------------------------------------ # Projection helpers # ------------------------------------------------------------------ @staticmethod def _world_to_camera( pos_world: np.ndarray, cam_c2w: np.ndarray, ) -> np.ndarray: """Convert world position(s) to camera space. Args: pos_world: (..., 3) world positions. cam_c2w: (4, 4) camera-to-world transform. Returns: (..., 3) camera-space positions. """ R = cam_c2w[:3, :3] t = cam_c2w[:3, 3] # cam = R^T @ (world - t) return (pos_world - t) @ R # equivalent to (R.T @ (p - t).T).T @staticmethod def _project_to_2d( pos_cam: np.ndarray, width: int, height: int, K: np.ndarray = None, fov_x: float = None, ) -> np.ndarray: """Project camera-space positions to 2D pixel coords. Args: pos_cam: (..., 3) camera-space positions. width: image width. height: image height. K: (3, 3) intrinsics matrix (preferred). If provided, fov_x is ignored and fx, fy, cx, cy are taken from K directly. fov_x: horizontal field of view in radians (fallback when K is not available). Returns: (..., 2) pixel coordinates (u, v). """ if K is not None: K = np.asarray(K, dtype=np.float64) fx = K[0, 0] fy = K[1, 1] cx = K[0, 2] cy = K[1, 2] elif fov_x is not None: fx = width / (2.0 * np.tan(fov_x / 2.0)) fy = fx cx, cy = width / 2.0, height / 2.0 else: raise ValueError("Either K or fov_x must be provided") z = pos_cam[..., 2] z_safe = np.where(np.abs(z) < 1e-6, 1e-6, z) u = fx * pos_cam[..., 0] / z_safe + cx v = fy * pos_cam[..., 1] / z_safe + cy return np.stack([u, v], axis=-1) # ------------------------------------------------------------------ # Drawing helpers # ------------------------------------------------------------------ @staticmethod def _temporal_color(t: float) -> tuple: """Map normalized time t ∈ [0, 1] to BGR color along blue → green → red gradient. t=0 (current) → blue, t=0.5 → green, t=1 (future end) → red. """ # BGR format if t < 0.5: # blue → green ratio = t / 0.5 b = int(255 * (1 - ratio)) g = int(255 * ratio) r = 0 else: # green → red ratio = (t - 0.5) / 0.5 b = 0 g = int(255 * (1 - ratio)) r = int(255 * ratio) return (b, g, r) def _draw_trajectory( self, frame: np.ndarray, points_2d: np.ndarray, current_idx: int = 0, ) -> np.ndarray: """Draw trajectory line with blue→green→red temporal gradient and a blue dot at the current palm position. Args: frame: BGR image to draw on (modified in place). points_2d: (N, 2) pixel coordinates of trajectory points. current_idx: index of the current frame's position in points_2d (drawn as a blue dot). """ h, w = frame.shape[:2] overlay = frame.copy() # Filter out-of-frame and behind-camera points, keep original index valid = [] valid_indices = [] for i, pt in enumerate(points_2d): if 0 <= pt[0] < w and 0 <= pt[1] < h: valid.append((int(pt[0]), int(pt[1]))) valid_indices.append(i) elif valid: # Keep trajectory continuous by clamping valid.append( ( int(np.clip(pt[0], 0, w - 1)), int(np.clip(pt[1], 0, h - 1)), ) ) valid_indices.append(i) # Draw trajectory line with temporal color gradient n_pts = len(points_2d) if len(valid) >= 2: for i in range(len(valid) - 1): t = valid_indices[i] / max(n_pts - 1, 1) line_color = self._temporal_color(t) cv2.line( overlay, valid[i], valid[i + 1], line_color, self.line_thickness, lineType=cv2.LINE_AA, ) # Draw current position as a blue dot if current_idx < len(points_2d): pt = points_2d[current_idx] if 0 <= pt[0] < w and 0 <= pt[1] < h: blue_bgr = (255, 100, 0) # blue in BGR cv2.circle( overlay, (int(pt[0]), int(pt[1])), self.dot_radius, blue_bgr, -1, lineType=cv2.LINE_AA, ) # White border for visibility cv2.circle( overlay, (int(pt[0]), int(pt[1])), self.dot_radius + 1, (255, 255, 255), 1, lineType=cv2.LINE_AA, ) # Alpha blend cv2.addWeighted( overlay, self.trajectory_alpha, frame, 1 - self.trajectory_alpha, 0, frame, ) return frame # ------------------------------------------------------------------ # Process one segment # ------------------------------------------------------------------ def _process_segment( self, segment: dict, all_frames: list[str], cam_c2w_all: np.ndarray, save_dir: str, intrinsics_list: list = None, fov_x: float = None, file_prefix: str = "", ) -> dict: """Process a single segment: sample frames, overlay trajectory. Args: intrinsics_list: per-frame (3,3) intrinsics matrices from MoGe. If provided, used for accurate projection (preferred). fov_x: fallback horizontal FOV in radians when intrinsics_list is not available. file_prefix: prefix added to overlay filenames to avoid collisions when multiple videos share the same save_dir. Returns the segment dict with ``overlay_frames`` added. """ hand_type = segment["hand_type"] valid_fids = segment["valid_frame_ids"] # Use joints_world for the palm trajectory (paper §3.3). # states[:, 0:3] is MANO's root transl, NOT the actual palm/wrist # position — there is a significant offset (~10cm) between them. joints_world = segment.get("joints_world") if joints_world and len(joints_world) > 0: jw_arr = np.asarray(joints_world, dtype=np.float64) palm_positions = jw_arr[:, self.palm_joint_index, :] else: # Fallback to states (less accurate) states = np.asarray(segment["states"], dtype=np.float64) palm_positions = states[:, 0:3] logger.debug( f"No joints_world for {hand_type} segment, " f"falling back to states[:, 0:3]", ) n = len(palm_positions) if n < 2: segment["overlay_frames"] = [] segment["sampled_frame_indices"] = [] return segment # Evenly sample frame indices within the segment if n <= self.n_sample_frames: sample_indices = list(range(n)) else: sample_indices = np.linspace( 0, n - 1, self.n_sample_frames, dtype=int, ).tolist() seg_id = segment.get("segment_id", 0) overlay_paths = [] for local_idx in sample_indices: fid = valid_fids[local_idx] if fid >= len(all_frames) or not all_frames[fid]: continue frame_path = all_frames[fid] frame = cv2.imread(frame_path) if frame is None: continue h, w = frame.shape[:2] # Get future trajectory: from current frame to end of segment future_positions = palm_positions[local_idx:] if fid >= len(cam_c2w_all): continue # Project all future world positions using the CURRENT frame's # camera (we are drawing on the current frame's image). cam = cam_c2w_all[fid] # Determine per-frame intrinsics for projection frame_K = None if intrinsics_list is not None and fid < len(intrinsics_list): frame_K = np.asarray(intrinsics_list[fid], dtype=np.float64) points_2d_list = [] for j in range(len(future_positions)): pos_cam = self._world_to_camera(future_positions[j], cam) pt_2d = self._project_to_2d( pos_cam, w, h, K=frame_K, fov_x=fov_x, ) points_2d_list.append(pt_2d) if not points_2d_list: continue points_2d = np.array(points_2d_list) frame = self._draw_trajectory(frame, points_2d, 0) # Save overlay frame (prefix avoids collisions across videos) fname = (f"{file_prefix}_" if file_prefix else "") + f"seg{seg_id}_{hand_type}_f{fid:06d}_overlay.jpg" out_path = os.path.join(save_dir, fname) cv2.imwrite(out_path, frame) overlay_paths.append(out_path) segment["overlay_frames"] = overlay_paths segment["sampled_frame_indices"] = [valid_fids[i] for i in sample_indices if i < len(valid_fids)] return segment # ------------------------------------------------------------------ # Main entry # ------------------------------------------------------------------ def _sample_prefix(self, sample: dict) -> str: """Derive a short unique prefix from the sample's video path. Used to namespace overlay files so different videos sharing the same save_dir do not overwrite each other. """ videos = sample.get(self.video_key, []) if videos: v = videos[0] if isinstance(videos, list) else videos return os.path.splitext(os.path.basename(v))[0] return "unknown"
[docs] def process_single(self, sample=None, rank=None): if Fields.meta not in sample: return sample meta = sample[Fields.meta] segments = meta.get(self.segment_field) if not segments: return sample # Get frame paths frame_data = sample.get(self.frame_field, []) if not frame_data: return sample all_frames = ( frame_data[0] if isinstance(frame_data, list) and frame_data and isinstance(frame_data[0], list) else frame_data ) # Get cam_c2w cam_pose_list = meta.get(self.camera_pose_field, []) if not cam_pose_list: logger.warning("No camera pose data for trajectory overlay.") return sample from data_juicer.utils.constant import CameraCalibrationKeys from data_juicer.utils.file_utils import load_numpy cam_pose = cam_pose_list[0] if isinstance(cam_pose_list, list) else cam_pose_list raw_c2w = cam_pose.get(CameraCalibrationKeys.cam_c2w) if raw_c2w is None: logger.warning("No cam_c2w for trajectory overlay.") return sample cam_c2w_all = np.asarray(load_numpy(raw_c2w), dtype=np.float64) # Get camera intrinsics (prefer full K matrix, fallback to fov_x) intrinsics_list, fov_x = self._get_intrinsics(meta) if intrinsics_list is None and fov_x is None: logger.warning( "Cannot determine camera intrinsics, skipping overlay.", ) return sample # Determine save directory save_dir = self.save_dir if save_dir is None and all_frames: save_dir = os.path.join( os.path.dirname(all_frames[0]), "trajectory_overlays", ) if save_dir: os.makedirs(save_dir, exist_ok=True) # Unique prefix to avoid filename collisions across videos prefix = self._sample_prefix(sample) # Process each segment for i, seg in enumerate(segments): try: segments[i] = self._process_segment( seg, all_frames, cam_c2w_all, save_dir, intrinsics_list=intrinsics_list, fov_x=fov_x, file_prefix=prefix, ) except Exception as e: logger.warning( f"Trajectory overlay failed for segment {i}: {e}", ) seg["overlay_frames"] = [] seg["sampled_frame_indices"] = [] meta[self.segment_field] = segments return sample
def _get_intrinsics(self, meta: dict) -> tuple: """Extract camera intrinsics for projection. Returns: (intrinsics_list, fov_x): intrinsics_list is a per-frame list of (3,3) K matrices if available (preferred), otherwise None. fov_x is a scalar fallback FOV in radians. At least one of them will be non-None if calibration data exists. """ from data_juicer.utils.constant import CameraCalibrationKeys intrinsics_list = None fov_x = None # Try MoGe calibration — prefer full intrinsics matrix K moge_list = meta.get(self.moge_field, []) if moge_list: moge = moge_list[0] if isinstance(moge_list, list) else moge_list if isinstance(moge, dict): # Prefer per-frame intrinsics K matrix K_list = moge.get(CameraCalibrationKeys.intrinsics) if K_list and isinstance(K_list, list) and len(K_list) > 0: intrinsics_list = K_list # Also get hfov as fallback hfov = moge.get(CameraCalibrationKeys.hfov) if hfov is not None: if isinstance(hfov, list) and hfov: fov_x = float(np.median(hfov)) else: fov_x = float(hfov) # Try HaWoR fov_x (HaWoR uses median of MoGe hfov, most consistent) if fov_x is None: hawor_field = MetaKeys.hand_reconstruction_hawor_tags hawor_list = meta.get(hawor_field, []) if hawor_list: hawor = hawor_list[0] if isinstance(hawor_list, list) else hawor_list if isinstance(hawor, dict): hawor_fov = hawor.get("fov_x") if hawor_fov is not None: fov_x = float(hawor_fov) return intrinsics_list, fov_x