Source code for data_juicer_agents.tools.plan.validate_process_spec.logic
# -*- coding: utf-8 -*-
"""Pure logic for validate_process_spec."""
from __future__ import annotations
from typing import Any, Dict
from .._shared.schema import ProcessSpec
from .._shared.process_spec import validate_process_spec_payload
[docs]
def validate_process_spec(*, process_spec: Dict[str, Any]) -> Dict[str, Any]:
spec = ProcessSpec.from_dict(process_spec)
errors, warnings = validate_process_spec_payload(spec)
return {
"ok": len(errors) == 0,
"process_spec": spec.to_dict(),
"validation_errors": errors,
"warnings": warnings,
"operator_names": [item.name for item in spec.operators],
"message": "process spec is valid" if not errors else "process spec validation failed",
}
__all__ = ["validate_process_spec", "validate_process_spec_payload"]