# -*- coding: utf-8 -*-
"""Utilities for non-invasive custom operator scaffolding."""
from __future__ import annotations
import json
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from data_juicer_agents.utils.llm_gateway import call_model_json
# Operator names must be snake_case identifiers that start with a letter.
_VALID_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$")
# Recognized operator-name suffixes and the operator type each one implies.
_SUFFIX_TO_TYPE = {"_mapper": "mapper", "_filter": "filter"}
# Default LLM used to draft design notes for the scaffold (best-effort only).
_DEFAULT_DEV_MODEL = "qwen3-max-2026-01-23"
@dataclass
class ScaffoldResult:
    """Artifacts produced by one operator-scaffold generation run."""

    operator_name: str  # normalized snake_case name, e.g. "clean_text_mapper"
    operator_type: str  # "mapper" or "filter"
    class_name: str  # CamelCase class name emitted into the module
    output_dir: Path  # directory all generated files were written into
    generated_files: List[str]  # paths of every file written
    summary_path: Path  # path to the generated markdown summary
    notes: List[str]  # human-readable caveats about the scaffold
def _normalize_operator_name(name: str) -> str:
normalized = str(name or "").strip().lower()
normalized = normalized.replace("-", "_").replace(" ", "_")
normalized = re.sub(r"_+", "_", normalized)
return normalized
def _camel_case(name: str) -> str:
return "".join(part.capitalize() for part in name.split("_") if part)
def _resolve_operator_name_and_type(
    operator_name: str,
    operator_type: Optional[str],
) -> Tuple[str, str]:
    """Validate the operator name and reconcile it with the requested type.

    Returns the final ``(name, type)`` pair; the name gains a ``_mapper`` /
    ``_filter`` suffix when it does not already carry one.

    Raises:
        ValueError: On an empty/invalid name, an unknown ``--type``, or a
            suffix that contradicts the explicit ``--type``.
    """
    name = _normalize_operator_name(operator_name)
    if not name:
        raise ValueError("operator-name is required")
    if not _VALID_NAME_RE.match(name):
        raise ValueError(
            "operator-name must match ^[a-z][a-z0-9_]*$ after normalization"
        )

    # Infer the operator type from a recognized name suffix, if present.
    inferred = next(
        (t for suffix, t in _SUFFIX_TO_TYPE.items() if name.endswith(suffix)),
        None,
    )

    if not operator_type:
        op_type = inferred or "mapper"
    else:
        op_type = str(operator_type).strip().lower()
        if op_type not in {"mapper", "filter"}:
            raise ValueError("--type must be mapper or filter")
        if inferred and inferred != op_type:
            raise ValueError(
                f"operator-name suffix implies {inferred} but --type={op_type}"
            )

    # NOTE(review): suffix is appended whether or not --type was explicit,
    # so the final name always ends in the operator type — confirm intent.
    if not inferred:
        name = f"{name}_{op_type}"
    return name, op_type
def _extract_retrieved_ops(from_retrieve_path: Optional[str]) -> List[str]:
if not from_retrieve_path:
return []
path = Path(from_retrieve_path)
if not path.exists():
raise ValueError(f"from-retrieve file not found: {path}")
payload = json.loads(path.read_text(encoding="utf-8"))
rows = payload.get("candidates", []) if isinstance(payload, dict) else []
names: List[str] = []
for row in rows:
if not isinstance(row, dict):
continue
name = str(row.get("operator_name", "")).strip()
if name:
names.append(name)
return names
def _build_design_notes(
intent: str,
operator_name: str,
operator_type: str,
retrieved_ops: List[str],
) -> Dict[str, str]:
prompt = (
"You are helping data engineers scaffold a new Data-Juicer operator.\n"
"Return JSON only with keys: purpose, behavior, default_params, caveats.\n"
"Each value should be a concise sentence.\n"
f"intent: {intent}\n"
f"operator_name: {operator_name}\n"
f"operator_type: {operator_type}\n"
f"retrieved_operators: {json.dumps(retrieved_ops, ensure_ascii=False)}\n"
)
model_name = _DEFAULT_DEV_MODEL
try:
data = call_model_json(model_name, prompt)
if isinstance(data, dict):
return {
"purpose": str(data.get("purpose", "")).strip(),
"behavior": str(data.get("behavior", "")).strip(),
"default_params": str(data.get("default_params", "")).strip(),
"caveats": str(data.get("caveats", "")).strip(),
}
except Exception:
pass
return {
"purpose": f"Implement intent-driven {operator_type} logic for: {intent}",
"behavior": "Provide a minimal, safe baseline implementation and preserve sample schema.",
"default_params": "Tune thresholds and keys based on real dataset samples.",
"caveats": "Generated code is a starting point; review and refine before production.",
}
def _mapper_template(
class_name: str,
operator_name: str,
intent: str,
notes: Dict[str, str],
) -> str:
return f'''# -*- coding: utf-8 -*-
"""Custom Data-Juicer mapper generated by djx dev.
Intent:
{intent}
Purpose:
{notes.get("purpose", "")}
"""
from data_juicer.ops.base_op import Mapper, OPERATORS
@OPERATORS.register_module("{operator_name}")
class {class_name}(Mapper):
"""{notes.get("behavior", "")}"""
def __init__(self, enabled: bool = True, *args, **kwargs):
super().__init__(*args, **kwargs)
self.enabled = enabled
def process_single(self, sample):
if not self.enabled:
return sample
# TODO: replace placeholder logic with intent-specific transformation.
text = sample.get(self.text_key, "")
if isinstance(text, str):
sample[self.text_key] = text.strip()
return sample
'''
def _filter_template(
class_name: str,
operator_name: str,
intent: str,
notes: Dict[str, str],
) -> str:
metric_key = f"{operator_name}_score"
return f'''# -*- coding: utf-8 -*-
"""Custom Data-Juicer filter generated by djx dev.
Intent:
{intent}
Purpose:
{notes.get("purpose", "")}
"""
from data_juicer.utils.constant import Fields
from data_juicer.ops.base_op import Filter, OPERATORS
@OPERATORS.register_module("{operator_name}")
class {class_name}(Filter):
"""{notes.get("behavior", "")}"""
_batched_op = True
def __init__(self, min_score: float = 0.0, max_score: float = 1.0, *args, **kwargs):
super().__init__(*args, **kwargs)
self.min_score = min_score
self.max_score = max_score
def compute_stats_batched(self, samples):
texts = samples.get(self.text_key, [])
stats = samples.get(Fields.stats, [])
for idx, stat in enumerate(stats):
text = texts[idx] if idx < len(texts) else ""
# TODO: replace placeholder metric with task-specific score computation.
stat["{metric_key}"] = float(isinstance(text, str) and len(text.strip()) > 0)
return samples
def process_batched(self, samples):
stats = samples[Fields.stats]
return map(
lambda stat: self.get_keep_boolean(stat["{metric_key}"], self.min_score, self.max_score),
stats,
)
'''
def _test_template(operator_name: str) -> str:
return f'''# -*- coding: utf-8 -*-
from data_juicer.ops.base_op import OPERATORS
def test_{operator_name}_registered():
assert "{operator_name}" in OPERATORS.modules
'''
def _summary_markdown(
intent: str,
operator_name: str,
operator_type: str,
class_name: str,
notes: Dict[str, str],
retrieved_ops: List[str],
) -> str:
retrieved_line = ", ".join(retrieved_ops) if retrieved_ops else "None"
return f"""# DJX Dev Summary
## Generated Operator
- `operator_name`: `{operator_name}`
- `operator_type`: `{operator_type}`
- `class_name`: `{class_name}`
- `intent`: `{intent}`
## Design Notes
- purpose: {notes.get("purpose", "")}
- behavior: {notes.get("behavior", "")}
- default_params: {notes.get("default_params", "")}
- caveats: {notes.get("caveats", "")}
## Retrieval Context
- candidates: {retrieved_line}
## Next Steps
1. Replace placeholder logic in `{operator_name}.py`.
2. Run unit tests for the generated operator test module.
3. Integrate by adding this directory to Data-Juicer custom operator paths.
"""
def _upsert_package_init(output_dir: Path, module_file: str, class_name: str) -> None:
init_path = output_dir / "__init__.py"
import_line = f"from .{module_file} import {class_name}\n"
if not init_path.exists():
init_path.write_text(import_line, encoding="utf-8")
return
current = init_path.read_text(encoding="utf-8")
if import_line not in current:
if current and not current.endswith("\n"):
current += "\n"
current += import_line
init_path.write_text(current, encoding="utf-8")
def generate_operator_scaffold(
    intent: str,
    operator_name: str,
    output_dir: str,
    operator_type: Optional[str] = None,
    from_retrieve_path: Optional[str] = None,
) -> ScaffoldResult:
    """Generate a standalone custom-operator package, non-invasively.

    Writes the operator module, a registration test, a markdown summary,
    and an ``__init__.py`` re-export into *output_dir*. Nothing is installed
    into Data-Juicer itself.

    Args:
        intent: Natural-language description of what the operator should do.
        operator_name: Desired operator name; normalized and type-suffixed.
        output_dir: Directory to create the scaffold in (created if missing).
        operator_type: Optional explicit "mapper" or "filter".
        from_retrieve_path: Optional path to a djx-retrieve JSON result used
            as design context.

    Raises:
        ValueError: On empty intent, invalid name/type, or a missing
            retrieve file.
    """
    if not intent.strip():
        raise ValueError("intent is required")

    resolved_name, resolved_type = _resolve_operator_name_and_type(
        operator_name=operator_name,
        operator_type=operator_type,
    )
    class_name = _camel_case(resolved_name)

    out_dir = Path(output_dir).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    retrieved_ops = _extract_retrieved_ops(from_retrieve_path)
    notes = _build_design_notes(
        intent=intent,
        operator_name=resolved_name,
        operator_type=resolved_type,
        retrieved_ops=retrieved_ops,
    )

    module_name = resolved_name
    code_path = out_dir / f"{module_name}.py"
    test_path = out_dir / f"test_{module_name}.py"
    summary_path = out_dir / f"{module_name}_SUMMARY.md"

    # Pick the code template matching the resolved operator type.
    render = _mapper_template if resolved_type == "mapper" else _filter_template
    code_path.write_text(
        render(class_name, resolved_name, intent, notes), encoding="utf-8"
    )
    test_path.write_text(_test_template(resolved_name), encoding="utf-8")
    summary_path.write_text(
        _summary_markdown(
            intent=intent,
            operator_name=resolved_name,
            operator_type=resolved_type,
            class_name=class_name,
            notes=notes,
            retrieved_ops=retrieved_ops,
        ),
        encoding="utf-8",
    )
    _upsert_package_init(out_dir, module_name, class_name)

    return ScaffoldResult(
        operator_name=resolved_name,
        operator_type=resolved_type,
        class_name=class_name,
        output_dir=out_dir,
        generated_files=[
            str(code_path),
            str(test_path),
            str(summary_path),
            str(out_dir / "__init__.py"),
        ],
        summary_path=summary_path,
        notes=[
            "Non-invasive mode: no automatic install/registration was performed.",
            "Use custom_operator_paths to load this operator during dj-process execution.",
        ],
    )
def run_smoke_check(scaffold: ScaffoldResult) -> Tuple[bool, str]:
smoke_dir = scaffold.output_dir / ".djx_dev_smoke"
smoke_dir.mkdir(parents=True, exist_ok=True)
dataset_path = smoke_dir / "input.jsonl"
dataset_path.write_text('{"text": "hello world"}\n', encoding="utf-8")
recipe_path = smoke_dir / "recipe.yaml"
recipe_path.write_text(
"\n".join(
[
"project_name: djx_dev_smoke",
f"dataset_path: {dataset_path}",
f"export_path: {smoke_dir / 'output.jsonl'}",
"custom_operator_paths:",
f" - {scaffold.output_dir}",
"process:",
f" - {scaffold.operator_name}: {{}}",
"",
]
),
encoding="utf-8",
)
command = ["dj-process", "--config", str(recipe_path)]
proc = subprocess.run(command, capture_output=True, text=True)
if proc.returncode == 0:
return True, "Smoke check passed."
stderr = (proc.stderr or "").strip()
if len(stderr) > 3000:
stderr = stderr[-3000:]
return False, f"Smoke check failed (exit={proc.returncode}).\n{stderr}"