Source code for data_juicer.ops.mapper.video_depth_estimation_mapper

import os
import subprocess

import numpy as np

from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE
from data_juicer.utils.constant import Fields, MetaKeys
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.model_utils import get_model, prepare_model

from ..base_op import OPERATORS, TAGGING_OPS, UNFORKABLE, Mapper
from ..op_fusion import LOADED_VIDEOS

OP_NAME = "video_depth_estimation_mapper"


cv2 = LazyLoader("cv2", "opencv-python")
torch = LazyLoader("torch")
open3d = LazyLoader("open3d")


@TAGGING_OPS.register_module(OP_NAME)
@UNFORKABLE.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoDepthEstimationMapper(Mapper):
    """Perform depth estimation on the video."""

    _accelerator = "cuda"

    def __init__(
        self,
        video_depth_model_path: str = "video_depth_anything_vitb.pth",
        point_cloud_dir_for_metric: str = DATA_JUICER_ASSETS_CACHE,
        max_res: int = 1280,
        torch_dtype: str = "fp16",
        if_save_visualization: bool = False,
        save_visualization_dir: str = DATA_JUICER_ASSETS_CACHE,
        grayscale: bool = False,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param video_depth_model_path: The path to the Video-Depth-Anything
            model. If the model is a 'metric' model, the op automatically
            switches to metric mode and writes point clouds to
            ``point_cloud_dir_for_metric``.
        :param point_cloud_dir_for_metric: The directory for storing point
            clouds (only used with a 'metric' model).
        :param max_res: The maximum resolution for input videos; videos
            exceeding this threshold will be resized.
        :param torch_dtype: The floating-point precision used for model
            inference. Must be one of ['fp32', 'fp16'].
        :param if_save_visualization: Whether to save visualization results.
        :param save_visualization_dir: The directory for saving visualization
            results.
        :param grayscale: If True, the colorful palette will not be applied.
        """
        super().__init__(*args, **kwargs)
        LazyLoader.check_packages(["easydict", "xformers", "imageio", "imageio-ffmpeg"])

        # Clone the Video-Depth-Anything repo into the assets cache on first use.
        video_depth_anything_repo_path = os.path.join(DATA_JUICER_ASSETS_CACHE, "Video-Depth-Anything")
        if not os.path.exists(video_depth_anything_repo_path):
            subprocess.run(
                [
                    "git",
                    "clone",
                    "https://github.com/DepthAnything/Video-Depth-Anything.git",
                    video_depth_anything_repo_path,
                ],
                check=True,
            )

        # Make the cloned repo importable so its helper utilities can be used.
        import sys

        sys.path.append(video_depth_anything_repo_path)
        from utils.dc_utils import read_video_frames, save_video

        # 'metric' checkpoints produce absolute depth and enable point-cloud export.
        self.metric = "metric" in video_depth_model_path

        self.read_video_frames = read_video_frames
        self.save_video = save_video
        self.tag_field_name = MetaKeys.video_depth_tags
        self.max_res = max_res
        self.torch_dtype = torch_dtype
        self.point_cloud_dir_for_metric = point_cloud_dir_for_metric
        self.if_save_visualization = if_save_visualization
        self.save_visualization_dir = save_visualization_dir
        self.grayscale = grayscale

        self.model_key = prepare_model(model_type="video_depth_anything", model_path=video_depth_model_path)

    def process_single(self, sample=None, rank=None):
        # Skip if the tags have already been generated.
        if self.tag_field_name in sample[Fields.meta]:
            return sample

        # There is no video in this sample.
        if self.video_key not in sample or not sample[self.video_key]:
            sample[Fields.meta][self.tag_field_name] = {"depth_data": [], "fps": -1}
            return sample

        video_depth_anything_model = get_model(model_key=self.model_key, rank=rank, use_cuda=self.use_cuda())

        device = f"cuda:{rank}" if rank is not None else "cuda"

        frames, target_fps = self.read_video_frames(sample[self.video_key][0], -1, -1, self.max_res)
        depths, fps = video_depth_anything_model.infer_video_depth(
            frames,
            target_fps,
            input_size=518,
            device=device if self.use_cuda() else "cpu",
            fp32=self.torch_dtype != "fp16",
        )

        if self.if_save_visualization:
            video_name = os.path.basename(sample[self.video_key][0])
            os.makedirs(self.save_visualization_dir, exist_ok=True)
            processed_video_path = os.path.join(
                self.save_visualization_dir, os.path.splitext(video_name)[0] + "_src.mp4"
            )
            depth_vis_path = os.path.join(self.save_visualization_dir, os.path.splitext(video_name)[0] + "_vis.mp4")
            self.save_video(frames, processed_video_path, fps=fps)
            self.save_video(depths, depth_vis_path, fps=fps, is_depths=True, grayscale=self.grayscale)

        if self.metric:
            # Back-project each depth map to a point cloud with a pinhole model
            # (fixed focal length of 470.4 px) and save one .ply file per frame.
            os.makedirs(self.point_cloud_dir_for_metric, exist_ok=True)
            width, height = depths[0].shape[-1], depths[0].shape[-2]
            x, y = np.meshgrid(np.arange(width), np.arange(height))
            x = (x - width / 2) / 470.4
            y = (y - height / 2) / 470.4
            for i, (color_image, depth) in enumerate(zip(frames, depths)):
                z = np.array(depth)
                points = np.stack((np.multiply(x, z), np.multiply(y, z), z), axis=-1).reshape(-1, 3)
                colors = np.array(color_image).reshape(-1, 3) / 255.0
                pcd = open3d.geometry.PointCloud()
                pcd.points = open3d.utility.Vector3dVector(points)
                pcd.colors = open3d.utility.Vector3dVector(colors)
                open3d.io.write_point_cloud(
                    os.path.join(self.point_cloud_dir_for_metric, "point" + str(i).zfill(4) + ".ply"), pcd
                )

        sample[Fields.meta][self.tag_field_name] = {"depth_data": depths, "fps": fps}
        return sample
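

# --- Back-projection sketch (illustration only, not part of the original module) ---
# The metric branch in process_single un-projects each depth map with a simple
# pinhole camera model using the fixed focal length f = 470.4 px from the source:
# X = (u - W/2) * Z / f, Y = (v - H/2) * Z / f. The toy function below reproduces
# that math on a tiny synthetic depth map; the function name is hypothetical and
# exists only for demonstration.
def _backprojection_demo():
    depth = np.full((2, 2), 2.0, dtype=np.float32)  # toy 2x2 depth map, all at Z=2
    height, width = depth.shape
    x, y = np.meshgrid(np.arange(width), np.arange(height))
    focal = 470.4  # same constant as the metric branch above
    x = (x - width / 2) / focal
    y = (y - height / 2) / focal
    # Each pixel (u, v) with depth Z maps to camera-space (X, Y, Z).
    return np.stack((x * depth, y * depth, depth), axis=-1).reshape(-1, 3)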
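

# --- Usage sketch (not part of the original module) ---
# A minimal, hedged example of driving the mapper directly. The sample layout
# (a dict keyed by the op's video key plus an empty Fields.meta dict) follows
# the Data-Juicer sample convention; the video path below is a hypothetical
# placeholder and the checkpoint name is the op's default.
if __name__ == "__main__":
    op = VideoDepthEstimationMapper(
        video_depth_model_path="video_depth_anything_vitb.pth",
        torch_dtype="fp16",
        if_save_visualization=False,
    )
    sample = {op.video_key: ["/path/to/example.mp4"], Fields.meta: {}}  # hypothetical path
    sample = op.process_single(sample)
    tags = sample[Fields.meta][MetaKeys.video_depth_tags]
    print(f"depth frames: {len(tags['depth_data'])}, fps: {tags['fps']}")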