data_juicer.core.executor.pipeline_dag module
Pipeline DAG Representation for Data-Juicer Pipelines
This module provides Pipeline DAG (Directed Acyclic Graph) representation for tracking execution state, dependencies, and monitoring.
The module was refactored to:
- live in core/executor/, where it is actually used;
- use dict nodes consistently (matching the strategy output);
- share its status enum with DAGNodeStatusTransition.
- class data_juicer.core.executor.pipeline_dag.DAGNodeStatus(value)
Bases: Enum
Status of a DAG node during execution.
State machine transitions (enforced by DAGNodeStatusTransition):
- pending -> running (the node starts execution)
- pending -> completed (skipped: already done in a previous run)
- running -> completed (the node finishes successfully)
- running -> failed (the node fails)
- failed -> running (the node is retried)
- completed is terminal (no transitions out)
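The transition rules above fit naturally in a lookup table. The following is a minimal sketch, not the actual DAGNodeStatusTransition implementation: it redefines DAGNodeStatus locally with the documented values, and `ALLOWED_TRANSITIONS` and `is_valid_transition` are hypothetical names chosen for illustration.

```python
from enum import Enum


class DAGNodeStatus(Enum):
    # Local copy mirroring the documented members and values.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical transition table reproducing the documented state machine.
ALLOWED_TRANSITIONS = {
    DAGNodeStatus.PENDING: {DAGNodeStatus.RUNNING, DAGNodeStatus.COMPLETED},
    DAGNodeStatus.RUNNING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.FAILED},
    DAGNodeStatus.FAILED: {DAGNodeStatus.RUNNING},  # retry
    DAGNodeStatus.COMPLETED: set(),  # terminal: no transitions out
}


def is_valid_transition(src: DAGNodeStatus, dst: DAGNodeStatus) -> bool:
    """Return True if a node may move from status src to status dst."""
    return dst in ALLOWED_TRANSITIONS[src]
```

For example, `is_valid_transition(DAGNodeStatus.PENDING, DAGNodeStatus.COMPLETED)` is True (a node skipped because it finished in a previous run), while any transition out of COMPLETED is rejected.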
- PENDING = 'pending'
- RUNNING = 'running'
- COMPLETED = 'completed'
- FAILED = 'failed'
- class data_juicer.core.executor.pipeline_dag.PipelineDAG(work_dir: str)
Bases: object
Pipeline DAG representation and execution state tracker.
Stores DAG nodes as dicts (matching the strategy output format) and provides methods for state management, serialization, and visualization.
- __init__(work_dir: str)
Initialize the Pipeline DAG.
- Parameters:
work_dir -- Working directory for storing DAG execution plans
- nodes: Dict[str, Dict[str, Any]]
- edges: List[Any]
- execution_plan: List[str]
- parallel_groups: List[List[str]]
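These four attributes describe one graph: node dicts keyed by ID, the edges between them, a run order, and groups of nodes that can run together. This page does not say how PipelineDAG fills them (the values come from the planning strategy), so the following is only an illustration under stated assumptions: each edge is taken to be a `(source_id, target_id)` pair meaning the target depends on the source, node dicts carry a hypothetical `"status"` key, and `build_plan` is a hypothetical helper. It applies Kahn's topological sort one level at a time, so `execution_plan` is a valid run order and every group in `parallel_groups` contains only nodes whose dependencies sit in earlier groups.

```python
from typing import Any, Dict, List, Tuple


def build_plan(
    nodes: Dict[str, Dict[str, Any]], edges: List[Tuple[str, str]]
) -> Tuple[List[str], List[List[str]]]:
    """Return (execution_plan, parallel_groups) for a DAG.

    Hypothetical helper: each edge (src, dst) means dst depends on src.
    Raises ValueError if the edges contain a cycle.
    """
    indegree = {node_id: 0 for node_id in nodes}
    children: Dict[str, List[str]] = {node_id: [] for node_id in nodes}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    execution_plan: List[str] = []
    parallel_groups: List[List[str]] = []
    # Kahn's algorithm, one level at a time: a node enters a level once all
    # of its dependencies are in earlier levels, so a level can run in parallel.
    level = sorted(n for n, d in indegree.items() if d == 0)
    while level:
        parallel_groups.append(level)
        execution_plan.extend(level)
        next_level: List[str] = []
        for node_id in level:
            for child in children[node_id]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_level.append(child)
        level = sorted(next_level)

    if len(execution_plan) != len(nodes):
        raise ValueError("edges contain a cycle")
    return execution_plan, parallel_groups


# Example: load -> (filter, dedup) -> export, with hypothetical node dicts.
nodes = {
    "load": {"status": "pending"},
    "filter": {"status": "pending"},
    "dedup": {"status": "pending"},
    "export": {"status": "pending"},
}
edges = [("load", "filter"), ("load", "dedup"), ("filter", "export"), ("dedup", "export")]
plan, groups = build_plan(nodes, edges)
# plan   == ["load", "dedup", "filter", "export"]
# groups == [["load"], ["dedup", "filter"], ["export"]]
```

Sorting each level only makes the output deterministic. Any order within a group would be equally valid, because nodes in the same group do not depend on one another.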
- save_execution_plan(filename: str = 'dag_execution_plan.json') → str
Save the execution plan to a file.
- Parameters:
filename -- Name of the file to save the plan to
- Returns:
Path to the saved file
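A minimal sketch of what saving could involve, assuming the plan is written as JSON into work_dir. The standalone function signature and the payload keys are assumptions for illustration, not the library's actual file format.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def save_execution_plan(
    work_dir: str,
    nodes: Dict[str, Dict[str, Any]],
    execution_plan: List[str],
    parallel_groups: List[List[str]],
    filename: str = "dag_execution_plan.json",
) -> str:
    """Write the DAG state as JSON to <work_dir>/<filename>; return the path."""
    os.makedirs(work_dir, exist_ok=True)
    path = os.path.join(work_dir, filename)
    # Hypothetical payload layout; the real file format may differ.
    payload = {
        "nodes": nodes,
        "execution_plan": execution_plan,
        "parallel_groups": parallel_groups,
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    return path


# Example: save a one-node plan into a throwaway directory and read it back.
with tempfile.TemporaryDirectory() as tmp:
    saved_path = save_execution_plan(tmp, {"load": {"status": "pending"}}, ["load"], [["load"]])
    saved_name = os.path.basename(saved_path)
    with open(saved_path, encoding="utf-8") as f:
        saved_payload = json.load(f)
```

Returning the full path, rather than the bare filename, lets callers log or reopen the file without repeating the directory logic.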
- load_execution_plan(filename: str = 'dag_execution_plan.json') → bool
Load the execution plan from a file.
- Parameters:
filename -- Name of the file to load the plan from
- Returns:
True if the plan was loaded successfully, False otherwise
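The boolean return value suggests that a missing or unreadable file is reported to the caller rather than raised. The sketch below shows that pattern. It assumes a JSON file inside work_dir, and the standalone `load_execution_plan` here is a hypothetical stand-in that merges the file into a plain dict instead of setting PipelineDAG attributes.

```python
import json
import os
import tempfile
from typing import Any, Dict


def load_execution_plan(
    work_dir: str, state: Dict[str, Any], filename: str = "dag_execution_plan.json"
) -> bool:
    """Merge the JSON object in <work_dir>/<filename> into state.

    Returns False instead of raising when the file is missing, unreadable,
    or does not contain a JSON object.
    """
    path = os.path.join(work_dir, filename)
    try:
        with open(path, encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, json.JSONDecodeError):
        return False
    if not isinstance(payload, dict):
        return False
    state.update(payload)
    return True


# Example: the first call fails (no file yet); the second succeeds.
with tempfile.TemporaryDirectory() as tmp:
    state: Dict[str, Any] = {}
    before_save = load_execution_plan(tmp, state)
    with open(os.path.join(tmp, "dag_execution_plan.json"), "w", encoding="utf-8") as f:
        json.dump({"execution_plan": ["load", "export"]}, f)
    after_save = load_execution_plan(tmp, state)
```

A False result is a natural signal to build a fresh plan instead of resuming from a previous run.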
- get_node_status(node_id: str) → DAGNodeStatus
Get the status of a node by its ID.
- Parameters:
node_id -- The node identifier
- Returns:
The DAGNodeStatus of the node
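A minimal sketch of the lookup. It assumes that each node dict stores its status under a hypothetical `"status"` key, either as the string value or as the enum member, and that a node without one counts as pending. How the real method handles an unknown node_id is not documented here. This version simply lets the KeyError propagate.

```python
from enum import Enum
from typing import Any, Dict


class DAGNodeStatus(Enum):
    # Local copy mirroring the documented members and values.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def get_node_status(nodes: Dict[str, Dict[str, Any]], node_id: str) -> DAGNodeStatus:
    """Return the status of node_id, treating a missing status as PENDING.

    DAGNodeStatus(x) accepts either a value string ("running") or an
    existing member (DAGNodeStatus.RUNNING), so both storage styles work.
    """
    node = nodes[node_id]  # unknown IDs raise KeyError (assumed behaviour)
    return DAGNodeStatus(node.get("status", DAGNodeStatus.PENDING.value))


# Example with hypothetical node dicts.
nodes = {
    "filter_op": {"status": "running"},
    "dedup_op": {"status": DAGNodeStatus.FAILED},
    "export_op": {},
}
status_filter = get_node_status(nodes, "filter_op")
status_dedup = get_node_status(nodes, "dedup_op")
status_export = get_node_status(nodes, "export_op")
```

Converting through the enum constructor also rejects typos: a stored value such as `"runing"` raises ValueError instead of silently passing through.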