# Source code for data_juicer_agents.tools.plan.build_process_spec.logic
# -*- coding: utf-8 -*-
"""Pure logic for build_process_spec."""
from __future__ import annotations
from typing import Any, Dict, Iterable
from .._shared.schema import ProcessSpec
from .._shared.process_spec import validate_process_spec_payload
# [docs]
def build_process_spec(*, operators: Iterable[Any] | None) -> Dict[str, Any]:
    """Build a process spec from ``operators`` and report the validation outcome.

    Args:
        operators: Keyword-only iterable of operator entries. It is
            materialised into a list and handed to ``ProcessSpec.from_dict``
            under the ``"operators"`` key. ``None`` is treated as a missing
            required argument.

    Returns:
        A result dictionary. When ``operators`` is ``None`` it holds
        ``ok=False``, ``error_type``, ``message`` and ``requires``.
        Otherwise it holds ``ok`` (true only when validation produced no
        errors), ``process_spec`` (the spec's ``to_dict()`` output),
        ``validation_errors``, ``warnings``, ``operator_names`` (the
        ``name`` of each entry in ``spec.operators``) and a ``message``.
    """
    # Guard clause: without operators there is nothing to build.
    if operators is None:
        missing_argument: Dict[str, Any] = {
            "ok": False,
            "error_type": "missing_required",
            "message": "operators is required for build_process_spec",
            "requires": ["operators"],
        }
        return missing_argument

    operator_entries = list(operators)
    built_spec = ProcessSpec.from_dict({"operators": operator_entries})
    validation_errors, validation_warnings = validate_process_spec_payload(built_spec)

    # Intermediate values are computed in the same order in which the result
    # dictionary lists them.
    is_valid = len(validation_errors) == 0
    spec_payload = built_spec.to_dict()
    names = [operator.name for operator in built_spec.operators]
    if not validation_errors:
        status_message = "process spec built"
    else:
        status_message = "process spec build failed"

    return {
        "ok": is_valid,
        "process_spec": spec_payload,
        "validation_errors": validation_errors,
        "warnings": validation_warnings,
        "operator_names": names,
        "message": status_message,
    }
# Names exported by ``from <this module> import *``.
__all__ = ["build_process_spec"]