data_juicer.core.executor.pipeline_dag module

Pipeline DAG Representation for Data-Juicer Pipelines

This module provides a Pipeline DAG (Directed Acyclic Graph) representation for tracking execution state and dependencies, and for monitoring.

Refactored to:

- Live in core/executor/, where it is actually used
- Use dict nodes consistently (matching the strategy output)
- Share the status enum with DAGNodeStatusTransition

class data_juicer.core.executor.pipeline_dag.DAGNodeStatus(value)

Bases: Enum

Status of a DAG node during execution.

State machine transitions (enforced by DAGNodeStatusTransition):

- pending -> running (node starts execution)
- pending -> completed (skipped: already done in a previous run)
- running -> completed (node finishes successfully)
- running -> failed (node fails)
- failed -> running (node retries)
- completed is terminal (no transitions out)

PENDING = 'pending'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
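The transition rules above can be expressed as a lookup table keyed by the current status. The sketch below is a standalone illustration and not the DAGNodeStatusTransition API; the names `ALLOWED_TRANSITIONS` and `is_valid_transition` are hypothetical.

```python
from enum import Enum


class DAGNodeStatus(Enum):
    # Mirrors the four documented members
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical table of the documented transitions.
# COMPLETED maps to an empty set because it is terminal.
ALLOWED_TRANSITIONS = {
    DAGNodeStatus.PENDING: {DAGNodeStatus.RUNNING, DAGNodeStatus.COMPLETED},
    DAGNodeStatus.RUNNING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.FAILED},
    DAGNodeStatus.FAILED: {DAGNodeStatus.RUNNING},
    DAGNodeStatus.COMPLETED: set(),
}


def is_valid_transition(current: DAGNodeStatus, new: DAGNodeStatus) -> bool:
    """Return True if moving from `current` to `new` is permitted."""
    return new in ALLOWED_TRANSITIONS[current]


print(is_valid_transition(DAGNodeStatus.FAILED, DAGNodeStatus.RUNNING))     # True (retry)
print(is_valid_transition(DAGNodeStatus.COMPLETED, DAGNodeStatus.RUNNING))  # False (terminal)
```

A table keeps the rules in one place, so adding or removing an edge of the state machine does not require touching any branching logic.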
class data_juicer.core.executor.pipeline_dag.PipelineDAG(work_dir: str)

Bases: object

Pipeline DAG representation and execution state tracker.

Stores DAG nodes as dicts (matching strategy output format). Provides methods for state management, serialization, and visualization.

__init__(work_dir: str)

Initialize the Pipeline DAG.

Parameters:

work_dir -- Working directory for storing DAG execution plans

nodes: Dict[str, Dict[str, Any]]
edges: List[Any]
execution_plan: List[str]
parallel_groups: List[List[str]]
save_execution_plan(filename: str = 'dag_execution_plan.json') → str

Save the execution plan to file.

Parameters:

filename -- Name of the file to save the plan

Returns:

Path to the saved file

load_execution_plan(filename: str = 'dag_execution_plan.json') → bool

Load execution plan from file.

Parameters:

filename -- Name of the file to load the plan from

Returns:

True if loaded successfully, False otherwise
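A minimal sketch of the save/load round trip, assuming the plan is written as JSON to `os.path.join(work_dir, filename)` and holds the four attributes listed above. The file location, the key layout, and the helper names `save_plan` and `load_plan` are assumptions, not the library's implementation.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_plan(work_dir: str, plan: Dict[str, Any],
              filename: str = "dag_execution_plan.json") -> str:
    """Write `plan` as JSON under `work_dir` and return the file path."""
    os.makedirs(work_dir, exist_ok=True)
    path = os.path.join(work_dir, filename)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2)
    return path


def load_plan(work_dir: str, target: Dict[str, Any],
              filename: str = "dag_execution_plan.json") -> bool:
    """Fill `target` from the JSON file; return False if it cannot be loaded."""
    path = os.path.join(work_dir, filename)
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):  # missing file or malformed JSON
        return False
    if not isinstance(data, dict):
        return False
    target.update(data)
    return True


# Hypothetical plan layout; edges are lists because JSON has no tuples
plan = {
    "nodes": {"op_0": {"status": "completed"}, "op_1": {"status": "pending"}},
    "edges": [["op_0", "op_1"]],
    "execution_plan": ["op_0", "op_1"],
    "parallel_groups": [["op_0"], ["op_1"]],
}

with tempfile.TemporaryDirectory() as work_dir:
    path = save_plan(work_dir, plan)
    restored: Dict[str, Any] = {}
    print(load_plan(work_dir, restored), restored == plan)  # True True
    print(load_plan(work_dir, {}, "missing.json"))           # False
```

Returning a bool instead of raising matches the documented contract of `load_execution_plan`: a missing or unreadable plan is an expected case (for example, a first run), not an error.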

mark_node_started(node_id: str) → None

Mark a node as started (running).

mark_node_completed(node_id: str, duration: float = None) → None

Mark a node as completed.

mark_node_failed(node_id: str, error_message: str) → None

Mark a node as failed.
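The three `mark_node_*` methods update a node's dict in place. The sketch below works on plain dicts and assumes the hypothetical keys `status`, `start_time`, `duration` and `error_message`; the real node dicts follow the strategy output and may use different keys. Falling back to elapsed wall-clock time when `duration` is omitted is also an assumption.

```python
import time
from typing import Any, Dict, Optional

# Hypothetical node layout, keyed by node ID
nodes: Dict[str, Dict[str, Any]] = {"op_0": {"status": "pending"}}


def mark_started(node_id: str) -> None:
    node = nodes[node_id]
    node["status"] = "running"
    node["start_time"] = time.time()


def mark_completed(node_id: str, duration: Optional[float] = None) -> None:
    node = nodes[node_id]
    if duration is None and "start_time" in node:
        # Assumption: use elapsed time since the node started
        duration = time.time() - node["start_time"]
    node["status"] = "completed"
    node["duration"] = duration


def mark_failed(node_id: str, error_message: str) -> None:
    node = nodes[node_id]
    node["status"] = "failed"
    node["error_message"] = error_message


mark_started("op_0")
mark_failed("op_0", "out of memory")
mark_started("op_0")  # retry: failed -> running
mark_completed("op_0", duration=1.5)
print(nodes["op_0"]["status"], nodes["op_0"]["duration"])  # completed 1.5
```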

get_node_status(node_id: str) → DAGNodeStatus

Get status of a node by ID.

Parameters:

node_id -- The node identifier

Returns:

DAGNodeStatus of the node
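Since nodes are stored as dicts, a status lookup has to turn a stored value back into an enum member. A sketch assuming each node keeps its status under a hypothetical `status` key as the enum's string value; unknown IDs simply raise `KeyError` here, which may differ from the real method.

```python
from enum import Enum
from typing import Any, Dict


class DAGNodeStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical node dicts
nodes: Dict[str, Dict[str, Any]] = {
    "op_0": {"status": "completed"},
    "op_1": {"status": "running"},
}


def get_node_status(node_id: str) -> DAGNodeStatus:
    raw = nodes[node_id]["status"]
    # Enum lookup by value maps the stored string back to a member
    return DAGNodeStatus(raw)


print(get_node_status("op_1"))  # DAGNodeStatus.RUNNING
```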

get_ready_nodes() → List[str]

Get list of nodes ready to execute (all dependencies completed).
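"Ready" means a node has not run yet and every upstream node has completed. A sketch assuming each node dict lists its upstream IDs under a hypothetical `dependencies` key and that only PENDING nodes are candidates; the real method may also consider other states.

```python
from typing import Any, Dict, List

# Hypothetical node dicts with upstream node IDs
nodes: Dict[str, Dict[str, Any]] = {
    "load":   {"status": "completed", "dependencies": []},
    "filter": {"status": "pending",   "dependencies": ["load"]},
    "dedup":  {"status": "pending",   "dependencies": ["filter"]},
    "map":    {"status": "pending",   "dependencies": ["load"]},
}


def get_ready_nodes() -> List[str]:
    """Return pending nodes whose dependencies have all completed."""
    ready = []
    for node_id, node in nodes.items():
        if node["status"] != "pending":
            continue
        deps = node.get("dependencies", [])
        if all(nodes[d]["status"] == "completed" for d in deps):
            ready.append(node_id)
    return ready


print(get_ready_nodes())  # ['filter', 'map']
```

Both returned nodes depend only on `load`, so a scheduler could run them in parallel; `dedup` becomes ready once `filter` completes.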

get_execution_summary() → Dict[str, Any]

Get execution summary statistics.
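One way to build such statistics is to count nodes per status. The summary keys below (`total_nodes`, `completion_percentage`, and so on) and the per-node `duration` key are hypothetical and are not the documented return format.

```python
from collections import Counter
from typing import Any, Dict

# Hypothetical node dicts
nodes: Dict[str, Dict[str, Any]] = {
    "op_0": {"status": "completed", "duration": 2.0},
    "op_1": {"status": "completed", "duration": 3.5},
    "op_2": {"status": "failed"},
    "op_3": {"status": "pending"},
}


def get_execution_summary() -> Dict[str, Any]:
    counts = Counter(node["status"] for node in nodes.values())
    total = len(nodes)
    return {
        "total_nodes": total,
        "completed_nodes": counts["completed"],
        "failed_nodes": counts["failed"],
        "running_nodes": counts["running"],  # Counter returns 0 for missing keys
        "pending_nodes": counts["pending"],
        "completion_percentage": 100.0 * counts["completed"] / total if total else 0.0,
        "total_duration": sum(node.get("duration") or 0.0 for node in nodes.values()),
    }


summary = get_execution_summary()
print(summary["completion_percentage"], summary["total_duration"])  # 50.0 5.5
```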

visualize() → str

Generate a string representation of the DAG for visualization.
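A simple text rendering walks the execution plan in order and prefixes each node with a status marker. The output format, the `name` key, and the marker symbols are illustrative assumptions, not the format `visualize()` actually produces.

```python
from typing import Any, Dict, List

# Hypothetical node dicts and plan order
nodes: Dict[str, Dict[str, Any]] = {
    "op_0": {"name": "text_length_filter", "status": "completed"},
    "op_1": {"name": "document_deduplicator", "status": "running"},
}
execution_plan: List[str] = ["op_0", "op_1"]

STATUS_SYMBOLS = {"pending": "[ ]", "running": "[~]", "completed": "[x]", "failed": "[!]"}


def visualize() -> str:
    lines = ["Pipeline DAG:"]
    for step, node_id in enumerate(execution_plan, start=1):
        node = nodes[node_id]
        symbol = STATUS_SYMBOLS.get(node["status"], "[?]")
        lines.append(f"  {step}. {symbol} {node_id}: {node.get('name', node_id)}")
    return "\n".join(lines)


print(visualize())
# Pipeline DAG:
#   1. [x] op_0: text_length_filter
#   2. [~] op_1: document_deduplicator
```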