# Source code for data_juicer.tools.DJ_mcp_recipe_flow

import argparse
import os
from typing import Dict, List, Optional

from mcp.server.fastmcp import FastMCP

from data_juicer.tools.mcp_tool import execute_analyze, execute_op
from data_juicer.tools.op_search import OPSearcher

# Operator Management
# Optional allow-list of operator names read from the file named by
# DJ_OPS_LIST_PATH: one name per line, surrounding whitespace stripped and
# blank lines ignored. When the variable is unset or empty, ops_list stays
# None and is passed to OPSearcher as-is (presumably meaning "no
# restriction" -- confirm against OPSearcher).
ops_list_path = os.getenv("DJ_OPS_LIST_PATH", None)
ops_list = None
if ops_list_path:
    with open(ops_list_path, "r", encoding="utf-8") as file:
        ops_list = [name for name in (raw.strip() for raw in file) if name]
searcher = OPSearcher(ops_list)


def get_global_config_schema() -> dict:
    """
    Get the full schema of all available global configuration options for
    Data-Juicer.

    Returns a dictionary where each key is a config parameter name, and the
    value is a dict containing:
    - type: the expected type of the parameter (e.g. "bool", "int", "str")
    - default: the default value
    - description: a human-readable description of the parameter
    - choices: the allowed values (only present when the option restricts
      its values)

    Use this tool to discover what configuration options can be passed to
    run_data_recipe via the extra_config parameter. The schema is read from
    the live Data-Juicer config parser, so newly added config options show up
    automatically.

    Positional arguments, options hidden from help output
    (``help=argparse.SUPPRESS``) and a few internal options (config, auto,
    help, print_config) are not included.

    :returns: A dict mapping config parameter names to their schema info, or
        ``{"error": ...}`` if the config parser could not be built.
    """
    from data_juicer.config.config import build_base_parser

    parser = build_base_parser()
    if parser is None:
        return {"error": "Failed to initialize config parser"}

    # Internal parameters that should not be exposed to users
    excluded_params = {
        "config",
        "auto",
        "help",
        "print_config",
    }

    schema = {}
    for action in parser._actions:
        # Positional arguments are not config options.
        if not action.option_strings:
            continue
        # Options hidden from --help are internal; without this check they
        # would be reported with the description "==SUPPRESS==".
        if action.help == argparse.SUPPRESS:
            continue

        # Use the longest option string as the parameter name
        param_name = max(action.option_strings, key=len).lstrip("-")
        if action.dest in excluded_params or param_name in excluded_params:
            continue

        entry = {
            "type": _action_type_name(action),
            "default": action.default,
            "description": action.help or "",
        }
        choices = list(action.choices) if action.choices else None
        if choices:
            entry["choices"] = choices
        schema[param_name] = entry
    return schema


def _action_type_name(action) -> str:
    """Return a short type label ("int", "bool", "str", ...) for a parser action."""
    if action.type is not None:
        # Classes and plain functions carry __name__; anything else (callable
        # instances, typing constructs without a name) falls back to str().
        return getattr(action.type, "__name__", str(action.type))
    # Flag actions such as store_true/store_false have no type but a bool const.
    if isinstance(action.const, bool):
        return "bool"
    return "str"
def get_dataset_load_strategies() -> dict:
    """
    Get all available dataset loading strategies supported by Data-Juicer.

    Returns information about each strategy including its executor type, data
    type, data source, required/optional configuration fields, and
    description.

    Use this tool to understand how to configure the 'dataset' parameter in
    run_data_recipe for different data sources (e.g., local files,
    HuggingFace, S3, ModelScope, etc.).

    The 'dataset' parameter in run_data_recipe accepts a dict with:
    - configs: a list of dataset config dicts, each containing a 'type'
      field that maps to a data source strategy (e.g., 'local', 'huggingface')
    - max_sample_num: optional max number of samples to load

    Each dataset config dict should follow the required/optional fields
    described in the returned strategy information.

    :returns: A dict keyed by "<executor_type>/<data_type>/<data_source>".
        Each value holds executor_type, data_type, data_source, description
        (the strategy class docstring) and class_name. When the strategy class
        defines CONFIG_VALIDATION_RULES, it also holds required_fields,
        optional_fields and field_types. In field_types, each type is given by
        name, and a tuple of accepted types is rendered as "a | b".
    """
    import inspect

    from data_juicer.core.data.load_strategy import DataLoadStrategyRegistry

    def _type_label(spec) -> str:
        # A field may accept a single type or a tuple of types (the form
        # isinstance() accepts); render both as human-readable names.
        if isinstance(spec, tuple):
            return " | ".join(_type_label(item) for item in spec)
        return spec.__name__ if hasattr(spec, "__name__") else str(spec)

    strategies_info = {}
    for strategy_key, strategy_class in DataLoadStrategyRegistry._strategies.items():
        identifier = f"{strategy_key.executor_type}/" f"{strategy_key.data_type}/" f"{strategy_key.data_source}"

        # Extract CONFIG_VALIDATION_RULES if available
        validation_rules = getattr(strategy_class, "CONFIG_VALIDATION_RULES", {})

        # cleandoc also removes the common indentation of continuation lines,
        # which a plain .strip() would leave in multi-line docstrings.
        description = inspect.cleandoc(strategy_class.__doc__ or "")

        entry = {
            "executor_type": strategy_key.executor_type,
            "data_type": strategy_key.data_type,
            "data_source": strategy_key.data_source,
            "description": description,
            "class_name": strategy_class.__name__,
        }

        if validation_rules:
            entry["required_fields"] = validation_rules.get("required_fields", [])
            entry["optional_fields"] = validation_rules.get("optional_fields", [])
            # Type objects are not serializable; expose their names instead.
            field_types = validation_rules.get("field_types", {})
            entry["field_types"] = {key: _type_label(val) for key, val in field_types.items()}

        strategies_info[identifier] = entry

    return strategies_info
def search_ops(
    query: Optional[str] = None,
    op_type: Optional[str] = None,
    tags: Optional[List[str]] = None,
    match_all: bool = True,
    search_mode: str = "tags",
    top_k: int = 10,
) -> dict:
    """
    Search for available data processing operators.

    Operators are a collection of basic processes that assist in data
    modification, cleaning, filtering, deduplication, etc.

    Supports multiple search modes, selected via ``search_mode``:
    - "tags" (default): filter by op_type and/or tags. If both tags and
      op_type are None, returns all operators. "basic" is accepted as an
      alias for this mode.
    - "regex": Python regex pattern matching against OP names, descriptions,
      and parameters. Requires the query parameter.
    - "bm25": BM25 text relevance ranking for natural language queries.
      Returns the top_k most relevant operators. Requires the query
      parameter.

    op_type and tags can be combined with any search_mode as additional
    filters to narrow down results.

    The following `op_type` values are supported:
    - aggregator: Aggregate for batched samples, such as summary or conclusion.
    - deduplicator: Detects and removes duplicate samples.
    - filter: Filters out low-quality samples.
    - grouper: Group samples to batched samples.
    - mapper: Edits and transforms samples.
    - selector: Selects top samples based on ranking.
    - pipeline: Applies dataset-level processing; both input and output are
      datasets.

    The `tags` parameter specifies the characteristics of the data or the
    required resources. Available tags are:

    Modality Tags:
    - text: process text data specifically.
    - image: process image data specifically.
    - audio: process audio data specifically.
    - video: process video data specifically.
    - multimodal: process multimodal data.

    Resource Tags:
    - cpu: only requires CPU resource.
    - gpu: requires GPU/CUDA resource as well.

    Model Tags:
    - api: equipped with API-based models (e.g. ChatGPT, GPT-4o).
    - vllm: equipped with models supported by vLLM.
    - hf: equipped with models from HuggingFace Hub.

    :param query: Search query string. Required for "regex" and "bm25" modes.
        For "regex" mode, this should be a Python regex pattern. For "bm25"
        mode, this should be a natural language description of the desired
        functionality.
    :param op_type: The type of data processing operator to filter by. If
        None, no type-based filtering is applied. Defaults to None.
    :param tags: An optional list of tags to filter operators. If None, no
        tag-based filtering is applied. Defaults to None.
    :param match_all: If True, only operators matching all specified tags are
        returned. If False, operators matching any tag are returned.
        Defaults to True.
    :param search_mode: The search strategy to use. One of "tags" (alias
        "basic"), "regex", or "bm25". Defaults to "tags". Any other value
        returns an error dict instead of silently listing every operator.
    :param top_k: Maximum number of results to return for "bm25" mode.
        Defaults to 10. Ignored for other modes.
    :returns: A dict keyed by operator name. Each value is the operator
        description, its parameter description, a "Parameters: " line and its
        signature, joined by newlines. If the arguments are invalid, the dict
        is ``{"error": ...}`` instead.
    """
    # Both text-based modes need a query; validate before touching the searcher.
    if search_mode in ("regex", "bm25") and not query:
        return {"error": f"query is required for {search_mode} search mode"}

    if search_mode == "regex":
        op_results = searcher.search_by_regex(
            query=query,
            tags=tags,
            op_type=op_type,
            match_all=match_all,
        )
    elif search_mode == "bm25":
        op_results = searcher.search_by_bm25(
            query=query,
            top_k=top_k,
            tags=tags,
            op_type=op_type,
            match_all=match_all,
        )
    elif search_mode in ("tags", "basic"):
        # Filter by op_type and tags only.
        op_results = searcher.search(tags=tags, op_type=op_type, match_all=match_all)
    else:
        # A typo'd mode used to fall through to an unfiltered listing, which
        # looks like a valid (but huge) answer; report it explicitly instead.
        return {"error": (f"unknown search_mode {search_mode!r}; " "expected one of 'tags', 'regex', 'bm25'")}

    return {
        op["name"]: "\n".join([op["desc"], op["param_desc"], "Parameters: ", str(op["sig"])]) for op in op_results
    }
def run_data_recipe(
    process: list[Dict],
    dataset_path: Optional[str] = None,
    dataset: Optional[Dict] = None,
    export_path: Optional[str] = None,
    np: int = 1,
    extra_config: Optional[Dict] = None,
) -> str:
    """
    Run a data processing recipe using Data-Juicer operators.

    If you want to run one or more DataJuicer data processing operators, use
    this tool. Supported operators and their arguments should be obtained
    through the `search_ops` tool.

    For advanced configuration options (e.g., enabling tracing, op fusion,
    checkpoint, multimodal keys, etc.), first call `get_global_config_schema`
    to discover available options, then pass them via `extra_config`.

    For loading datasets from different sources (e.g., HuggingFace, S3), first
    call `get_dataset_load_strategies` to discover available loading
    strategies and their required fields, then pass the configuration via the
    `dataset` parameter.

    :param process: List of processing operations to be executed
        sequentially. Each element is a dictionary with operator name as key
        and its configuration as value.
    :param dataset_path: Path to the dataset to be processed. This is the
        simplest way to specify input data (local file path).
    :param dataset: Optional dataset configuration dict for advanced data
        loading. Supports multiple data sources (local, HuggingFace, S3,
        etc.). Format follows Data-Juicer's dataset config schema:
        {"configs": [{"type": "local", "path": "..."}, ...],
        "max_sample_num": 10000}
        Use `get_dataset_load_strategies` to discover available options.
        When provided alongside dataset_path, both are passed to Data-Juicer
        (dataset_path serves as a fallback).
    :param export_path: Path to export the processed dataset. Defaults to
        None, which exports to './outputs' directory.
    :param np: Number of processes to use. Defaults to 1.
    :param extra_config: Optional dict of additional global configuration
        options. Use `get_global_config_schema` to discover all available
        options. Its entries are merged last, so they override any of the
        keys above if repeated. Example:
        {"open_tracer": True, "trace_num": 20, "op_fusion": True,
        "text_keys": "instruction"}
    :returns: The string returned by ``execute_op`` for the assembled config.

    Example:
        # Basic usage: filter text samples
        >>> run_data_recipe(
        ...     process=[{"text_length_filter": {"min_len": 10, "max_len": 50}}],
        ...     dataset_path="/path/to/dataset.jsonl",
        ... )

        # Advanced usage with tracing and HuggingFace dataset
        >>> run_data_recipe(
        ...     dataset_path="",
        ...     process=[{"language_id_score_filter": {"lang": "en"}}],
        ...     dataset={
        ...         "configs": [{
        ...             "type": "huggingface",
        ...             "path": "tatsu-lab/alpaca",
        ...             "split": "train"
        ...         }]
        ...     },
        ...     extra_config={
        ...         "open_tracer": True,
        ...         "trace_num": 20,
        ...         "text_keys": "instruction"
        ...     }
        ... )
    """
    dj_cfg = {
        "dataset_path": dataset_path,
        "process": process,
        "export_path": export_path,
        "np": np,
    }
    if dataset is not None:
        dj_cfg["dataset"] = dataset
    if extra_config is not None:
        # Applied last on purpose: user-supplied global options win.
        dj_cfg.update(extra_config)
    return execute_op(dj_cfg)
def analyze_dataset(
    process: list[Dict],
    dataset_path: Optional[str] = None,
    dataset: Optional[Dict] = None,
    export_path: Optional[str] = None,
    np: int = 1,
    percentiles: Optional[List[float]] = None,
    extra_config: Optional[Dict] = None,
) -> str:
    """
    Analyze a dataset using Data-Juicer's Analyzer pipeline.

    This tool computes statistics for the specified filter and tagging
    operators on the dataset, then performs overall analysis, column-wise
    analysis, and correlation analysis. It generates stats tables and
    distribution figures to help understand the dataset characteristics
    before applying actual data processing.

    This is the equivalent of the ``dj-analyze`` command. Use it to
    understand your dataset's quality distribution, identify outliers, and
    determine appropriate filter thresholds before running
    ``run_data_recipe``.

    Supported operators and their arguments should be obtained through the
    ``search_ops`` tool. Only filter-type and tagging-type operators will
    produce meaningful analysis results.

    :param process: List of filter/tagging operations to compute stats for.
        Each element is a dictionary with operator name as key and its
        configuration as value. Only filter and tagging operators produce
        analysis stats.
    :param dataset_path: Path to the dataset to be analyzed. This is the
        simplest way to specify input data (local file path).
    :param dataset: Optional dataset configuration dict for advanced data
        loading. Same format as in run_data_recipe.
    :param export_path: Path to export the analyzed dataset with stats.
        Defaults to None, which exports to './outputs' directory.
    :param np: Number of processes to use. Defaults to 1.
    :param percentiles: List of percentiles to compute for the dataset
        distribution analysis. Defaults to [0.25, 0.5, 0.75]. Only forwarded
        to Data-Juicer when given.
    :param extra_config: Optional dict of additional global configuration
        options, merged last (overriding any key set above). Use
        ``get_global_config_schema`` to discover all available options.
        Analysis-specific options include:
        - export_original_dataset (bool): whether to export the original
          dataset with stats (default: False)
        - save_stats_in_one_file (bool): whether to save all stats into one
          file (default: False)

    Example:
        # Analyze text length and language distribution
        >>> analyze_dataset(
        ...     dataset_path="/path/to/dataset.jsonl",
        ...     process=[
        ...         {"text_length_filter": {"min_len": 10, "max_len": 1000}},
        ...         {"language_id_score_filter": {"lang": "en"}}
        ...     ],
        ...     percentiles=[0.1, 0.25, 0.5, 0.75, 0.9]
        ... )

    :returns: A message indicating where the analysis results are saved,
        including the export path and the analysis directory containing
        stats tables and distribution figures.
    """
    dj_cfg = dict(
        dataset_path=dataset_path,
        process=process,
        export_path=export_path,
        np=np,
    )
    # Optional keys are only added when supplied, in this fixed order.
    optional_entries = (("percentiles", percentiles), ("dataset", dataset))
    dj_cfg.update({name: value for name, value in optional_entries if value is not None})
    if extra_config is not None:
        dj_cfg.update(extra_config)
    return execute_analyze(dj_cfg)
def create_mcp_server(port: str = "8000"):
    """
    Build the Data-Juicer FastMCP server with all tools registered.

    Tools are registered in this order: get_global_config_schema,
    get_dataset_load_strategies, search_ops, run_data_recipe,
    analyze_dataset.

    Args:
        port (str, optional): Port number handed to FastMCP. Defaults to
            "8000".

    Returns:
        The configured FastMCP instance (not yet running).
    """
    server = FastMCP("Data-Juicer Server", port=port)
    exposed_tools = (
        get_global_config_schema,
        get_dataset_load_strategies,
        search_ops,
        run_data_recipe,
        analyze_dataset,
    )
    for tool_fn in exposed_tools:
        server.tool()(tool_fn)
    return server
if __name__ == "__main__":
    # CLI entry point: parse --port, build the server, and serve it over the
    # transport named by SERVER_TRANSPORT (falls back to "sse").
    cli = argparse.ArgumentParser(description="Data-Juicer MCP Server")
    cli.add_argument("--port", type=str, default="8000", help="Port number for the MCP server")
    cli_args = cli.parse_args()

    server = create_mcp_server(port=cli_args.port)
    server.run(transport=os.getenv("SERVER_TRANSPORT", "sse"))