# Source code for data_juicer_agents.tools.plan._shared.system_spec

# -*- coding: utf-8 -*-
"""Shared system-spec helpers for plan tools."""

from __future__ import annotations

from typing import Any, Dict, Iterable, List, Tuple

from .._shared.schema import SystemSpec

# Warning text that normalize_system_spec and validate_system_spec_payload
# add to a spec's warnings whenever it is not already present.
SYSTEM_SPEC_DEFERRED_WARNING: str = (
    "system spec inference deferred; "
    "using deterministic minimal profile"
)


def _normalize_string_list(values: Iterable[Any] | None) -> List[str]:
    """Return the distinct, non-empty strings of *values* in first-seen order.

    Each item is converted with ``str(item or "")`` and stripped, so falsy
    items (``None``, ``0``, ``""``) and whitespace-only strings are dropped.

    Args:
        values: Items to normalize. ``None`` or any other falsy value yields
            an empty list. A bare ``str`` is treated as a single entry
            instead of being iterated character by character.

    Returns:
        A new list of stripped strings without duplicates.
    """
    if not values:
        return []
    if isinstance(values, str):
        # Iterating a string would split it into single characters.
        values = [values]
    stripped = (str(item or "").strip() for item in values)
    # dict.fromkeys keeps insertion order, giving an ordered de-duplication.
    return list(dict.fromkeys(text for text in stripped if text))


def normalize_system_spec(
    system_spec: SystemSpec | Dict[str, Any] | None,
    *,
    custom_operator_paths: Iterable[Any] | None = None,
) -> SystemSpec:
    """Build a sanitized ``SystemSpec`` from a spec object, a dict, or nothing.

    Args:
        system_spec: An existing ``SystemSpec``, a dict passed through
            ``SystemSpec.from_dict``, or ``None`` for a default-constructed
            spec.
        custom_operator_paths: Optional paths that take precedence over the
            ones stored on the spec; the spec's own paths are used only when
            these normalize to an empty list.

    Returns:
        A new ``SystemSpec`` whose ``executor_type`` falls back to
        ``"default"``, whose ``np`` is at least 1, whose ``open_monitor`` and
        ``use_cache`` are either real booleans or ``None``, and whose
        warnings always include ``SYSTEM_SPEC_DEFERRED_WARNING``.

    Raises:
        ValueError: If ``system_spec`` is not a ``SystemSpec``, a dict, or
            ``None``.
    """
    if system_spec is None:
        base = SystemSpec()
    elif isinstance(system_spec, SystemSpec):
        base = system_spec
    elif isinstance(system_spec, dict):
        base = SystemSpec.from_dict(system_spec)
    else:
        raise ValueError("system_spec must be a dict object")

    notes = _normalize_string_list(base.warnings)
    if SYSTEM_SPEC_DEFERRED_WARNING not in notes:
        notes.append(SYSTEM_SPEC_DEFERRED_WARNING)

    # Blank or missing executor names collapse to "default".
    executor = str(base.executor_type or "default").strip()
    if not executor:
        executor = "default"

    # A missing or non-positive process count is clamped to 1.
    workers = int(base.np or 1)
    if workers < 1:
        workers = 1

    tracer = bool(base.open_tracer)

    # Tri-state flags: anything that is not a genuine bool becomes None.
    monitor = base.open_monitor
    if not isinstance(monitor, bool):
        monitor = None
    cache = base.use_cache
    if not isinstance(cache, bool):
        cache = None

    skip_errors = bool(base.skip_op_error)

    operator_paths = _normalize_string_list(custom_operator_paths)
    if not operator_paths:
        operator_paths = _normalize_string_list(base.custom_operator_paths)

    return SystemSpec(
        executor_type=executor,
        np=workers,
        open_tracer=tracer,
        open_monitor=monitor,
        use_cache=cache,
        skip_op_error=skip_errors,
        custom_operator_paths=operator_paths,
        warnings=notes,
    )


def validate_system_spec_payload(
    system_spec: SystemSpec | Dict[str, Any],
) -> Tuple[List[str], List[str]]:
    """Check a system spec and collect its errors and warnings.

    Args:
        system_spec: A ``SystemSpec``, or a dict that is first converted
            with ``SystemSpec.from_dict``.

    Returns:
        A ``(errors, warnings)`` pair. ``errors`` reports a missing
        ``executor_type`` and an ``np`` that is not a positive integer.
        ``warnings`` holds the spec's own non-empty warnings without
        duplicates; ``SYSTEM_SPEC_DEFERRED_WARNING`` is placed first when
        the spec does not already carry it.
    """
    if isinstance(system_spec, dict):
        system_spec = SystemSpec.from_dict(system_spec)

    errors: List[str] = []
    if not system_spec.executor_type:
        errors.append("executor_type is required")

    # A validator should report a malformed ``np`` instead of raising on it.
    try:
        worker_count = int(system_spec.np or 0)
    except (TypeError, ValueError):
        worker_count = 0
    if worker_count <= 0:
        errors.append("np must be >= 1")

    spec_warnings = list(system_spec.warnings or [])
    warnings: List[str] = []
    if SYSTEM_SPEC_DEFERRED_WARNING not in spec_warnings:
        warnings.append(SYSTEM_SPEC_DEFERRED_WARNING)
    # Append one at a time: a comprehension handed to ``extend`` is evaluated
    # against the list as it was before the call, so duplicates inside the
    # spec's own warnings would slip through.
    for item in spec_warnings:
        if item and item not in warnings:
            warnings.append(item)
    return errors, warnings
# Public names exported by this module.
__all__ = [
    "SYSTEM_SPEC_DEFERRED_WARNING",
    "normalize_system_spec",
    "validate_system_spec_payload",
]