# Source code for data_juicer.ops.mapper.video_camera_calibration_droidcalib_mapper

import argparse
import importlib
import os
import subprocess
import sys
from typing import Optional

import numpy as np
from loguru import logger

from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE
from data_juicer.utils.constant import CameraCalibrationKeys, Fields, MetaKeys
from data_juicer.utils.lazy_loader import LazyLoader

from ..base_op import OPERATORS, Mapper

# torch and cv2 are bound through LazyLoader instead of a plain import.
# NOTE(review): the second LazyLoader argument ("opencv-python") is presumably
# the pip package name that provides the "cv2" module -- confirm against
# data_juicer.utils.lazy_loader.
torch = LazyLoader("torch")
cv2 = LazyLoader("cv2", "opencv-python")

# Name passed to OPERATORS.register_module when the mapper class is registered.
OP_NAME = "video_camera_calibration_droidcalib_mapper"


@OPERATORS.register_module(OP_NAME)
class VideoCameraCalibrationDroidCalibMapper(Mapper):
    """
    Extract camera intrinsics from videos using DroidCalib.

    **Notice**: This operator will download the DroidCalib component from
    GitHub at runtime. This component follows the AGPL-3.0 license, please
    be aware for commercial use.
    """

    _accelerator = "cuda"

    def __init__(
        self,
        weights_path: Optional[str] = None,
        image_size: list = [384, 512],  # noqa: B006 - copied below, never mutated
        stride: int = 2,
        max_frames: int = 300,
        buffer: int = 1024,
        beta: float = 0.3,
        filter_thresh: float = 2.4,
        warmup: int = 8,
        keyframe_thresh: float = 4.0,
        frontend_thresh: float = 16.0,
        frontend_window: int = 25,
        frontend_radius: int = 2,
        frontend_nms: int = 1,
        backend_thresh: float = 22.0,
        backend_radius: int = 2,
        backend_nms: int = 3,
        upsample: bool = False,
        disable_vis: bool = True,
        verbose: bool = False,
        tag_field_name: str = MetaKeys.camera_calibration_droidcalib_tags,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param weights_path: Path to the model weights. When None, defaults
            to ``droidcalib.pth`` inside the cached DroidCalib checkout.
        :param image_size: Target image size [height, width].
        :param stride: Frame stride.
        :param max_frames: Maximum number of frames to process.
        :param buffer: Buffer size for Droid.
        :param beta: Weight for translation / rotation components of flow.
        :param filter_thresh: Motion threshold before considering new
            keyframe.
        :param warmup: Number of warmup frames.
        :param keyframe_thresh: Threshold to create a new keyframe.
        :param frontend_thresh: Add edges between frames within this
            distance.
        :param frontend_window: Frontend optimization window.
        :param frontend_radius: Force edges between frames within radius.
        :param frontend_nms: Non-maximal suppression of edges.
        :param backend_thresh: Backend threshold.
        :param backend_radius: Backend radius.
        :param backend_nms: Backend NMS.
        :param upsample: Whether to upsample.
        :param disable_vis: Whether to disable visualization.
        :param verbose: Verbosity flag; stored on the instance as
            ``self.verbose``.
        :param tag_field_name: Key under the sample's meta field where the
            calibration results are stored.
        :param args: Extra positional arguments forwarded to the base class.
        :param kwargs: Extra keyword arguments forwarded to the base class.
        """
        super().__init__(*args, **kwargs)

        self.verbose = verbose
        self._deps_ready = False

        self.droid_calib_home = os.path.join(DATA_JUICER_ASSETS_CACHE, "DroidCalib")
        self.droid_slam_path = os.path.join(self.droid_calib_home, "droid_slam")
        # Clone / build DroidCalib up front so that the default weights path
        # below points into an existing checkout.
        self._ensure_droidcalib_ready()

        self.weights_path = weights_path
        if self.weights_path is None:
            self.weights_path = os.path.join(self.droid_calib_home, "droidcalib.pth")

        # Copy so that neither the shared default list nor a caller-owned
        # list is aliased by instance state.
        self.image_size = list(image_size)
        self.stride = stride
        self.max_frames = max_frames

        # Droid args
        self.droid_args = argparse.Namespace()
        self.droid_args.weights = self.weights_path
        self.droid_args.buffer = buffer
        self.droid_args.image_size = list(image_size)
        self.droid_args.beta = beta
        self.droid_args.filter_thresh = filter_thresh
        self.droid_args.warmup = warmup
        self.droid_args.keyframe_thresh = keyframe_thresh
        self.droid_args.frontend_thresh = frontend_thresh
        self.droid_args.frontend_window = frontend_window
        self.droid_args.frontend_radius = frontend_radius
        self.droid_args.frontend_nms = frontend_nms
        self.droid_args.backend_thresh = backend_thresh
        self.droid_args.backend_radius = backend_radius
        self.droid_args.backend_nms = backend_nms
        self.droid_args.upsample = upsample
        self.droid_args.disable_vis = disable_vis
        self.droid_args.stereo = False
        self.droid_args.camera_model = "pinhole"  # Default to pinhole
        self.droid_args.opt_intr = True

        self.tag_field_name = tag_field_name

    def _ensure_droidcalib_ready(self) -> bool:
        """Ensure DroidCalib is importable in the *current process*.

        This matters because `Dataset.map(num_proc>1)` may execute in child
        processes where `sys.path` changes from `__init__` are not present.

        Steps, in order: clone the repository if the checkout directory is
        missing, install ``torch-scatter`` if it cannot be imported, and
        build/install DroidCalib if loading ``droid.py`` raises ImportError.

        :return: Always True once the steps above completed.
        :raises ValueError: If cloning the repository fails.
        :raises subprocess.CalledProcessError: If installing
            ``torch-scatter`` or building DroidCalib fails.
        """
        if not os.path.exists(self.droid_calib_home):
            logger.info("Clone DroidCalib...")
            try:
                # The upstream URL
                # https://github.com/boschresearch/DroidCalib.git was left
                # commented out in favour of the fork cloned here.
                subprocess.run(
                    [
                        "git",
                        "clone",
                        "--recursive",
                        "https://github.com/1van2ha0/DroidCalib.git",
                        f"{self.droid_calib_home}",
                    ],
                    check=True,
                )
            except Exception as e:
                raise ValueError(
                    "Failed to clone DroidCalib repository. Please ensure you have git installed "
                    "and an internet connection, or manually clone the repository to the path "
                    f"{self.droid_calib_home}"
                ) from e

        if self._deps_ready:
            return True

        try:
            import torch_scatter  # noqa: F401
        except ImportError:
            # Please refer to https://github.com/rusty1s/pytorch_scatter to
            # locate the installation link that is compatible with your
            # PyTorch and CUDA versions, e.g. by adding
            #   -f https://data.pyg.org/whl/torch-<torch_version>+<cuda_version>.html
            # Invoke pip through the running interpreter so the package lands
            # in this environment rather than whichever `pip` is on PATH.
            subprocess.run(
                [sys.executable, "-m", "pip", "install", "torch-scatter"],
                check=True,
            )
            importlib.invalidate_caches()

        try:
            self._load_droid_module()
        except ImportError:
            # Best-effort removal of a stale build; failure (e.g. package not
            # installed) is deliberately ignored.
            subprocess.run(
                [sys.executable, "-m", "pip", "uninstall", "droid_backends", "-y"],
                check=False,
            )
            subprocess.run(
                [sys.executable, "setup.py", "install"],
                cwd=self.droid_calib_home,
                check=True,
            )
            importlib.invalidate_caches()

        self._deps_ready = True
        return True

    def _load_droid_module(self):
        """Load ``droid.py`` from the DroidCalib checkout as module "droid".

        Also makes sure ``self.droid_slam_path`` is on ``sys.path`` so that
        imports performed inside ``droid.py`` can be resolved.

        :return: The freshly executed module object.
        :raises ImportError: If no import spec/loader can be created for the
            file, or if executing the module raises ImportError.
        """
        # `import importlib` alone does not guarantee that the `util`
        # submodule is bound as an attribute; import it explicitly.
        import importlib.util

        if self.droid_slam_path not in sys.path:
            sys.path.insert(1, self.droid_slam_path)

        droid_module_path = f"{self.droid_slam_path}/droid.py"
        spec = importlib.util.spec_from_file_location("droid", droid_module_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load spec from {droid_module_path}")

        droid_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(droid_module)
        return droid_module

    def _image_stream(self, video_path):
        """
        Generator that yields (t, image, intrinsics, size_factor)

        ``t`` counts yielded frames, ``image`` is the resized/cropped frame
        as a tensor with a leading singleton dimension and channels before
        height/width, ``intrinsics`` is the initial [fx, fy, cx, cy] guess
        scaled to the resized frame, and ``size_factor`` is
        [width_scale, height_scale] of the resize. Nothing is yielded when
        the video cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return

            # Initial calibration guess (center of image)
            w0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h0 = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # fx, fy, cx, cy
            calib = np.array([(w0 + h0) / 2, (w0 + h0) / 2, w0 / 2, h0 / 2])
            fx, fy, cx, cy = calib

            ht, wd = self.image_size  # Target size [h, w]

            t = 0
            frame_idx = 0
            while cap.isOpened():
                ret, image = cap.read()
                if not ret:
                    break

                # Only every `stride`-th decoded frame is used.
                if frame_idx % self.stride != 0:
                    frame_idx += 1
                    continue

                if self.max_frames and t >= self.max_frames:
                    break

                h0, w0, _ = image.shape

                # Resize logic taken from DroidCalib's demo.py: scale so the
                # frame area matches ht * wd while keeping the aspect ratio.
                ratio = np.sqrt((ht * wd) / (h0 * w0))
                h1 = int(h0 * ratio)
                w1 = int(w0 * ratio)

                image = cv2.resize(image, (w1, h1))
                image = image[: h1 - h1 % 8, : w1 - w1 % 8]  # Crop to be divisible by 8

                image_tensor = torch.as_tensor(image).permute(2, 0, 1)

                intrinsics = torch.as_tensor([fx, fy, cx, cy])
                # Adjust intrinsics for resize: fx/cx scale with width,
                # fy/cy scale with height.
                h_final, w_final = image.shape[:2]
                size_factor = [(w_final / w0), (h_final / h0)]
                intrinsics[0::2] *= size_factor[0]
                intrinsics[1::2] *= size_factor[1]

                yield t, image_tensor[None], intrinsics, size_factor

                t += 1
                frame_idx += 1
        finally:
            # Runs on normal exhaustion, on an exception, and when the
            # consumer abandons/closes the generator early.
            cap.release()

    def _process_video_file(self, video_path):
        """Run DroidCalib on one video and return its estimated intrinsics.

        :param video_path: Path of the video file to process.
        :return: The estimated intrinsics as a list, rescaled back to the
            original resolution, or None when the file does not exist or no
            frame could be read.
        """
        droid_module = self._load_droid_module()
        Droid = droid_module.Droid

        if not os.path.exists(video_path):
            return None

        droid = None
        sf = None  # size factor of the last tracked frame
        intr_est_list = None

        try:
            for t, image, intrinsics, size_factor in self._image_stream(video_path):
                if droid is None:
                    # Update args with actual image size
                    self.droid_args.image_size = [image.shape[2], image.shape[3]]
                    droid = Droid(self.droid_args)

                droid.track(t, image, intrinsics=intrinsics)
                sf = size_factor

            if droid is not None:
                # demo.py calls droid.terminate(image_stream(...)), so a
                # fresh stream over the same video is handed to terminate.
                stream_second_pass = self._image_stream(video_path)
                _, intr_est = droid.terminate(stream_second_pass)

                # Rescale intrinsics back to original resolution
                if sf is not None:
                    intr_est = intr_est.copy()
                    intr_est[0:4:2] /= sf[0]
                    intr_est[1:4:2] /= sf[1]

                intr_est_list = intr_est.tolist()
        finally:
            # Release the tracker and cached CUDA memory even when tracking
            # or termination raised.
            if droid is not None:
                del droid
                torch.cuda.empty_cache()

        return intr_est_list

    def process_single(self, sample, rank=None):
        """Estimate camera intrinsics for every video of a sample.

        For each video that yields a result, a dict holding the 3x3
        intrinsics matrix ``[[fx, 0, cx], [0, fy, cy], [0, 0, 1]]`` under
        ``CameraCalibrationKeys.intrinsics`` is appended to
        ``sample[Fields.meta][self.tag_field_name]``. Videos without a
        result add no entry.

        :param sample: The sample to process; videos are read from
            ``sample[self.video_key]`` (a single path or a list of paths).
        :param rank: Unused by this method.
        :return: The same sample with the meta field updated.
        """
        video_paths = sample[self.video_key]
        if isinstance(video_paths, str):
            video_paths = [video_paths]

        if Fields.meta not in sample:
            sample[Fields.meta] = {}

        if not sample[Fields.meta].get(self.tag_field_name, None):
            sample[Fields.meta][self.tag_field_name] = []

        for video_path in video_paths:
            res = self._process_video_file(video_path)
            if res is not None:
                fx, fy, cx, cy = res
                res = [[fx, 0, cx], [0, fy, cy], [0, 0, 1]]
                sample[Fields.meta][self.tag_field_name].append({CameraCalibrationKeys.intrinsics: res})

        return sample