Source code for data_juicer.ops.mapper.video_undistort_mapper

import os
import subprocess

import numpy as np

from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE
from data_juicer.utils.constant import Fields, MetaKeys
from data_juicer.utils.lazy_loader import LazyLoader

from ..base_op import OPERATORS, Mapper
from ..op_fusion import LOADED_VIDEOS

OP_NAME = "video_undistort_mapper"

ffmpeg = LazyLoader("ffmpeg", "ffmpeg-python")


[docs] @OPERATORS.register_module(OP_NAME) @LOADED_VIDEOS.register_module(OP_NAME) class VideoUndistortMapper(Mapper): """Undistort raw videos with corresponding camera intrinsics and distortion coefficients."""
[docs] def __init__( self, output_video_dir: str = DATA_JUICER_ASSETS_CACHE, tag_field_name: str = MetaKeys.video_undistortion_tags, batch_size_each_video: int = 1000, crf: int = 22, *args, **kwargs, ): """ Initialization method. :param output_video_dir: Output directory to save undistorted videos. :param tag_field_name: The field name to store the tags. It's "video_undistortion_tags" in default. :param batch_size_each_video: Number of frames to process and save per temporary TS file batch. :param crf: Constant Rate Factor (CRF) for FFmpeg encoding quality. :param args: extra args :param kwargs: extra args """ super().__init__(*args, **kwargs) opencv_contrib_python_exist_code = subprocess.run(["pip", "show", "opencv-contrib-python"]) # This is written to fix version issues with Numpy if opencv_contrib_python_exist_code.returncode == 1: # not exist LazyLoader.check_packages(["opencv-contrib-python"]) subprocess.run(["pip", "install", "numpy==1.26.4"], check=True) import cv2 self.VideoCapture = cv2.VideoCapture self.CAP_PROP_FRAME_HEIGHT = cv2.CAP_PROP_FRAME_HEIGHT self.CAP_PROP_FRAME_WIDTH = cv2.CAP_PROP_FRAME_WIDTH self.CAP_PROP_FPS = cv2.CAP_PROP_FPS self.omnidir = cv2.omnidir self.CV_16SC2 = cv2.CV_16SC2 self.remap = cv2.remap self.INTER_CUBIC = cv2.INTER_CUBIC self.BORDER_CONSTANT = cv2.BORDER_CONSTANT self.cvtColor = cv2.cvtColor self.COLOR_BGR2RGB = cv2.COLOR_BGR2RGB self.output_video_dir = output_video_dir self.tag_field_name = tag_field_name self.batch_size_each_video = batch_size_each_video self.crf = crf
[docs] def concatenate_ts_files(self, folder, video_name, batch_counts): """Concatenate batch TS files into final mp4.""" inputs_path = os.path.join(folder, "inputs.txt") # Create a file list for ffmpeg with open(inputs_path, "w") as f: for i in range(batch_counts): f.write(f"file '{video_name}_b{i:04d}.ts'\n") # Merge using ffmpeg concat demuxer ffmpeg.input(inputs_path, format="concat", safe=0).output( os.path.join(folder, f"{video_name}.mp4"), c="copy" ).run() # Cleanup temporary TS files and list file for i in range(batch_counts): os.remove(os.path.join(folder, f"{video_name}_b{i:04d}.ts")) os.remove(inputs_path)
[docs] def create_ffmpeg_writer(self, output_path, width, height, fps, crf): """Spawn an ffmpeg async encoding process for writing raw frames.""" return ( ffmpeg.output( ffmpeg.input( "pipe:0", format="rawvideo", pix_fmt="rgb24", s=f"{width}x{height}", r=fps, ), output_path, **{ "preset": "medium", "pix_fmt": "yuv420p", "b:v": "0", "c:v": "libx264", "crf": str(crf), "r": fps, }, ) .overwrite_output() .run_async(quiet=True, pipe_stdin=True) )
[docs] def process_single(self, sample, context=False): # check if it's generated already if self.tag_field_name in sample[Fields.meta]: return sample # there is no videos in this sample if self.video_key not in sample or not sample[self.video_key]: return [] cap = self.VideoCapture(sample[self.video_key][0]) video_name = os.path.splitext(os.path.basename(sample[self.video_key][0]))[0] # Get video properties height = int(cap.get(self.CAP_PROP_FRAME_HEIGHT)) width = int(cap.get(self.CAP_PROP_FRAME_WIDTH)) fps = cap.get(self.CAP_PROP_FPS) if "intrinsics" not in sample or sample["intrinsics"] is None: raise ValueError("The sample must include an 'intrinsics' field to store the 3x3 camera intrinsics matrix.") if "xi" not in sample or sample["xi"] is None: raise ValueError("The sample must include an 'xi' field to store the parameter xi in CMei's model.") K = sample["intrinsics"] # 3x3 camera intrinsics. D = sample.get( "distortion_coefficients", None ) # Distortion coefficients (k1,k2,p1,p2). If D is None then zero distortion is used. xi = sample["xi"] # The parameter xi for CMei's model. R = sample.get( "rotation_matrix", None ) # Rotation transform between the original and object space. If it is None, there is no rotation. new_K = sample.get( "intrinsics_new", None ) # New camera intrinsics. if new_K is empty then identity intrinsics are used. K = np.array(K, dtype=np.float32) xi = np.array(xi, dtype=np.float32) if D is None: D = np.array([0, 0, 0, 0], dtype=np.float32) else: D = np.array(D, dtype=np.float32) if R is None: R = np.eye(3) else: R = np.array(R, dtype=np.float32) if new_K is None: new_K = K else: new_K = np.array(new_K, dtype=np.float32) map1, map2 = self.omnidir.initUndistortRectifyMap( K, D, xi, R, new_K, (width, height), self.CV_16SC2, self.omnidir.RECTIFY_PERSPECTIVE ) # Initialize the first batch ffmpeg writer os.makedirs(self.output_video_dir, exist_ok=True) batch_number = 0 writer = self.create_ffmpeg_writer( os.path.join(self.output_video_dir, f"{video_name}_b{batch_number:04d}.ts"), width, height, fps, self.crf ) idx = 0 # Read and process frames while True: ret, frame = cap.read() if not ret: # End of video stream: close the last writer writer.stdin.close() writer.wait() break # Undistort the frame undistorted_frame = self.remap( frame, map1, map2, interpolation=self.INTER_CUBIC, borderMode=self.BORDER_CONSTANT ) # Convert BGR to RGB before writing to ffmpeg (FFmpeg expects RGB) undistorted_frame = self.cvtColor(undistorted_frame, self.COLOR_BGR2RGB) # Write to ffmpeg stdin writer.stdin.write(undistorted_frame.tobytes()) # Check if the current batch is complete (for idx + 1) if (idx + 1) % self.batch_size_each_video == 0: # Finalize the current batch writer writer.stdin.close() writer.wait() # Start the next batch writer batch_number += 1 writer = self.create_ffmpeg_writer( os.path.join(self.output_video_dir, f"{video_name}_b{batch_number:04d}.ts"), width, height, fps, self.crf, ) idx += 1 cap.release() # Merge all temporary TS chunks into the final MP4 file self.concatenate_ts_files(self.output_video_dir, video_name, batch_number + 1) sample[Fields.meta][self.tag_field_name] = { "new_video_path": os.path.join(self.output_video_dir, f"{video_name}.mp4") } return sample