# Source code for data_juicer.utils.constant

import copy
import inspect
import io
import os
from enum import Enum

import zstandard as zstd
from loguru import logger

RAY_JOB_ENV_VAR = "RAY_JOB"
SPECIAL_TOKEN_ENV_PREFIX = "_DJ_SPECIAL_TOKEN_"
DEFAULT_PREFIX = "__dj__"


class Fields(object):
    """Names of the reserved top-level fields data-juicer attaches to samples.

    All values are namespaced with ``DEFAULT_PREFIX`` (``__dj__``) so they
    cannot collide with user-provided dataset columns.
    """

    # for storing stats generated by filter op
    stats = DEFAULT_PREFIX + "stats__"
    # for storing metas generated by mapper op
    meta = DEFAULT_PREFIX + "meta__"
    # for storing metas of batch samples generated by aggregator op
    batch_meta = DEFAULT_PREFIX + "batch_meta__"
    context = DEFAULT_PREFIX + "context__"
    suffix = DEFAULT_PREFIX + "suffix__"
    # text_tags
    text_tags = DEFAULT_PREFIX + "text_tags__"
    # the name of the original file from which this sample was derived
    source_file = DEFAULT_PREFIX + "source_file__"
    # the name of directory to store the produced multimodal data
    multimodal_data_output_dir = DEFAULT_PREFIX + "produced_data__"
class BatchMetaKeys(object):
    """Keys stored under ``Fields.batch_meta`` by aggregator ops."""

    entity_attribute = "entity_attribute"
    most_relevant_entities = "most_relevant_entities"
class MetaKeys(object):
    """Keys stored under ``Fields.meta`` by mapper ops.

    Grouped by the kind of op that produces them: text analysis,
    multi-modal tagging, information extraction, and agent/dialog
    quality analysis.
    """

    # === text related tags ===
    # # sentiment
    dialog_sentiment_intensity = "dialog_sentiment_intensity"
    dialog_sentiment_intensity_analysis = "dialog_sentiment_intensity_analysis"
    query_sentiment_label = "query_sentiment_label"
    query_sentiment_score = "query_sentiment_label_score"
    dialog_sentiment_labels = "dialog_sentiment_labels"
    dialog_sentiment_labels_analysis = "dialog_sentiment_labels_analysis"
    # # intent
    dialog_intent_labels = "dialog_intent_labels"
    dialog_intent_labels_analysis = "dialog_intent_labels_analysis"
    query_intent_label = "query_intent_label"
    query_intent_score = "query_intent_label_score"
    # # topic
    dialog_topic_labels = "dialog_topic_labels"
    dialog_topic_labels_analysis = "dialog_topic_labels_analysis"
    query_topic_label = "query_topic_label"
    query_topic_score = "query_topic_label_score"

    # === multi-modal related tags ===
    # # video-frame tags
    video_frame_tags = "video_frame_tags"
    # # video-audio tags
    video_audio_tags = "video_audio_tags"
    # # video frames
    video_frames = "video_frames"
    # # object segment info in video
    video_object_segment_tags = "video_object_segment_tags"
    # # depth info in video
    video_depth_tags = "video_depth_tags"
    # # video optical flow
    video_optical_flow = "video_optical_flow"
    # # info extracted by VGGT
    vggt_tags = "vggt_tags"
    # # image tags
    image_tags = "image_tags"
    # # hand reconstruction
    hand_reconstruction_tags = "hand_reconstruction_tags"
    hand_reconstruction_hawor_tags = "hand_reconstruction_hawor_tags"
    # # bounding box tag (prefixed: produced alongside detection outputs)
    bbox_tag = DEFAULT_PREFIX + "bbox__"
    # # class label (from detection) tag
    class_label_tag = DEFAULT_PREFIX + "class_label__"
    # # 2D whole-body pose estimation
    pose_estimation_tags = "pose_estimation_tags"
    # # pose information
    pose_info = "pose_info"
    # # static camera calibration info (for DeepCalib)
    static_camera_calibration_deepcalib_tags = "static_camera_calibration_deepcalib_tags"
    # # static camera calibration info (for MoGe-2)
    static_camera_calibration_moge_tags = "static_camera_calibration_moge_tags"
    # # video undistortion info
    video_undistortion_tags = "video_undistortion_tags"
    # # camera pose info
    video_camera_pose_tags = "video_camera_pose_tags"

    # === info extraction related tags ===
    # # for event extraction
    event_description = "event_description"
    # # a list of characters relevant to the event
    relevant_characters = "relevant_characters"
    # # the given main entities for attribute extraction
    main_entities = "main_entities"
    # # the given attributes to be extracted
    attributes = "attributes"
    # # the extracted attribute descriptions
    attribute_descriptions = "attribute_descriptions"
    # # texts extracted from raw data that support the attribute
    attribute_support_texts = "attribute_support_texts"
    # # the nickname relationship
    nickname = "nickname"
    # # the entity for knowledge graph
    entity = "entity"
    # # # the name of entity
    entity_name = "entity_name"
    # # # the type of entity
    entity_type = "entity_type"
    # # # the description of entity
    # NOTE(review): value looks accidentally doubled ("entity_entity_...") —
    # kept byte-for-byte because downstream data may already use this key;
    # confirm upstream intent before changing.
    entity_description = "entity_entity_description"
    # # the relationship for knowledge graph
    relation = "relation"
    # # # the source entity of the relation
    source_entity = "relation_source_entity"
    # # # the target entity of the relation
    target_entity = "relation_target_entity"
    # # # the description of the relation
    relation_description = "relation_description"
    # # # the keywords of the relation
    relation_keywords = "relation_keywords"
    # # # the strength of the relation
    relation_strength = "relation_strength"
    # # the keyword in a text
    keyword = "keyword"
    # # support text
    support_text = "support_text"
    # # role relation
    role_relation = "role_relation"
    # # html tables
    html_tables = "html_tables"
    # # LLM-based structured extraction (user-configurable output_schema)
    llm_extract = "llm_extract"
    # # LLM semantic ops: token/cost usage per call
    # # (prompt_tokens, completion_tokens, total_tokens, cost_estimate)
    llm_semantic_usage = "llm_semantic_usage"

    # === agent / dialog quality analysis tags ===
    # # from agent_dialog_normalize_mapper
    agent_tool_types = "agent_tool_types"
    agent_skill_types = "agent_skill_types"
    agent_skill_insights = "agent_skill_insights"
    agent_turn_count = "agent_turn_count"
    # # lineage / cohort fields (copied from raw agent JSON e.g. request_model, pt)
    agent_request_model = "agent_request_model"
    agent_pt = "agent_pt"
    agent_total_cost_time_ms = "agent_total_cost_time_ms"
    # # stable ids / message indices (for reports, showcase, log correlation)
    agent_request_id = "agent_request_id"
    agent_last_user_msg_idx = "agent_last_user_msg_idx"
    agent_last_assistant_msg_idx = "agent_last_assistant_msg_idx"
    # # True if dialog_history / text / query / response were shrunk via head+tail cap
    agent_dialog_history_compressed = "agent_dialog_history_compressed"
    # # from agent_bad_case_signal_mapper — structured, conservatively triaged
    agent_bad_case_signals = "agent_bad_case_signals"
    agent_bad_case_tier = "agent_bad_case_tier"
    # # from agent_insight_llm_mapper — LLM synthesis for attribution / dashboards
    agent_insight_llm = "agent_insight_llm"
    agent_insight_llm_raw = "agent_insight_llm_raw"
    # # last writer (e.g. agent_insight_llm_mapper) records pipeline UI /
    # # report locale hint
    agent_pipeline_output_lang = "agent_pipeline_output_lang"
    # # from pii_llm_suspect_mapper — optional LLM audit for missed regex PII
    pii_llm_suspect = "pii_llm_suspect"
    pii_llm_suspect_raw = "pii_llm_suspect_raw"
    # # dialog/agent turn-quality LLM mappers (see dialog_* / agent_* mapper files)
    dialog_memory_consistency = "dialog_memory_consistency"
    dialog_coreference = "dialog_coreference"
    dialog_topic_shift = "dialog_topic_shift"
    dialog_error_recovery = "dialog_error_recovery"
    dialog_clarification_quality = "dialog_clarification_quality"
    dialog_proactivity = "dialog_proactivity"
    dialog_non_repetition = "dialog_non_repetition"
    agent_trace_coherence = "agent_trace_coherence"
    agent_tool_relevance = "agent_tool_relevance"
    # # from agent_tool_type_mapper
    primary_tool_type = "primary_tool_type"
    dominant_tool_types = "dominant_tool_types"
    # # from usage_counter_mapper
    prompt_tokens = "prompt_tokens"
    completion_tokens = "completion_tokens"
    total_tokens = "total_tokens"
    # # from tool_success_tagger_mapper
    tool_success_count = "tool_success_count"
    tool_fail_count = "tool_fail_count"
    tool_unknown_count = "tool_unknown_count"
    tool_success_ratio = "tool_success_ratio"
    tool_results = "tool_results"
class StatsKeysMeta(type):
    """
    A helper metaclass to track the mapping from an OP's name to the
    stats keys it actually uses.

    e.g.,
        # once the AlphanumericFilter's compute_stats method has been called
        res = StatsKeys.get_access_log()
        print(res)  # {"AlphanumericFilter": ["alnum_ratio", "alpha_token_ratio"]}
    """

    # caller (OP) class name -> set of accessed stats keys
    _accessed_by = {}

    def __getattr__(cls, attr):
        """Resolve *attr* from ``cls._constants_class`` and record which
        module (OP) accessed it.

        Only triggered for attributes not defined on the class itself,
        so ``_constants_class`` / ``_accessed_by`` lookups are untouched.
        """
        # identify the calling module from one frame up the stack
        caller_class = inspect.currentframe().f_back.f_globals["__name__"]
        # no need to track the parent packages — keep the last path component
        caller_class = caller_class.split(".")[-1]
        stat_key = getattr(cls._constants_class, attr)
        # record the access; set.add is idempotent, so no pre-check is needed
        cls._accessed_by.setdefault(caller_class, set()).add(stat_key)
        return stat_key
[docs] def get_access_log(cls, dj_cfg=None, dataset=None): if cls._accessed_by: return cls._accessed_by elif dj_cfg and dataset: tmp_dj_cfg = copy.deepcopy(dj_cfg) tmp_dj_cfg.use_cache = False tmp_dj_cfg.use_checkpoint = False tmp_dj_cfg.auto_op_parallelism = False # Disable auto parallelism to track StatsKeys access tmp_dj_cfg.np = None # Disable multiprocessing to track StatsKeys access # Force disable multiprocessing for each op to track StatsKeys access for op_config in tmp_dj_cfg.process: op_name = list(op_config.keys())[0] if op_config[op_name] is not None: op_config[op_name]["auto_op_parallelism"] = False op_config[op_name]["num_proc"] = None from data_juicer.config import get_init_configs from data_juicer.core import Analyzer tmp_analyzer = Analyzer(get_init_configs(tmp_dj_cfg)) dataset = dataset.take(1) # do not overwrite the true analysis results tmp_analyzer.run(dataset=dataset, skip_export=True) elif dj_cfg: tmp_dj_cfg = copy.deepcopy(dj_cfg) # the access has been skipped due to the use of cache # we will using a temp data sample to get the access log if os.path.exists(dj_cfg.dataset_path) and ( "jsonl" in dj_cfg.dataset_path or "jsonl.zst" in dj_cfg.dataset_path ): logger.info("Begin to track the usage of ops with a dummy data sample") # load the first line as tmp_data tmp_f_name = None first_line = None if "jsonl.zst" in dj_cfg.dataset_path: tmp_f_name = dj_cfg.dataset_path.replace(".jsonl.zst", ".tmp.jsonl") # Open the file in binary mode and # create a Zstandard decompression context with open(dj_cfg.dataset_path, "rb") as compressed_file: dctx = zstd.ZstdDecompressor() # Create a stream reader for the file and decode the # first line with dctx.stream_reader(compressed_file) as reader: text_stream = io.TextIOWrapper(reader, encoding="utf-8") first_line = text_stream.readline() elif "jsonl" in dj_cfg.dataset_path: tmp_f_name = dj_cfg.dataset_path.replace(".jsonl", ".tmp.jsonl") with open(dj_cfg.dataset_path, "r", encoding="utf-8") as orig_file: 
first_line = orig_file.readline() assert tmp_f_name is not None and first_line is not None, ( "error when loading the first line, when " f"dj_cfg.dataset_path={dj_cfg.dataset_path}" ) with open(tmp_f_name, "w", encoding="utf-8") as tmp_file: tmp_file.write(first_line) tmp_dj_cfg.dataset_path = tmp_f_name tmp_dj_cfg.use_cache = False tmp_dj_cfg.use_checkpoint = False tmp_dj_cfg.auto_op_parallelism = False # Disable auto parallelism to track StatsKeys access tmp_dj_cfg.np = None # Disable multiprocessing to track StatsKeys access # Force disable multiprocessing for each op to track StatsKeys access for op_config in tmp_dj_cfg.process: op_name = list(op_config.keys())[0] if op_config[op_name] is not None: op_config[op_name]["auto_op_parallelism"] = False op_config[op_name]["num_proc"] = None from data_juicer.config import get_init_configs from data_juicer.core import Analyzer tmp_analyzer = Analyzer(get_init_configs(tmp_dj_cfg)) # do not overwrite the true analysis results tmp_analyzer.run(skip_export=True) os.remove(tmp_f_name) else: raise NotImplementedError( f"For now, the dummy data is supported for only jsonl type" f". Please check your config as {dj_cfg.dataset_path} is " f"either not existed or in jsonl type." ) return cls._accessed_by
[docs] class StatsKeysConstant(object): # === text === alpha_token_ratio = "alpha_token_ratio" alnum_ratio = "alnum_ratio" avg_line_length = "avg_line_length" char_rep_ratio = "char_rep_ratio" flagged_words_ratio = "flagged_words_ratio" in_context_influence = "in_context_influence" ifd_score = "ifd_score" lang = "lang" lang_score = "lang_score" max_line_length = "max_line_length" perplexity = "perplexity" special_char_ratio = "special_char_ratio" stopwords_ratio = "stopwords_ratio" text_len = "text_len" text_embd_similarity = "text_embd_similarity" text_pair_similarity = "text_pair_similarity" num_action = "num_action" num_dependency_edges = "num_dependency_edges" num_token = "num_token" num_words = "num_words" word_rep_ratio = "word_rep_ratio" llm_analysis_score = "llm_analysis_score" llm_analysis_record = "llm_analysis_record" llm_analysis_tags = "llm_analysis_tags" llm_quality_score = "llm_quality_score" llm_quality_record = "llm_quality_record" llm_quality_tags = "llm_quality_tags" llm_difficulty_score = "llm_difficulty_score" llm_difficulty_record = "llm_difficulty_record" llm_difficulty_tags = "llm_difficulty_tags" llm_perplexity = "llm_perplexity" llm_task_relevance = "llm_task_relevance" llm_task_relevance_record = "llm_task_relevance_record" llm_task_relevance_tags = "llm_task_relevance_tags" # llm_condition_filter: True if sample satisfies the user-given condition llm_condition_filter_result = "llm_condition_filter_result" # llm_* semantic ops: token/cost usage (dict with prompt_tokens, completion_tokens, total_tokens, cost_estimate) llm_semantic_usage = "llm_semantic_usage" # === image === aspect_ratios = "aspect_ratios" image_width = "image_width" image_height = "image_height" image_sizes = "image_sizes" face_ratios = "face_ratios" face_detections = "face_detections" face_counts = "face_counts" image_aesthetics_scores = "image_aesthetics_scores" image_nsfw_score = "image_nsfw_score" image_watermark_prob = "image_watermark_prob" image_pair_similarity = 
"image_pair_similarity" image_subplot_confidence = "image_subplot_confidence" horizontal_peak_count = "horizontal_peak_count" vertical_peak_count = "vertical_peak_count" subplot_detected = "subplot_detected" # === audios === audio_duration = "audio_duration" audio_nmf_snr = "audio_nmf_snr" audio_sizes = "audio_sizes" # === videos === video_duration = "video_duration" video_aspect_ratios = "video_aspect_ratios" video_width = "video_width" video_height = "video_height" video_ocr_area_ratio = "video_ocr_area_ratio" video_aesthetic_score = "video_aesthetic_score" video_frames_aesthetics_score = "video_frames_aesthetics_score" video_motion_score = "video_motion_score" video_nsfw_score = "video_nsfw_score" video_watermark_prob = "video_watermark_prob" # === multimodal === # image-text image_text_similarity = "image_text_similarity" image_text_matching_score = "image_text_matching_score" phrase_grounding_recall = "phrase_grounding_recall" # video-text video_frames_text_similarity = "video_frames_text_similarity" # general-field-filter general_field_filter_condition = "general_field_filter_condition"
class StatsKeys(object, metaclass=StatsKeysMeta):
    """Tracked accessor for :class:`StatsKeysConstant`.

    Attribute reads fall through to ``StatsKeysConstant`` via
    ``StatsKeysMeta.__getattr__``, which records which OP used which key.
    """

    _constants_class = StatsKeysConstant
class HashKeys(object):
    """Names of the hash fields used by deduplicator ops (all prefixed)."""

    uid = DEFAULT_PREFIX + "uid"
    hash = DEFAULT_PREFIX + "hash"
    minhash = DEFAULT_PREFIX + "minhash"
    simhash = DEFAULT_PREFIX + "simhash"
    line_hashes = DEFAULT_PREFIX + "line_hashes"
    # image
    imagehash = DEFAULT_PREFIX + "imagehash"
    # video
    videohash = DEFAULT_PREFIX + "videohash"
    # duplicate flag
    is_unique = DEFAULT_PREFIX + "is_unique"
class InterVars(object):
    """Names of intermediate variables shared between ops via the context."""

    # === text ===
    lines = DEFAULT_PREFIX + "lines"
    words = DEFAULT_PREFIX + "words"
    refined_words = DEFAULT_PREFIX + "refined_words"

    # === image ===
    loaded_images = DEFAULT_PREFIX + "loaded_images"  # Image

    # === audios ===
    loaded_audios = DEFAULT_PREFIX + "loaded_audios"  # (data, sampling_rate)

    # === videos ===
    # # InputContainer from av.
    # # Key: {video_path}
    loaded_videos = DEFAULT_PREFIX + "loaded_videos"
    # sampled frames.
    # # Key: {video_path}-{frame_sampling_method}[-{frame_num}]
    # # {frame_num} is only used when {frame_sampling_method} is "uniform"
    sampled_frames = DEFAULT_PREFIX + "sampled_frames"
class JobRequiredKeys(Enum):
    """Required keys of a job description in a data-juicer job config."""

    hook = "hook"
    meta_name = "meta_name"
    input = "input"
    output = "output"
    local = "local"
    dj_configs = "dj_configs"
    extra_configs = "extra_configs"