data_juicer.core.executor.pipeline_dag module

Pipeline DAG Representation for Data-Juicer Pipelines

This module provides a Pipeline DAG (Directed Acyclic Graph) representation for tracking execution state and dependencies, and for monitoring.

The module was refactored to:
- Live in core/executor/, where it's actually used
- Use dict nodes consistently (matching the strategy output)
- Share the status enum with DAGNodeStatusTransition

class data_juicer.core.executor.pipeline_dag.DAGNodeStatus(value)

Bases: Enum

Status of a DAG node during execution.

State machine transitions (enforced by DAGNodeStatusTransition):
- pending -> running (node starts execution)
- pending -> completed (skipped; already done in a previous run)
- running -> completed (node finishes successfully)
- running -> failed (node fails)
- failed -> running (node retries)
- completed is terminal (no transitions out)

PENDING = 'pending'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
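
The transition rules listed for DAGNodeStatus can be expressed as a lookup table. This is a minimal sketch only: it re-declares the enum locally, and ALLOWED_TRANSITIONS and is_valid_transition are hypothetical names, not the actual DAGNodeStatusTransition API.

```python
from enum import Enum
from typing import Dict, Set


class DAGNodeStatus(Enum):
    """Local mirror of the documented enum values."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical transition table encoding the documented state machine;
# the real DAGNodeStatusTransition class may be structured differently.
ALLOWED_TRANSITIONS: Dict[DAGNodeStatus, Set[DAGNodeStatus]] = {
    DAGNodeStatus.PENDING: {DAGNodeStatus.RUNNING, DAGNodeStatus.COMPLETED},
    DAGNodeStatus.RUNNING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.FAILED},
    DAGNodeStatus.FAILED: {DAGNodeStatus.RUNNING},
    DAGNodeStatus.COMPLETED: set(),  # terminal: no transitions out
}


def is_valid_transition(src: DAGNodeStatus, dst: DAGNodeStatus) -> bool:
    """Return True if moving a node from src to dst is allowed."""
    return dst in ALLOWED_TRANSITIONS[src]
```

Keeping the rules in data rather than in if/else chains makes the terminal state explicit: COMPLETED maps to an empty set, so every transition out of it is rejected.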

class data_juicer.core.executor.pipeline_dag.PipelineDAG(work_dir: str)

Bases: object

Pipeline DAG representation and execution state tracker.

Stores DAG nodes as dicts (matching strategy output format). Provides methods for state management, serialization, and visualization.

__init__(work_dir: str)

Initialize the Pipeline DAG.

Parameters:

work_dir – Working directory for storing DAG execution plans

nodes: Dict[str, Dict[str, Any]]
edges: List[Any]
execution_plan: List[str]
parallel_groups: List[List[str]]
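
The nodes, edges, execution_plan, and parallel_groups attributes suggest a topological ordering plus groups of mutually independent nodes. As a minimal sketch, assuming each edge is a (src, dst) pair meaning dst depends on src (the documented type is only List[Any]), a level-by-level Kahn's algorithm yields both. build_plan is a hypothetical helper; the module may compute these values differently or receive them from the strategy.

```python
from typing import Dict, List, Tuple


def build_plan(node_ids: List[str],
               edges: List[Tuple[str, str]]) -> Tuple[List[str], List[List[str]]]:
    """Return (execution_plan, parallel_groups) via level-by-level Kahn's algorithm.

    Hypothetical helper: assumes each edge is (src, dst), meaning dst depends on src.
    """
    indegree: Dict[str, int] = {n: 0 for n in node_ids}
    children: Dict[str, List[str]] = {n: [] for n in node_ids}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    groups: List[List[str]] = []
    frontier = [n for n in node_ids if indegree[n] == 0]  # nodes with no dependencies
    while frontier:
        groups.append(frontier)  # everything in one level can run in parallel
        next_frontier: List[str] = []
        for n in frontier:
            for child in children[n]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_frontier.append(child)
        frontier = next_frontier

    plan = [n for group in groups for n in group]
    if len(plan) != len(node_ids):
        raise ValueError("graph contains a cycle")  # some nodes never became ready
    return plan, groups
```

For example, with edges a->b, a->c, b->d, c->d, build_plan returns the plan ["a", "b", "c", "d"] and the groups [["a"], ["b", "c"], ["d"]]; b and c share a group because neither depends on the other.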
save_execution_plan(filename: str = 'dag_execution_plan.json') → str

Save the execution plan to a file in the working directory.

Parameters:

filename – Name of the file to save the plan to

Returns:

Path to the saved file
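
A minimal sketch of saving a plan as JSON under the working directory, assuming a JSON payload (suggested by the default .json filename). The standalone function and the payload contents are hypothetical; whether the real method creates work_dir, and which keys it writes, is not documented here.

```python
import json
import os
from typing import Any, Dict


def save_execution_plan(work_dir: str, plan: Dict[str, Any],
                        filename: str = "dag_execution_plan.json") -> str:
    """Write `plan` as JSON under `work_dir` and return the file path (hypothetical sketch)."""
    os.makedirs(work_dir, exist_ok=True)  # assumption: create the directory if missing
    path = os.path.join(work_dir, filename)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2)  # indented for human inspection
    return path
```

Returning the path, as the documented method does, lets callers log or reload the exact file that was written.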

load_execution_plan(filename: str = 'dag_execution_plan.json') → bool

Load the execution plan from a file.

Parameters:

filename – Name of the file to load the plan from

Returns:

True if loaded successfully, False otherwise
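
The True/False contract can be sketched by catching I/O and parse errors instead of raising. PlanHolder and the "nodes"/"execution_plan" JSON keys are hypothetical stand-ins for the PipelineDAG attributes; the actual file layout is not documented here.

```python
import json
import os
from typing import Any, Dict, List


class PlanHolder:
    """Hypothetical stand-in for PipelineDAG's plan-related attributes."""

    def __init__(self, work_dir: str) -> None:
        self.work_dir = work_dir
        self.nodes: Dict[str, Dict[str, Any]] = {}
        self.execution_plan: List[str] = []

    def load_execution_plan(self, filename: str = "dag_execution_plan.json") -> bool:
        """Populate attributes from JSON; return False instead of raising on failure."""
        path = os.path.join(self.work_dir, filename)
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError):
            return False  # missing, unreadable, or malformed file
        if not isinstance(data, dict):
            return False  # unexpected top-level JSON type
        # Hypothetical keys; the real file may use a different layout.
        self.nodes = data.get("nodes", {})
        self.execution_plan = data.get("execution_plan", [])
        return True
```

A boolean result suits resume logic: a caller can try to load a previous plan and fall back to building a fresh one when it gets False.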

mark_node_started(node_id: str) → None

Mark a node as started (running).

mark_node_completed(node_id: str, duration: float = None) → None

Mark a node as completed.

mark_node_failed(node_id: str, error_message: str) → None

Mark a node as failed.
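
A minimal sketch of the three mark_node_* methods operating on dict nodes while enforcing the state machine documented for DAGNodeStatus. NodeTracker and the "status", "start_time", "duration", and "error_message" keys are hypothetical; the real methods may store different fields, and raising on an illegal transition is an assumption.

```python
import time
from typing import Any, Dict, Optional

# Hypothetical transition table mirroring the documented state machine.
_ALLOWED = {
    "pending": {"running", "completed"},
    "running": {"completed", "failed"},
    "failed": {"running"},
    "completed": set(),
}


class NodeTracker:
    """Sketch of status bookkeeping on dict nodes (all keys are assumptions)."""

    def __init__(self, nodes: Dict[str, Dict[str, Any]]) -> None:
        self.nodes = nodes

    def _transition(self, node_id: str, new_status: str) -> Dict[str, Any]:
        node = self.nodes[node_id]
        old = node.get("status", "pending")
        if new_status not in _ALLOWED[old]:
            raise ValueError(f"illegal transition {old} -> {new_status} for {node_id}")
        node["status"] = new_status
        return node

    def mark_node_started(self, node_id: str) -> None:
        node = self._transition(node_id, "running")
        node["start_time"] = time.time()  # wall-clock start, hypothetical field

    def mark_node_completed(self, node_id: str, duration: Optional[float] = None) -> None:
        node = self._transition(node_id, "completed")
        node["duration"] = duration

    def mark_node_failed(self, node_id: str, error_message: str) -> None:
        node = self._transition(node_id, "failed")
        node["error_message"] = error_message
```

Routing every update through one _transition helper keeps the rules in a single place, so a retry (failed -> running) works while restarting a completed node is rejected.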

get_node_status(node_id: str) → DAGNodeStatus

Get the status of a node by its ID.

Parameters:

node_id โ€“ The node identifier

Returns:

DAGNodeStatus of the node
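
Because nodes are stored as dicts, a stored status string must be converted back to the enum, which Enum lookup by value (DAGNodeStatus("running")) does. This sketch assumes a "status" key that defaults to pending when absent, and that an unknown node_id raises KeyError; all three are assumptions about the real method.

```python
from enum import Enum
from typing import Any, Dict


class DAGNodeStatus(Enum):
    """Local mirror of the documented enum values."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def get_node_status(nodes: Dict[str, Dict[str, Any]], node_id: str) -> DAGNodeStatus:
    """Convert a node's stored status string to the enum (hypothetical "status" key)."""
    raw = nodes[node_id].get("status", DAGNodeStatus.PENDING.value)  # KeyError if unknown id
    return DAGNodeStatus(raw)  # Enum lookup by value; ValueError for an unknown string
```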

get_ready_nodes() → List[str]

Get the IDs of nodes that are ready to execute (all dependencies completed).
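
One way to compute ready nodes, assuming each node dict carries a "status" and a "dependencies" list of node IDs (hypothetical keys). This sketch treats only pending nodes as candidates; whether the real method also returns failed nodes eligible for retry is not specified here.

```python
from typing import Any, Dict, List


def get_ready_nodes(nodes: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return IDs of pending nodes whose dependencies have all completed.

    Hypothetical keys: "status" and "dependencies" (a list of node IDs).
    """
    ready = []
    for node_id, node in nodes.items():
        if node.get("status", "pending") != "pending":
            continue  # already running, completed, or failed
        deps = node.get("dependencies", [])
        # all() of an empty list is True, so dependency-free nodes are ready.
        if all(nodes[d].get("status") == "completed" for d in deps):
            ready.append(node_id)
    return ready
```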

get_execution_summary() → Dict[str, Any]

Get execution summary statistics.
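
A sketch of summary statistics built with collections.Counter. The returned keys (total_nodes, status_counts, completion_ratio) and the "status" node key are hypothetical and do not describe the library's actual summary format.

```python
from collections import Counter
from typing import Any, Dict


def get_execution_summary(nodes: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Count nodes per status and report overall progress (hypothetical keys)."""
    counts = Counter(node.get("status", "pending") for node in nodes.values())
    total = len(nodes)
    completed = counts.get("completed", 0)
    return {
        "total_nodes": total,
        "status_counts": dict(counts),
        # Guard against division by zero for an empty DAG.
        "completion_ratio": completed / total if total else 0.0,
    }
```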

visualize() → str

Generate a string representation of the DAG for visualization.
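
A minimal text rendering, assuming nodes are printed in execution_plan order with a status marker and their dependencies. The marker symbols, the "status" and "dependencies" keys, and the output layout are all hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical status markers; the library's real output format may differ.
_MARKERS = {"pending": "[ ]", "running": "[~]", "completed": "[x]", "failed": "[!]"}


def visualize(nodes: Dict[str, Dict[str, Any]], execution_plan: List[str]) -> str:
    """Render nodes in plan order, one per line, with status and dependencies."""
    lines = []
    for node_id in execution_plan:
        node = nodes[node_id]
        marker = _MARKERS.get(node.get("status", "pending"), "[?]")
        deps = node.get("dependencies", [])
        suffix = f" <- {', '.join(deps)}" if deps else ""
        lines.append(f"{marker} {node_id}{suffix}")
    return "\n".join(lines)
```

For example, visualize({"load": {"status": "completed"}, "filter": {"status": "running", "dependencies": ["load"]}}, ["load", "filter"]) returns "[x] load\n[~] filter <- load".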