# Source code for data_juicer_agents.tools.dev.develop_operator.scaffold

# -*- coding: utf-8 -*-
"""Utilities for non-invasive custom operator scaffolding."""

from __future__ import annotations

import json
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from data_juicer_agents.utils.llm_gateway import call_model_json


# Normalized operator names must be snake_case Python identifiers.
_VALID_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$")
# Recognized operator-name suffixes and the operator type each one implies.
_SUFFIX_TO_TYPE = {"_mapper": "mapper", "_filter": "filter"}
# Default model used to draft design notes for the generated operator.
_DEFAULT_DEV_MODEL = "qwen3-max-2026-01-23"


@dataclass
class ScaffoldResult:
    """Result of a non-invasive operator scaffolding run.

    Attributes:
        operator_name: Final normalized operator name (with type suffix).
        operator_type: Resolved operator type, ``"mapper"`` or ``"filter"``.
        class_name: CamelCase class name generated for the operator.
        output_dir: Resolved directory where files were written.
        generated_files: Paths (as strings) of every file written.
        summary_path: Path to the generated markdown summary.
        notes: Human-readable notes about what was (not) done.
    """

    operator_name: str
    operator_type: str
    class_name: str
    output_dir: Path
    generated_files: List[str]
    summary_path: Path
    notes: List[str]
def _normalize_operator_name(name: str) -> str: normalized = str(name or "").strip().lower() normalized = normalized.replace("-", "_").replace(" ", "_") normalized = re.sub(r"_+", "_", normalized) return normalized def _camel_case(name: str) -> str: return "".join(part.capitalize() for part in name.split("_") if part) def _resolve_operator_name_and_type( operator_name: str, operator_type: Optional[str], ) -> Tuple[str, str]: name = _normalize_operator_name(operator_name) if not name: raise ValueError("operator-name is required") if not _VALID_NAME_RE.match(name): raise ValueError( "operator-name must match ^[a-z][a-z0-9_]*$ after normalization" ) inferred = None for suffix, mapped_type in _SUFFIX_TO_TYPE.items(): if name.endswith(suffix): inferred = mapped_type break if operator_type: op_type = str(operator_type).strip().lower() if op_type not in {"mapper", "filter"}: raise ValueError("--type must be mapper or filter") if inferred and inferred != op_type: raise ValueError( f"operator-name suffix implies {inferred} but --type={op_type}" ) else: op_type = inferred or "mapper" if not inferred: name = f"{name}_{op_type}" return name, op_type def _extract_retrieved_ops(from_retrieve_path: Optional[str]) -> List[str]: if not from_retrieve_path: return [] path = Path(from_retrieve_path) if not path.exists(): raise ValueError(f"from-retrieve file not found: {path}") payload = json.loads(path.read_text(encoding="utf-8")) rows = payload.get("candidates", []) if isinstance(payload, dict) else [] names: List[str] = [] for row in rows: if not isinstance(row, dict): continue name = str(row.get("operator_name", "")).strip() if name: names.append(name) return names def _build_design_notes( intent: str, operator_name: str, operator_type: str, retrieved_ops: List[str], ) -> Dict[str, str]: prompt = ( "You are helping data engineers scaffold a new Data-Juicer operator.\n" "Return JSON only with keys: purpose, behavior, default_params, caveats.\n" "Each value should be a concise 
sentence.\n" f"intent: {intent}\n" f"operator_name: {operator_name}\n" f"operator_type: {operator_type}\n" f"retrieved_operators: {json.dumps(retrieved_ops, ensure_ascii=False)}\n" ) model_name = _DEFAULT_DEV_MODEL try: data = call_model_json(model_name, prompt) if isinstance(data, dict): return { "purpose": str(data.get("purpose", "")).strip(), "behavior": str(data.get("behavior", "")).strip(), "default_params": str(data.get("default_params", "")).strip(), "caveats": str(data.get("caveats", "")).strip(), } except Exception: pass return { "purpose": f"Implement intent-driven {operator_type} logic for: {intent}", "behavior": "Provide a minimal, safe baseline implementation and preserve sample schema.", "default_params": "Tune thresholds and keys based on real dataset samples.", "caveats": "Generated code is a starting point; review and refine before production.", } def _mapper_template( class_name: str, operator_name: str, intent: str, notes: Dict[str, str], ) -> str: return f'''# -*- coding: utf-8 -*- """Custom Data-Juicer mapper generated by djx dev. Intent: {intent} Purpose: {notes.get("purpose", "")} """ from data_juicer.ops.base_op import Mapper, OPERATORS @OPERATORS.register_module("{operator_name}") class {class_name}(Mapper): """{notes.get("behavior", "")}""" def __init__(self, enabled: bool = True, *args, **kwargs): super().__init__(*args, **kwargs) self.enabled = enabled def process_single(self, sample): if not self.enabled: return sample # TODO: replace placeholder logic with intent-specific transformation. text = sample.get(self.text_key, "") if isinstance(text, str): sample[self.text_key] = text.strip() return sample ''' def _filter_template( class_name: str, operator_name: str, intent: str, notes: Dict[str, str], ) -> str: metric_key = f"{operator_name}_score" return f'''# -*- coding: utf-8 -*- """Custom Data-Juicer filter generated by djx dev. 
Intent: {intent} Purpose: {notes.get("purpose", "")} """ from data_juicer.utils.constant import Fields from data_juicer.ops.base_op import Filter, OPERATORS @OPERATORS.register_module("{operator_name}") class {class_name}(Filter): """{notes.get("behavior", "")}""" _batched_op = True def __init__(self, min_score: float = 0.0, max_score: float = 1.0, *args, **kwargs): super().__init__(*args, **kwargs) self.min_score = min_score self.max_score = max_score def compute_stats_batched(self, samples): texts = samples.get(self.text_key, []) stats = samples.get(Fields.stats, []) for idx, stat in enumerate(stats): text = texts[idx] if idx < len(texts) else "" # TODO: replace placeholder metric with task-specific score computation. stat["{metric_key}"] = float(isinstance(text, str) and len(text.strip()) > 0) return samples def process_batched(self, samples): stats = samples[Fields.stats] return map( lambda stat: self.get_keep_boolean(stat["{metric_key}"], self.min_score, self.max_score), stats, ) ''' def _test_template(operator_name: str) -> str: return f'''# -*- coding: utf-8 -*- from data_juicer.ops.base_op import OPERATORS def test_{operator_name}_registered(): assert "{operator_name}" in OPERATORS.modules ''' def _summary_markdown( intent: str, operator_name: str, operator_type: str, class_name: str, notes: Dict[str, str], retrieved_ops: List[str], ) -> str: retrieved_line = ", ".join(retrieved_ops) if retrieved_ops else "None" return f"""# DJX Dev Summary ## Generated Operator - `operator_name`: `{operator_name}` - `operator_type`: `{operator_type}` - `class_name`: `{class_name}` - `intent`: `{intent}` ## Design Notes - purpose: {notes.get("purpose", "")} - behavior: {notes.get("behavior", "")} - default_params: {notes.get("default_params", "")} - caveats: {notes.get("caveats", "")} ## Retrieval Context - candidates: {retrieved_line} ## Next Steps 1. Replace placeholder logic in `{operator_name}.py`. 2. Run unit tests for the generated operator test module. 3. 
Integrate by adding this directory to Data-Juicer custom operator paths. """ def _upsert_package_init(output_dir: Path, module_file: str, class_name: str) -> None: init_path = output_dir / "__init__.py" import_line = f"from .{module_file} import {class_name}\n" if not init_path.exists(): init_path.write_text(import_line, encoding="utf-8") return current = init_path.read_text(encoding="utf-8") if import_line not in current: if current and not current.endswith("\n"): current += "\n" current += import_line init_path.write_text(current, encoding="utf-8")
def generate_operator_scaffold(
    intent: str,
    operator_name: str,
    output_dir: str,
    operator_type: Optional[str] = None,
    from_retrieve_path: Optional[str] = None,
) -> ScaffoldResult:
    """Generate a non-invasive custom operator scaffold on disk.

    Writes the operator module, a registration unit test, a markdown summary,
    and updates the package ``__init__.py`` — without installing or
    registering anything into Data-Juicer itself.

    Args:
        intent: Natural-language description of what the operator should do.
        operator_name: Raw operator name; normalized and suffixed as needed.
        output_dir: Directory to create the scaffold in (created if missing).
        operator_type: Optional explicit type, ``"mapper"`` or ``"filter"``.
        from_retrieve_path: Optional path to a djx-retrieve JSON result used
            as design context.

    Returns:
        A :class:`ScaffoldResult` describing everything that was generated.

    Raises:
        ValueError: If the intent is empty or the name/type is invalid.
    """
    # Guard against both empty strings and None (str(None).strip() would pass).
    if not str(intent or "").strip():
        raise ValueError("intent is required")
    resolved_name, resolved_type = _resolve_operator_name_and_type(
        operator_name=operator_name,
        operator_type=operator_type,
    )
    class_name = _camel_case(resolved_name)
    out_dir = Path(output_dir).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    retrieved_ops = _extract_retrieved_ops(from_retrieve_path)
    notes = _build_design_notes(
        intent=intent,
        operator_name=resolved_name,
        operator_type=resolved_type,
        retrieved_ops=retrieved_ops,
    )

    module_name = resolved_name
    code_path = out_dir / f"{module_name}.py"
    test_path = out_dir / f"test_{module_name}.py"
    summary_path = out_dir / f"{module_name}_SUMMARY.md"

    if resolved_type == "mapper":
        content = _mapper_template(class_name, resolved_name, intent, notes)
    else:
        content = _filter_template(class_name, resolved_name, intent, notes)

    code_path.write_text(content, encoding="utf-8")
    test_path.write_text(_test_template(resolved_name), encoding="utf-8")
    summary_path.write_text(
        _summary_markdown(
            intent=intent,
            operator_name=resolved_name,
            operator_type=resolved_type,
            class_name=class_name,
            notes=notes,
            retrieved_ops=retrieved_ops,
        ),
        encoding="utf-8",
    )
    _upsert_package_init(out_dir, module_name, class_name)

    generated_files = [
        str(code_path),
        str(test_path),
        str(summary_path),
        str(out_dir / "__init__.py"),
    ]
    return ScaffoldResult(
        operator_name=resolved_name,
        operator_type=resolved_type,
        class_name=class_name,
        output_dir=out_dir,
        generated_files=generated_files,
        summary_path=summary_path,
        notes=[
            "Non-invasive mode: no automatic install/registration was performed.",
            "Use custom_operator_paths to load this operator during dj-process execution.",
        ],
    )
def run_smoke_check(scaffold: ScaffoldResult) -> Tuple[bool, str]:
    """Run a tiny end-to-end ``dj-process`` job against the scaffold.

    Writes a one-sample JSONL dataset and a minimal recipe into a
    ``.djx_dev_smoke`` subdirectory, then invokes ``dj-process`` on it.

    Args:
        scaffold: Result of a previous :func:`generate_operator_scaffold` run.

    Returns:
        ``(True, message)`` on success, ``(False, message)`` on failure —
        including when the ``dj-process`` executable is not installed.
    """
    smoke_dir = scaffold.output_dir / ".djx_dev_smoke"
    smoke_dir.mkdir(parents=True, exist_ok=True)

    dataset_path = smoke_dir / "input.jsonl"
    dataset_path.write_text('{"text": "hello world"}\n', encoding="utf-8")

    recipe_path = smoke_dir / "recipe.yaml"
    recipe_path.write_text(
        "\n".join(
            [
                "project_name: djx_dev_smoke",
                f"dataset_path: {dataset_path}",
                f"export_path: {smoke_dir / 'output.jsonl'}",
                "custom_operator_paths:",
                f" - {scaffold.output_dir}",
                "process:",
                f" - {scaffold.operator_name}: {{}}",
                "",
            ]
        ),
        encoding="utf-8",
    )

    command = ["dj-process", "--config", str(recipe_path)]
    try:
        proc = subprocess.run(command, capture_output=True, text=True)
    except FileNotFoundError:
        # Keep the (bool, message) contract instead of raising when the
        # dj-process CLI is not on PATH.
        return False, "Smoke check failed: dj-process executable not found on PATH."
    if proc.returncode == 0:
        return True, "Smoke check passed."
    stderr = (proc.stderr or "").strip()
    # Keep only the tail of long tracebacks so the message stays readable.
    if len(stderr) > 3000:
        stderr = stderr[-3000:]
    return False, f"Smoke check failed (exit={proc.returncode}).\n{stderr}"