# Source code for data_juicer_agents.commands.apply_cmd

# -*- coding: utf-8 -*-
"""Implementation for `djx apply`."""

from __future__ import annotations

from pathlib import Path

import yaml

from data_juicer_agents.capabilities.apply.service import ApplyUseCase
from data_juicer_agents.commands.output_control import emit, emit_json, enabled


def _confirm(plan_data: dict) -> bool:
    """Print a short summary of the plan and ask the user whether to proceed.

    The summary shows the ``plan_id``, ``modality``, ``dataset_path`` and
    ``export_path`` entries of *plan_data*; a missing key is shown as an
    empty value.

    Args:
        plan_data: Mapping loaded from the plan file.

    Returns:
        ``True`` only when the user answers ``y`` or ``yes`` (case-insensitive,
        surrounding whitespace ignored). Any other answer, including an empty
        one or a closed stdin, returns ``False``.
    """
    summary_fields = (
        ("About to execute plan", "plan_id"),
        ("Modality", "modality"),
        ("Dataset", "dataset_path"),
        ("Export", "export_path"),
    )
    for label, key in summary_fields:
        print(f"{label}: {str(plan_data.get(key, '')).strip()}")

    try:
        answer = input("Proceed? [y/N]: ")
    except EOFError:
        # stdin is closed or not interactive (e.g. piped / CI). The prompt
        # defaults to "N", so treat the missing answer as a refusal instead
        # of crashing with a traceback.
        print()
        return False
    return answer.strip().lower() in {"y", "yes"}


def run_apply(args) -> int:
    """Run the `djx apply` command and return its exit code.

    Reads ``args.timeout``, ``args.plan``, ``args.yes`` and ``args.dry_run``,
    plus an optional ``args.cancel_check`` that is forwarded to the executor
    (``None`` when the attribute is absent).

    Args:
        args: Parsed command-line namespace for the ``apply`` sub-command.

    Returns:
        ``2`` when the input is invalid (non-positive timeout, plan path that
        is not a readable file, plan that cannot be parsed or is not a
        mapping), ``1`` when the user declines the confirmation prompt,
        otherwise the return code reported by ``ApplyUseCase.execute``.
    """
    if args.timeout <= 0:
        print("timeout must be > 0")
        return 2

    plan_path = Path(args.plan)
    # is_file() rather than exists(): a directory "exists" but cannot be
    # opened as a plan, which would otherwise surface as IsADirectoryError.
    if not plan_path.is_file():
        print(f"Plan file not found: {plan_path}")
        return 2

    try:
        with open(plan_path, "r", encoding="utf-8") as f:
            plan_data = yaml.safe_load(f)
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
        # An unreadable or malformed plan is a user-input problem; report it
        # with the same exit code as the other validation failures.
        print(f"Failed to load plan file: {plan_path}: {exc}")
        return 2
    if not isinstance(plan_data, dict):
        print(f"Plan file is not a mapping: {plan_path}")
        return 2

    if not args.yes and not _confirm(plan_data):
        print("Execution canceled")
        return 1

    runtime_dir = Path(".djx") / "recipes"
    executor = ApplyUseCase()
    result, returncode, stdout, stderr = executor.execute(
        plan_payload=plan_data,
        runtime_dir=runtime_dir,
        dry_run=args.dry_run,
        timeout_seconds=args.timeout,
        cancel_check=getattr(args, "cancel_check", None),
    )

    interrupted = str(getattr(result, "error_type", "")).strip() == "interrupted"
    if interrupted:
        print("Execution interrupted by user.")

    # Captured process output is only shown at the "verbose" level.
    if stdout and enabled(args, "verbose"):
        print("STDOUT:")
        print(stdout)
    if stderr and enabled(args, "verbose"):
        print("STDERR:")
        print(stderr)
    if enabled(args, "debug"):
        emit(args, "Debug apply payload:")
        emit_json(args, result.to_dict(), level="debug")

    print("Run Summary:")
    print(f"Execution ID: {result.execution_id}")
    print(f"Status: {result.status}")
    print(f"Recipe: {result.generated_recipe_path}")
    if result.error_type not in {"", "none"}:
        print(f"Error Type: {result.error_type}")
    if result.error_message:
        print(f"Error: {result.error_message}")

    return returncode