data_juicer_agents.tools.plan

Plan tools and deterministic planner helpers.

class data_juicer_agents.tools.plan.AssemblePlanInput(*, intent: str, dataset_spec: Dict[str, Any], process_spec: Dict[str, Any], system_spec: Dict[str, Any], approval_required: bool = True)

Bases: BaseModel

intent: str
dataset_spec: Dict[str, Any]
process_spec: Dict[str, Any]
system_spec: Dict[str, Any]
approval_required: bool
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

class data_juicer_agents.tools.plan.BuildDatasetSpecInput(*, intent: str, export_path: str, dataset_source: DatasetSource, dataset_profile: Dict[str, Any] = <factory>, modality_hint: str = '', text_keys_hint: List[str] = <factory>, image_key_hint: str = '', audio_key_hint: str = '', video_key_hint: str = '', image_bytes_key_hint: str = '', **extra_data: Any)

Bases: BaseModel

model_config = {'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

intent: str
export_path: str
dataset_source: DatasetSource
dataset_profile: Dict[str, Any]
modality_hint: str
text_keys_hint: List[str]
image_key_hint: str
audio_key_hint: str
video_key_hint: str
image_bytes_key_hint: str

class data_juicer_agents.tools.plan.BuildProcessSpecInput(*, operators: List[ProcessOperatorInput])

Bases: BaseModel

operators: List[ProcessOperatorInput]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

class data_juicer_agents.tools.plan.BuildSystemSpecInput(*, np: int | None = None, executor_type: str | None = None, custom_operator_paths: List[str] = <factory>, **extra_data: Any)

Bases: BaseModel

Input for building system spec.

Core parameters are exposed directly for common use cases. All other system parameters can be passed as additional keyword arguments. Use the list_system_config tool to discover all available options.

model_config = {'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

np: int | None
executor_type: str | None
custom_operator_paths: List[str]

class data_juicer_agents.tools.plan.DatasetBindingSpec(modality: str = 'unknown', text_keys: List[str] = <factory>, image_key: str | None = None, audio_key: str | None = None, video_key: str | None = None, image_bytes_key: str | None = None)

Bases: object

Shared/default field binding layer for the recipe.

modality: str = 'unknown'
text_keys: List[str]
image_key: str | None = None
audio_key: str | None = None
video_key: str | None = None
image_bytes_key: str | None = None
classmethod from_dict(data: Dict[str, Any]) -> DatasetBindingSpec
to_dict() -> Dict[str, Any]
__init__(modality: str = 'unknown', text_keys: List[str] = <factory>, image_key: str | None = None, audio_key: str | None = None, video_key: str | None = None, image_bytes_key: str | None = None) -> None

class data_juicer_agents.tools.plan.DatasetIOSpec(dataset_path: str = '', dataset: DatasetObjectConfig | None = None, generated_dataset_config: GeneratedDatasetConfig | None = None, export_path: str = '', _extra_fields: Dict[str, Any] = <factory>)

Bases: object

Dataset input/output shape used by the recipe.

dataset_path: str = ''
dataset: DatasetObjectConfig | None = None
generated_dataset_config: GeneratedDatasetConfig | None = None
export_path: str = ''
classmethod from_dict(data: Dict[str, Any]) -> DatasetIOSpec
to_dict() -> Dict[str, Any]
__init__(dataset_path: str = '', dataset: DatasetObjectConfig | None = None, generated_dataset_config: GeneratedDatasetConfig | None = None, export_path: str = '', _extra_fields: Dict[str, Any] = <factory>) -> None

class data_juicer_agents.tools.plan.DatasetSpec(io: DatasetIOSpec = <factory>, binding: DatasetBindingSpec = <factory>, warnings: List[str] = <factory>)

Bases: object

Dataset IO and binding spec.

io: DatasetIOSpec
binding: DatasetBindingSpec
warnings: List[str]
classmethod from_dict(data: Dict[str, Any]) -> DatasetSpec
to_dict() -> Dict[str, Any]
__init__(io: DatasetIOSpec = <factory>, binding: DatasetBindingSpec = <factory>, warnings: List[str] = <factory>) -> None

class data_juicer_agents.tools.plan.PlanContext(user_intent: str, dataset_path: str, export_path: str, custom_operator_paths: List[str] = <factory>)

Bases: object

Deterministic inputs required to build a plan.

user_intent: str
dataset_path: str
export_path: str
custom_operator_paths: List[str]
__init__(user_intent: str, dataset_path: str, export_path: str, custom_operator_paths: List[str] = <factory>) -> None

class data_juicer_agents.tools.plan.PlanModel(plan_id: str, user_intent: str, modality: str = 'unknown', operator_names: List[str] = <factory>, recipe: Dict[str, Any] = <factory>, risk_notes: List[str] = <factory>, estimation: Dict[str, Any] = <factory>, warnings: List[str] = <factory>, approval_required: bool = True, created_at: str = <factory>)

Bases: object

Execution plan: plan metadata + embedded DJ-native recipe.

The recipe field is a plain dict that maps 1-to-1 with a Data-Juicer YAML config file. All dataset, system, and process settings live inside recipe; PlanModel itself only owns plan-level metadata.

Downstream code should access recipe fields via plan.recipe[key] or plan.recipe.get(key) to keep the boundary between plan metadata and DJ config clear.
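The boundary described above can be illustrated with a stand-in dataclass. This is a minimal sketch rather than the library's implementation: `PlanSketch` is hypothetical and mirrors only a few of the documented fields, and the recipe keys and operator shown are assumptions chosen to resemble a Data-Juicer YAML config.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class PlanSketch:
    """Hypothetical stand-in mirroring a few documented PlanModel fields."""

    plan_id: str
    user_intent: str
    recipe: Dict[str, Any] = field(default_factory=dict)
    warnings: List[str] = field(default_factory=list)
    approval_required: bool = True


plan = PlanSketch(
    plan_id="plan-demo",  # illustrative value only
    user_intent="drop very short text samples",
    recipe={
        # Assumed recipe keys, chosen to resemble a Data-Juicer YAML config.
        "dataset_path": "data/input.jsonl",
        "export_path": "data/output.jsonl",
        "process": [{"text_length_filter": {"min_len": 10}}],
    },
)

# Plan-level metadata lives on the plan object itself.
needs_approval = plan.approval_required

# Dataset, system, and process settings are read through the recipe dict.
export_path = plan.recipe["export_path"]  # required key: fail loudly if missing
num_procs = plan.recipe.get("np", 1)      # optional key: fall back to a default
```

Indexing with `plan.recipe[key]` suits keys the caller cannot proceed without, while `plan.recipe.get(key)` suits optional settings that have a sensible fallback.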

plan_id: str
user_intent: str
modality: str = 'unknown'
operator_names: List[str]
recipe: Dict[str, Any]
risk_notes: List[str]
estimation: Dict[str, Any]
warnings: List[str]
approval_required: bool = True
created_at: str
static new_id() -> str
classmethod from_dict(data: Dict[str, Any]) -> PlanModel
to_dict() -> Dict[str, Any]
__init__(plan_id: str, user_intent: str, modality: str = 'unknown', operator_names: List[str] = <factory>, recipe: Dict[str, Any] = <factory>, risk_notes: List[str] = <factory>, estimation: Dict[str, Any] = <factory>, warnings: List[str] = <factory>, approval_required: bool = True, created_at: str = <factory>) -> None

class data_juicer_agents.tools.plan.PlanSaveInput(*, plan_payload: Dict[str, Any], output_path: str, overwrite: bool = False)

Bases: BaseModel

plan_payload: Dict[str, Any]
output_path: str
overwrite: bool
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

class data_juicer_agents.tools.plan.PlanValidateInput(*, plan_payload: Dict[str, Any])

Bases: BaseModel

plan_payload: Dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

class data_juicer_agents.tools.plan.PlanValidator

Bases: object

Validate plan schema and local filesystem preconditions.

static validate(plan: PlanModel) -> List[str]

exception data_juicer_agents.tools.plan.PlannerBuildError

Bases: ValueError

Raised when planner core cannot build a valid plan.

class data_juicer_agents.tools.plan.PlannerCore

Bases: object

Pure deterministic planner builder.

classmethod normalize_context(*, user_intent: str, dataset_path: str = '', export_path: str, custom_operator_paths: Iterable[Any] | None = None) -> PlanContext
classmethod build_plan_from_specs(*, user_intent: str, dataset_spec: DatasetSpec | Dict[str, Any], process_spec: Dict[str, Any], system_spec: Dict[str, Any] | None = None, risk_notes: Iterable[Any] | None = None, estimation: Dict[str, Any] | None = None, approval_required: bool = True) -> PlanModel

class data_juicer_agents.tools.plan.ProcessOperator(name: str, params: Dict[str, Any] = <factory>)

Bases: object

One operator inside the process spec.

name: str
params: Dict[str, Any]
__init__(name: str, params: Dict[str, Any] = <factory>) -> None

class data_juicer_agents.tools.plan.ProcessOperatorInput(*, name: str, params: Dict[str, Any])

Bases: BaseModel

name: str
params: Dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

class data_juicer_agents.tools.plan.ProcessSpec(operators: List[ProcessOperator] = <factory>)

Bases: object

Ordered process/operator specification.
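Because the specification is ordered, a list of operators is the natural container. The following is a minimal sketch of a dict round trip under stated assumptions: `OperatorSketch` and `ProcessSpecSketch` are hypothetical stand-ins, the `{"operators": [...]}` layout is assumed rather than taken from the library, and the operator names are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class OperatorSketch:
    """Hypothetical stand-in for ProcessOperator: a name plus its params."""

    name: str
    params: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ProcessSpecSketch:
    """Hypothetical stand-in for ProcessSpec: operators kept in list order."""

    operators: List[OperatorSketch] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProcessSpecSketch":
        # Assumed layout: {"operators": [{"name": ..., "params": {...}}, ...]}
        items = data.get("operators") or []
        return cls(
            [OperatorSketch(i["name"], dict(i.get("params") or {})) for i in items]
        )

    def to_dict(self) -> Dict[str, Any]:
        # A list (not a dict keyed by name) preserves order and allows repeats.
        return {
            "operators": [
                {"name": op.name, "params": dict(op.params)} for op in self.operators
            ]
        }


payload = {
    "operators": [
        {"name": "whitespace_normalization_mapper", "params": {}},
        {"name": "text_length_filter", "params": {"min_len": 10}},
    ]
}
spec = ProcessSpecSketch.from_dict(payload)
names_in_order = [op.name for op in spec.operators]
round_tripped = spec.to_dict()
```

In this sketch the operator order in `payload` survives both `from_dict` and `to_dict`, which is the property an ordered specification needs.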

operators: List[ProcessOperator]
classmethod from_dict(data: Dict[str, Any]) -> ProcessSpec
to_dict() -> Dict[str, Any]
__init__(operators: List[ProcessOperator] = <factory>) -> None

class data_juicer_agents.tools.plan.SystemSpec(executor_type: str = 'default', np: int = 1, custom_operator_paths: List[str] = <factory>, warnings: List[str] = <factory>, _extra_fields: Dict[str, Any] = <factory>)

Bases: object

Runtime/executor-level settings shared by the whole recipe.

executor_type: str = 'default'
np: int = 1
custom_operator_paths: List[str]
warnings: List[str]
classmethod from_dict(data: Dict[str, Any]) -> SystemSpec

Create SystemSpec from dict.

to_dict() -> Dict[str, Any]

Convert to dict, including all extra fields.

get(key: str, default: Any = None) -> Any

Get a field value, checking both core and extra fields.

set(key: str, value: Any) -> None

Set a field value, updating core or extra fields as appropriate.
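The core/extra split behind `get`, `set`, and `to_dict` can be sketched with a stand-in dataclass. This is an assumption-laden illustration, not the library's code: `SystemSpecSketch` is hypothetical, `open_tracer` is used only as an example of a non-core key, and flattening extras alongside core fields in `to_dict` is a guess at what "including all extra fields" means.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, FrozenSet, List


@dataclass
class SystemSpecSketch:
    """Hypothetical stand-in: a few core fields plus an open-ended extras dict."""

    executor_type: str = "default"
    np: int = 1
    custom_operator_paths: List[str] = field(default_factory=list)
    _extra_fields: Dict[str, Any] = field(default_factory=dict)

    def _core_names(self) -> FrozenSet[str]:
        # Core fields are the declared dataclass fields, minus the extras dict.
        return frozenset(f.name for f in fields(self)) - {"_extra_fields"}

    def get(self, key: str, default: Any = None) -> Any:
        if key in self._core_names():
            return getattr(self, key)
        return self._extra_fields.get(key, default)

    def set(self, key: str, value: Any) -> None:
        if key in self._core_names():
            setattr(self, key, value)
        else:
            self._extra_fields[key] = value

    def to_dict(self) -> Dict[str, Any]:
        out = {name: getattr(self, name) for name in sorted(self._core_names())}
        out.update(self._extra_fields)  # assumed: extras sit next to core fields
        return out


spec = SystemSpecSketch()
spec.set("np", 4)              # core field: stored as an attribute
spec.set("open_tracer", True)  # non-core key: stored in _extra_fields
as_dict = spec.to_dict()
```

Callers of such a class never need to know whether a key is core or extra, which is the convenience the documented `get` and `set` methods describe.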

classmethod from_dj_config(dj_system_config: Dict[str, Any]) -> SystemSpec

Create SystemSpec directly from Data-Juicer system config.

Parameters:

dj_system_config – System config dict from DJConfigBridge

Returns:

SystemSpec instance with all DJ system fields

__init__(executor_type: str = 'default', np: int = 1, custom_operator_paths: List[str] = <factory>, warnings: List[str] = <factory>, _extra_fields: Dict[str, Any] = <factory>) -> None

class data_juicer_agents.tools.plan.ValidateDatasetSpecInput(*, dataset_spec: Dict[str, Any], dataset_profile: Dict[str, Any] = <factory>)

Bases: BaseModel

dataset_spec: Dict[str, Any]
dataset_profile: Dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

class data_juicer_agents.tools.plan.ValidateProcessSpecInput(*, process_spec: Dict[str, Any])

Bases: BaseModel

process_spec: Dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

class data_juicer_agents.tools.plan.ValidateSystemSpecInput(*, system_spec: Dict[str, Any])

Bases: BaseModel

system_spec: Dict[str, Any]
model_config = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict (pydantic.config.ConfigDict).

data_juicer_agents.tools.plan.assemble_plan(*, user_intent: str, dataset_spec: Dict[str, Any], process_spec: Dict[str, Any], system_spec: Dict[str, Any] | None = None, approval_required: bool = True) -> Dict[str, Any]

data_juicer_agents.tools.plan.build_dataset_spec(*, user_intent: str, dataset_source: DatasetSource | None = None, export_path: str, dataset_profile: Dict[str, Any] | None = None, modality_hint: str = '', text_keys_hint: Iterable[Any] | None = None, image_key_hint: str = '', audio_key_hint: str = '', video_key_hint: str = '', image_bytes_key_hint: str = '', **kwargs: Any) -> Dict[str, Any]

data_juicer_agents.tools.plan.build_process_spec(*, operators: Iterable[Any] | None) -> Dict[str, Any]

data_juicer_agents.tools.plan.build_system_spec(*, custom_operator_paths: Iterable[Any] | None = None, np: int | None = None, executor_type: str | None = None, **kwargs: Any) -> Dict[str, Any]

Build system spec with complete config dynamically loaded from Data-Juicer.

This function loads all system configuration fields from Data-Juicer, so the spec stays in sync with upstream changes automatically.

Parameters:
  • custom_operator_paths – Optional list of custom operator paths

  • np – Optional number of processes

  • executor_type – Optional executor type

  • **kwargs – Any additional system config options (must be valid DJ system config fields — unknown keys will raise ValueError)

Returns:

Dict containing the built system spec and validation results
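The keyword handling described above (explicit core parameters, plus extra options that must be known fields) can be sketched as follows. This is a minimal illustration under stated assumptions: `KNOWN_SYSTEM_DEFAULTS` is a hypothetical hard-coded stand-in for the field set the real function loads from Data-Juicer, and the `system_spec`/`warnings` result keys are assumed, since the actual return layout is not documented here.

```python
from typing import Any, Dict, Iterable, Optional

# Hypothetical stand-in for the field set (with defaults) that the real
# function loads from Data-Juicer at runtime; hard-coded here for illustration.
KNOWN_SYSTEM_DEFAULTS: Dict[str, Any] = {
    "executor_type": "default",
    "np": 1,
    "custom_operator_paths": [],
    "open_tracer": False,
}


def build_system_spec_sketch(
    *,
    custom_operator_paths: Optional[Iterable[Any]] = None,
    np: Optional[int] = None,
    executor_type: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    # Reject extra options that are not known system config fields.
    unknown = sorted(set(kwargs) - set(KNOWN_SYSTEM_DEFAULTS))
    if unknown:
        raise ValueError(f"unknown system config fields: {unknown}")

    # Start from defaults, then apply only the values the caller supplied.
    spec = dict(KNOWN_SYSTEM_DEFAULTS)
    spec["custom_operator_paths"] = [str(p) for p in (custom_operator_paths or [])]
    if np is not None:
        spec["np"] = np
    if executor_type is not None:
        spec["executor_type"] = executor_type
    spec.update(kwargs)

    # Assumed result layout; the real return keys are not documented here.
    return {"system_spec": spec, "warnings": []}


result = build_system_spec_sketch(np=4, open_tracer=True)

try:
    build_system_spec_sketch(not_a_real_option=1)
    rejected = False
except ValueError:
    rejected = True
```

Treating `None` as "not supplied" lets the defaults survive unless the caller overrides them, which matches the optional `np` and `executor_type` parameters in the signature.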

data_juicer_agents.tools.plan.plan_validate(*, plan_payload: Dict[str, Any]) -> Dict[str, Any]

data_juicer_agents.tools.plan.save_plan_file(*, plan_payload: Dict[str, Any], output_path: str, overwrite: bool = False) -> Dict[str, Any]

data_juicer_agents.tools.plan.validate_dataset_spec_payload(dataset_spec: DatasetSpec | Dict[str, Any], *, dataset_profile: Dict[str, Any] | None = None) -> Tuple[List[str], List[str]]

Validate the dataset spec against the project's business rules and the DJ parser.

data_juicer_agents.tools.plan.validate_plan_schema(plan: PlanModel) -> List[str]

data_juicer_agents.tools.plan.validate_process_spec_payload(process_spec: ProcessSpec | Dict[str, Any]) -> Tuple[List[str], List[str]]

Validate process spec structure and operator names/params via DJ bridge.

data_juicer_agents.tools.plan.validate_system_spec_payload(system_spec: SystemSpec | Dict[str, Any]) -> Tuple[List[str], List[str]]

Validate system spec using Data-Juicer’s native validation when possible.