data_juicer_agents.tools.apply.apply_recipe
apply_recipe tool package.
- class data_juicer_agents.tools.apply.apply_recipe.ApplyRecipeInput(*, plan_path: str, dry_run: bool = False, timeout: Annotated[int, Ge(ge=1)] = 300, confirm: bool = False)
Bases: BaseModel
- plan_path: str
- dry_run: bool
- timeout: int
- confirm: bool
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
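The signature above implies keyword-only construction, three defaults, and a lower bound of 1 on `timeout`. The following is a minimal sketch of that contract using only the standard library. `ApplyRecipeInputSketch` is a hypothetical stand-in, not the real pydantic model, and the plan path is an invented example value.

```python
class ApplyRecipeInputSketch:
    """Hypothetical stand-in mirroring the documented fields and defaults."""

    def __init__(self, *, plan_path: str, dry_run: bool = False,
                 timeout: int = 300, confirm: bool = False) -> None:
        # Mirrors the documented Ge(ge=1) constraint on timeout.
        if timeout < 1:
            raise ValueError("timeout must be >= 1")
        self.plan_path = plan_path
        self.dry_run = dry_run
        self.timeout = timeout
        self.confirm = confirm


# Only plan_path is required; the other fields fall back to their defaults.
default_input = ApplyRecipeInputSketch(plan_path="plans/example_plan.yaml")

# A timeout below 1 is rejected by the stand-in.
try:
    ApplyRecipeInputSketch(plan_path="plans/example_plan.yaml", timeout=0)
    rejected = False
except ValueError:
    rejected = True
```

With the real class, pydantic performs this validation itself and reports a violation as a `ValidationError`, which is a subclass of `ValueError`.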
- class data_juicer_agents.tools.apply.apply_recipe.ApplyResult(execution_id: str, plan_id: str, start_time: str, end_time: str, duration_seconds: float, model_info: Dict[str, str], generated_recipe_path: str, command: str, status: str, artifacts: Dict[str, str], error_type: str, error_message: str, retry_level: str, next_actions: List[str])
Bases: object
Execution summary for one plan apply run.
- execution_id: str
- plan_id: str
- start_time: str
- end_time: str
- duration_seconds: float
- model_info: Dict[str, str]
- generated_recipe_path: str
- command: str
- status: str
- artifacts: Dict[str, str]
- error_type: str
- error_message: str
- retry_level: str
- next_actions: List[str]
- __init__(execution_id: str, plan_id: str, start_time: str, end_time: str, duration_seconds: float, model_info: Dict[str, str], generated_recipe_path: str, command: str, status: str, artifacts: Dict[str, str], error_type: str, error_message: str, retry_level: str, next_actions: List[str]) -> None
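To show how the fourteen fields fit together, here is a sketch that builds a record of the same shape and serializes it. `ApplyResultSketch` is a hypothetical stand-in, and treating it as a dataclass is an assumption. Every value is invented for illustration, including the ISO 8601 timestamp format and the `"success"` status string, neither of which is specified in this section.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Dict, List


@dataclass
class ApplyResultSketch:
    """Hypothetical stand-in with the same field names and types as ApplyResult."""
    execution_id: str
    plan_id: str
    start_time: str
    end_time: str
    duration_seconds: float
    model_info: Dict[str, str]
    generated_recipe_path: str
    command: str
    status: str
    artifacts: Dict[str, str]
    error_type: str
    error_message: str
    retry_level: str
    next_actions: List[str]


# Invented timestamps; the real string format is an assumption.
start = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 12, 0, 42, tzinfo=timezone.utc)

result = ApplyResultSketch(
    execution_id="exec-001",
    plan_id="plan-001",
    start_time=start.isoformat(),
    end_time=end.isoformat(),
    duration_seconds=(end - start).total_seconds(),
    model_info={"planner": "example-model"},
    generated_recipe_path="/tmp/run/recipe.yaml",
    command="echo dry-run",
    status="success",  # assumed status value
    artifacts={"recipe": "/tmp/run/recipe.yaml"},
    error_type="",
    error_message="",
    retry_level="",
    next_actions=[],
)

# All fields are plain strings, floats, dicts, or lists, so the record is JSON-friendly.
summary_json = json.dumps(asdict(result), indent=2)
```

Because every field is a plain built-in type, a record of this shape can be logged or written to disk without custom encoders.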
- class data_juicer_agents.tools.apply.apply_recipe.ApplyUseCase
Bases: object
Execute validated plans and return execution summaries.
- execute(plan_payload: Dict[str, Any], runtime_dir: Path, dry_run: bool = False, timeout_seconds: int = 300, command_override: str | Iterable[str] | None = None, cancel_check: Callable[[], bool] | None = None) -> Tuple[ApplyResult, int, str, str]
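The sketch below shows one way a caller might pass the documented arguments to `execute` and unpack its four-element return value. It is written against any object exposing a compatible `execute` method. `FakeUseCase`, its status strings, and the payload contents are hypothetical, and the names given to the `int` and two `str` return values are guesses, since this section does not describe them.

```python
import threading
from pathlib import Path
from typing import Any, Dict, Tuple


def run_plan(use_case: Any, plan_payload: Dict[str, Any], runtime_dir: Path,
             cancel_event: threading.Event) -> Tuple[Any, int, str, str]:
    """Call execute() with the keyword arguments listed in the signature."""
    return use_case.execute(
        plan_payload,
        runtime_dir,
        dry_run=True,
        timeout_seconds=60,
        command_override=None,
        # Event.is_set takes no arguments and returns bool: Callable[[], bool].
        cancel_check=cancel_event.is_set,
    )


class FakeUseCase:
    """Hypothetical stand-in so the sketch runs without data_juicer_agents."""

    def execute(self, plan_payload, runtime_dir, dry_run=False,
                timeout_seconds=300, command_override=None, cancel_check=None):
        cancelled = bool(cancel_check and cancel_check())
        status = "cancelled" if cancelled else "ok"  # invented status strings
        return ({"status": status}, 0, "", "")


cancel_event = threading.Event()
payload = {"plan_id": "plan-001"}  # invented payload

# The second to fourth names are assumed meanings, not documented ones.
result, returncode, stdout, stderr = run_plan(
    FakeUseCase(), payload, Path("/tmp/run"), cancel_event
)

# Setting the event makes the next cancel_check() call return True.
cancel_event.set()
cancelled_result, _, _, _ = run_plan(
    FakeUseCase(), payload, Path("/tmp/run"), cancel_event
)
```

Using a `threading.Event` keeps the cancellation flag thread-safe, so another thread (for example a UI handler) can request a stop while the plan is running.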