# Source code for data_juicer_agents.tools.plan._shared.process_spec
# -*- coding: utf-8 -*-
"""Shared process-spec helpers for plan tools."""
from __future__ import annotations
from typing import Any, Dict, List, Tuple
from .._shared.schema import ProcessOperator, ProcessSpec
# Warning text that validate_process_spec_payload places in its warnings list.
# The message states that operator parameter validation is deferred and that
# runtime errors are used as the repair signal instead.
PROCESS_SPEC_DEFERRED_WARNING = (
"operator parameter validation deferred; runtime errors will be used as the repair signal"
)
def _normalize_params(value: Any) -> Dict[str, Any]:
return dict(value) if isinstance(value, dict) else {}
def normalize_process_spec(process_spec: ProcessSpec | Dict[str, Any]) -> ProcessSpec:
    """Build a cleaned ``ProcessSpec`` from a spec object or its dict form.

    Operator names are converted to ``str`` and stripped of surrounding
    whitespace; operators whose name ends up empty are dropped. Operator
    params are copied into a fresh dict, or replaced by ``{}`` when they are
    not a dict.

    Args:
        process_spec: A ``ProcessSpec`` instance, or a dict that is converted
            with ``ProcessSpec.from_dict``.

    Returns:
        A new ``ProcessSpec`` containing only the cleaned operators.

    Raises:
        ValueError: If *process_spec* is neither a ``ProcessSpec`` nor a dict,
            or if no operator with a non-empty name remains.
    """
    if not isinstance(process_spec, (ProcessSpec, dict)):
        raise ValueError("process_spec must be a dict object")

    if isinstance(process_spec, ProcessSpec):
        source = process_spec
    else:
        source = ProcessSpec.from_dict(process_spec)

    # Pair each operator with its cleaned name so blank names can be filtered
    # out before any params are touched.
    named = ((str(entry.name or "").strip(), entry) for entry in source.operators)
    operators: List[ProcessOperator] = [
        ProcessOperator(name=clean_name, params=_normalize_params(entry.params))
        for clean_name, entry in named
        if clean_name
    ]

    spec = ProcessSpec(operators=operators)
    if spec.operators:
        return spec
    raise ValueError("process_spec.operators must contain at least one operator")
# [docs]
def validate_process_spec_payload(
    process_spec: ProcessSpec | Dict[str, Any],
) -> Tuple[List[str], List[str]]:
    """Check the structure of a process spec and report errors and warnings.

    Only structural checks are made: the operator list must be non-empty,
    every operator needs a truthy ``name``, and every operator's ``params``
    must be a dict.

    Args:
        process_spec: A ``ProcessSpec`` instance, or a dict that is converted
            with ``ProcessSpec.from_dict``.

    Returns:
        A ``(errors, warnings)`` tuple of message lists. ``warnings`` carries
        ``PROCESS_SPEC_DEFERRED_WARNING``.
    """
    if isinstance(process_spec, dict):
        spec = ProcessSpec.from_dict(process_spec)
    else:
        spec = process_spec

    problems: List[str] = []
    notices: List[str] = []

    if not spec.operators:
        problems.append("operators must not be empty")

    for position, operator in enumerate(spec.operators):
        if not operator.name:
            problems.append(f"operators[{position}].name is required")
        if not isinstance(operator.params, dict):
            problems.append(f"operators[{position}].params must be an object")

    # Membership guard keeps the deferred-validation notice from being
    # listed more than once.
    if PROCESS_SPEC_DEFERRED_WARNING not in notices:
        notices.append(PROCESS_SPEC_DEFERRED_WARNING)

    return problems, notices
# Names exported by a star-import of this module; the `_normalize_params`
# helper is not listed here.
__all__ = [
"PROCESS_SPEC_DEFERRED_WARNING",
"normalize_process_spec",
"validate_process_spec_payload",
]