# Source code for data_juicer_agents.tools.plan.build_system_spec.logic
# -*- coding: utf-8 -*-
"""Pure logic for build_system_spec."""
from __future__ import annotations
from typing import Any, Dict, Iterable
from .._shared.normalize import normalize_string_list
from .._shared.schema import SystemSpec
from .._shared.system_spec import validate_system_spec_payload
def _load_dj_system_config() -> Dict[str, Any]:
"""Load complete system configuration from Data-Juicer dynamically.
Returns:
Dict of all system parameters with their default values from DJ
"""
try:
from data_juicer_agents.utils.dj_config_bridge import get_dj_config_bridge
bridge = get_dj_config_bridge()
return bridge.extract_system_config()
except Exception:
# Fallback to minimal defaults if DJ is not available
return {
"executor_type": "default",
"np": 1,
"open_tracer": False,
"open_monitor": None,
"use_cache": None,
"skip_op_error": False,
}
# [docs]
def build_system_spec(
    *,
    custom_operator_paths: Iterable[Any] | None = None,
    np: int | None = None,
    executor_type: str | None = None,
    **kwargs: Any
) -> Dict[str, Any]:
    """Assemble and validate a system spec on top of the loaded system config.

    The base configuration comes from ``_load_dj_system_config()``. Explicit
    arguments and extra keyword options are layered over it, the result is
    turned into a ``SystemSpec`` and then validated.

    Args:
        custom_operator_paths: Optional custom operator paths; passed through
            ``normalize_string_list`` before being stored.
        np: Optional number of processes.
        executor_type: Optional executor type.
        **kwargs: Additional system config options. Every key must already be
            present in the loaded config (after the overrides above).

    Returns:
        A dict with the keys ``ok``, ``system_spec``, ``validation_errors``,
        ``warnings`` and ``message``.

    Raises:
        ValueError: If ``kwargs`` contains a key that is not a known system
            config field.
    """
    # NOTE(review): the loaded dict is mutated in place below - confirm the
    # config bridge hands out a fresh dict on every call.
    config = _load_dj_system_config()

    # Explicit arguments win over the loaded defaults; ``None`` means "keep".
    if custom_operator_paths is not None:
        config["custom_operator_paths"] = normalize_string_list(custom_operator_paths)
    if np is not None:
        config["np"] = np
    if executor_type is not None:
        config["executor_type"] = executor_type

    # Extra options are accepted only for fields the config already knows.
    if kwargs:
        unknown_keys = [name for name in kwargs if name not in config]
        if unknown_keys:
            valid_fields = sorted(config.keys())
            raise ValueError(
                f"Unknown system config field(s): {unknown_keys}. "
                f"Valid fields are: {valid_fields}"
            )
        config.update(kwargs)

    spec = SystemSpec.from_dj_config(config)
    errors, spec_warnings = validate_system_spec_payload(spec)

    if not errors:
        message = "system spec built"
    else:
        message = "system spec build failed"

    return {
        "ok": len(errors) == 0,
        "system_spec": spec.to_dict(),
        "validation_errors": errors,
        "warnings": spec_warnings,
        "message": message,
    }
# Public API of this module: only the spec builder is exported.
__all__ = ["build_system_spec"]