# -*- coding: utf-8 -*-
"""LLM generator for process operator lists used by CLI plan orchestration."""
from __future__ import annotations
import json
from typing import Any, Dict
from data_juicer_agents.utils.llm_gateway import call_model_json
[文档]
class ProcessOperatorGenerator:
"""Generate an operator list for staged plan assembly."""
[文档]
def __init__(
self,
*,
model_name: str,
api_key: str | None = None,
base_url: str | None = None,
thinking: bool | None = None,
):
self.model_name = str(model_name or "").strip()
self.api_key = str(api_key or "").strip() or None
self.base_url = str(base_url or "").strip() or None
self.thinking = thinking
@staticmethod
def _prompt(
*,
user_intent: str,
retrieval_payload: Dict[str, Any],
dataset_spec: Dict[str, Any],
dataset_profile: Dict[str, Any] | None = None,
) -> str:
candidates = retrieval_payload.get("candidates", [])
profile_payload = dataset_profile if isinstance(dataset_profile, dict) else {}
dataset_binding = {}
if isinstance(dataset_spec, dict):
binding = dataset_spec.get("binding", {})
if isinstance(binding, dict):
dataset_binding = binding
return (
"You generate only the operator list for a staged deterministic Data-Juicer planner.\n"
"Return JSON only with one key: operators.\n"
"operators must be a non-empty array of objects: {name: string, params: object}.\n"
"Use canonical operator names from retrieved candidates.\n"
"Fill concrete params whenever a threshold, mode, or explicit option is already known.\n"
"Do not include modality, text_keys, image_key, risk_notes, estimation, approval_required, workflow, or markdown.\n\n"
f"user_intent: {user_intent}\n"
f"dataset_binding:\n{json.dumps(dataset_binding, ensure_ascii=False, indent=2)}\n"
f"retrieved_candidates:\n{json.dumps(candidates, ensure_ascii=False, indent=2)}\n"
f"dataset_profile:\n{json.dumps(profile_payload, ensure_ascii=False, indent=2)}\n"
)
[文档]
def generate(
self,
*,
user_intent: str,
retrieval_payload: Dict[str, Any],
dataset_spec: Dict[str, Any],
dataset_profile: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
prompt = self._prompt(
user_intent=user_intent,
retrieval_payload=retrieval_payload,
dataset_spec=dataset_spec,
dataset_profile=dataset_profile,
)
payload = call_model_json(
self.model_name,
prompt,
api_key=self.api_key,
base_url=self.base_url,
thinking=self.thinking,
)
if not isinstance(payload, dict):
raise ValueError("planner operator output must be a JSON object")
operators = payload.get("operators", [])
if not isinstance(operators, list) or not operators:
raise ValueError("planner operator output must contain non-empty operators")
return {"operators": operators}
__all__ = ["ProcessOperatorGenerator"]