# Source code for data_juicer_agents.tools.plan._shared.process_spec

# -*- coding: utf-8 -*-
"""Shared process-spec helpers for plan tools."""

from __future__ import annotations

from typing import Any, Dict, List, Tuple

from .normalize import normalize_params
from .schema import ProcessOperator, ProcessSpec

# Warning text that validate_process_spec_payload appends to the warnings it
# returns; exported so callers can compare against it by name.
PROCESS_SPEC_DEFERRED_WARNING = (
    "operator parameter validation deferred; "
    "runtime errors will be used as the repair signal"
)


def normalize_process_spec(process_spec: ProcessSpec | Dict[str, Any]) -> ProcessSpec:
    """Build a cleaned ``ProcessSpec`` from a spec object or a plain dict.

    A dict input is converted with ``ProcessSpec.from_dict``. Every operator
    name is stringified and stripped of surrounding whitespace; operators whose
    name ends up empty are dropped. The params of each kept operator are
    passed through ``normalize_params``.

    Raises:
        ValueError: if ``process_spec`` is neither a ``ProcessSpec`` nor a
            dict, or if no operator with a non-empty name remains.
    """
    if not isinstance(process_spec, (ProcessSpec, dict)):
        raise ValueError("process_spec must be a dict object")

    parsed = (
        process_spec
        if isinstance(process_spec, ProcessSpec)
        else ProcessSpec.from_dict(process_spec)
    )

    # Pair every operator with its cleaned name lazily, so the name cleanup and
    # the params normalization stay interleaved per operator.
    named_entries = (
        (str(entry.name or "").strip(), entry) for entry in parsed.operators
    )
    cleaned: List[ProcessOperator] = [
        ProcessOperator(name=op_name, params=normalize_params(entry.params))
        for op_name, entry in named_entries
        if op_name
    ]

    result = ProcessSpec(operators=cleaned)
    if result.operators:
        return result
    raise ValueError("process_spec.operators must contain at least one operator")


def validate_process_spec_payload(
    process_spec: ProcessSpec | Dict[str, Any],
) -> Tuple[List[str], List[str]]:
    """Validate process spec structure and operator names/params via DJ bridge.

    Args:
        process_spec: A ``ProcessSpec``, or a dict that is first converted
            with ``ProcessSpec.from_dict``.

    Returns:
        A ``(errors, warnings)`` pair of message lists. ``errors`` holds the
        structural problems plus any unknown operator names or unknown
        parameter keys reported against the DJ bridge. ``warnings`` always
        contains ``PROCESS_SPEC_DEFERRED_WARNING`` and, when the bridge step
        raised, a note that name/param validation was skipped.
    """
    if isinstance(process_spec, dict):
        process_spec = ProcessSpec.from_dict(process_spec)

    errors: List[str] = []
    warnings: List[str] = []

    # Basic structural validation
    if not process_spec.operators:
        errors.append("operators must not be empty")
    for idx, op in enumerate(process_spec.operators):
        if not op.name:
            errors.append(f"operators[{idx}].name is required")
        if not isinstance(op.params, dict):
            errors.append(f"operators[{idx}].params must be an object")

    # DJ bridge validation. The bridge is imported inside the try block so
    # that any failure to import or query it is downgraded to a warning
    # (best effort); errors collected before the failure are kept.
    try:
        from data_juicer_agents.utils.dj_config_bridge import get_dj_config_bridge

        bridge = get_dj_config_bridge()

        # op_registry validation (dj-agents-side business logic).
        # ProcessSpec structure is natural for this: use op.name / op.params directly
        op_names = {op.name for op in process_spec.operators if op.name}
        op_param_map, known_op_names = bridge.get_op_valid_params(op_names)

        for idx, op in enumerate(process_spec.operators):
            if not op.name:
                continue
            if op.name not in known_op_names:
                errors.append(f"operators[{idx}]: unknown operator '{op.name}'")
                continue
            # Only inspect parameter keys when params really is a dict. A
            # non-dict value was already reported by the structural pass;
            # iterating it here would either emit bogus "unknown param"
            # errors (str/list) or raise and be misreported below as the
            # bridge being unavailable (int, unhashable items).
            if op.name not in op_param_map or not isinstance(op.params, dict):
                continue
            valid_params = op_param_map[op.name]
            errors.extend(
                f"operators[{idx}].{op.name}: unknown param '{param_key}'"
                for param_key in op.params
                if param_key not in valid_params
            )
    except Exception:
        warnings.append(
            "operator name/param validation skipped: DJ bridge unavailable"
        )

    if PROCESS_SPEC_DEFERRED_WARNING not in warnings:
        warnings.append(PROCESS_SPEC_DEFERRED_WARNING)
    return errors, warnings


# Names exported by ``from ... import *``.
__all__ = [
    "PROCESS_SPEC_DEFERRED_WARNING",
    "normalize_process_spec",
    "validate_process_spec_payload",
]