data_juicer.core.executor.dag_execution_strategies module
- class data_juicer.core.executor.dag_execution_strategies.DAGNodeType(value)
Bases: Enum
Types of DAG nodes.
- OPERATION = 'operation'
- PARTITION_OPERATION = 'partition_operation'
- SCATTER_GATHER = 'scatter_gather'
- class data_juicer.core.executor.dag_execution_strategies.DAGNodeStatusTransition
Bases: object
Validates DAG node status transitions.
Uses the DAGNodeStatus enum for type safety. Valid transitions:
  - PENDING -> RUNNING (node starts execution)
  - PENDING -> COMPLETED (skipped: already completed in a previous run)
  - RUNNING -> COMPLETED (node finishes successfully)
  - RUNNING -> FAILED (node fails)
  - FAILED -> RUNNING (node retries)
  - COMPLETED is terminal (no transitions out)
- VALID_TRANSITIONS = {DAGNodeStatus.COMPLETED: {}, DAGNodeStatus.FAILED: {DAGNodeStatus.RUNNING}, DAGNodeStatus.PENDING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.RUNNING}, DAGNodeStatus.RUNNING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.FAILED}}
- classmethod is_valid(from_status: str | DAGNodeStatus, to_status: str | DAGNodeStatus) → bool
Check if a status transition is valid.
- Parameters:
from_status – Current status (string or enum)
to_status – Target status (string or enum)
- Returns:
True if transition is valid, False otherwise
- classmethod validate_and_log(node_id: str, from_status: str | DAGNodeStatus, to_status: str | DAGNodeStatus) → bool
Validate the transition and log a warning if it is invalid.
- Parameters:
node_id – Node identifier for logging
from_status – Current status (string or enum)
to_status – Target status (string or enum)
- Returns:
True if transition is valid, False otherwise
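A minimal, self-contained sketch of the transition rules above, assuming a stand-in DAGNodeStatus enum whose members carry lowercase string values (the real enum's values are not shown on this page). Passing each argument through the enum constructor is one way to accept both strings and enum members; treating an unknown status string as an invalid transition is likewise an assumption of this sketch.

```python
import logging
from enum import Enum
from typing import Dict, Set, Union

logger = logging.getLogger(__name__)


class DAGNodeStatus(str, Enum):
    # Stand-in enum: member values are assumed, not taken from Data-Juicer.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Allowed moves, mirroring the documented transition list.
VALID_TRANSITIONS: Dict[DAGNodeStatus, Set[DAGNodeStatus]] = {
    DAGNodeStatus.PENDING: {DAGNodeStatus.RUNNING, DAGNodeStatus.COMPLETED},
    DAGNodeStatus.RUNNING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.FAILED},
    DAGNodeStatus.FAILED: {DAGNodeStatus.RUNNING},  # retry
    DAGNodeStatus.COMPLETED: set(),  # terminal: no transitions out
}

StatusLike = Union[str, DAGNodeStatus]


def is_valid(from_status: StatusLike, to_status: StatusLike) -> bool:
    """Return True if from_status -> to_status is an allowed transition."""
    try:
        # The enum constructor accepts either a member or its string value.
        src = DAGNodeStatus(from_status)
        dst = DAGNodeStatus(to_status)
    except ValueError:
        return False  # unknown status strings are treated as invalid
    return dst in VALID_TRANSITIONS[src]


def validate_and_log(node_id: str, from_status: StatusLike, to_status: StatusLike) -> bool:
    """Like is_valid, but log a warning naming the node when the move is rejected."""
    ok = is_valid(from_status, to_status)
    if not ok:
        logger.warning("Invalid status transition for %s: %s -> %s", node_id, from_status, to_status)
    return ok


print(is_valid("failed", DAGNodeStatus.RUNNING))  # True (retry)
print(validate_and_log("op_000_mapper", "completed", "running"))  # False, plus a warning
```

Because COMPLETED maps to an empty set, a finished node can never be re-run, while the FAILED -> RUNNING edge leaves room for retries.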
- class data_juicer.core.executor.dag_execution_strategies.ScatterGatherNode(operation_index: int, operation_name: str, input_partitions: List[int], output_partitions: List[int])
Bases: object
Represents a scatter-gather operation in partitioned execution.
Encapsulates the complete scatter-gather pattern:
  1. Convergence: all partitions complete their work and converge.
  2. Global operation: a single operation runs on the gathered data.
  3. Redistribution: the results are redistributed back to the partitions.
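The three phases can be illustrated with plain Python lists standing in for partitions. This is a conceptual sketch, not Data-Juicer's implementation: the round-robin redistribution and the exact-match dedup helper are assumptions chosen for illustration.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def scatter_gather(
    partitions: List[List[T]],
    global_op: Callable[[List[T]], List[T]],
    num_output_partitions: int,
) -> List[List[T]]:
    # 1. Convergence: gather every partition's rows into one list.
    gathered = [row for part in partitions for row in part]
    # 2. Global operation: runs once over the full dataset.
    result = global_op(gathered)
    # 3. Redistribution: deal the rows back out round-robin (assumed policy).
    out: List[List[T]] = [[] for _ in range(num_output_partitions)]
    for i, row in enumerate(result):
        out[i % num_output_partitions].append(row)
    return out


def dedup(rows: List[T]) -> List[T]:
    # Order-preserving exact deduplication; needs to see all rows at once.
    return list(dict.fromkeys(rows))


print(scatter_gather([["a", "b"], ["b", "c"], ["a", "d"]], dedup, 2))
# [['a', 'c'], ['b', 'd']]
```

Deduplicating each partition separately would keep the duplicate "a" and "b" rows that live in different partitions, which is why this kind of operation needs the convergence step.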
- operation_index: int
- operation_name: str
- input_partitions: List[int]
- output_partitions: List[int]
- property node_id: str
Generate unique node ID for scatter-gather operation.
- __init__(operation_index: int, operation_name: str, input_partitions: List[int], output_partitions: List[int]) → None
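The documented fields and generated __init__ suggest a dataclass. The sketch below assumes that, and further assumes node_id follows the scatter-gather format listed under NodeID ("sg_{idx:03d}_{name}"); neither detail is confirmed by this page.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ScatterGatherNode:
    operation_index: int
    operation_name: str
    input_partitions: List[int]   # partitions that converge into the global op
    output_partitions: List[int]  # partitions that receive the redistributed result

    @property
    def node_id(self) -> str:
        # Assumed to match the NodeID scatter-gather format "sg_{idx:03d}_{name}".
        return f"sg_{self.operation_index:03d}_{self.operation_name}"


node = ScatterGatherNode(2, "deduplicator", [0, 1, 2], [0, 1, 2])
print(node.node_id)  # sg_002_deduplicator
```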
- class data_juicer.core.executor.dag_execution_strategies.NodeID
Bases: object
Utility for creating and parsing standardized node IDs.
Node ID formats:
  - Operation: "op_{idx:03d}_{name}"
  - Partition operation: "op_{idx:03d}_{name}_partition_{pid}"
  - Scatter-gather: "sg_{idx:03d}_{name}"
- static for_operation(op_idx: int, op_name: str) → str
Create the node ID for a whole-dataset (non-partitioned) operation.
- Parameters:
op_idx – Operation index (0-based)
op_name – Operation name
- Returns:
Standardized node ID
- static for_partition_operation(partition_id: int, op_idx: int, op_name: str) → str
Create the node ID for an operation running on a single partition.
- Parameters:
partition_id – Partition ID
op_idx – Operation index (0-based)
op_name – Operation name
- Returns:
Standardized node ID
- static for_scatter_gather(op_idx: int, op_name: str) → str
Create the node ID for a scatter-gather operation.
- Parameters:
op_idx – Operation index (0-based)
op_name – Operation name
- Returns:
Standardized node ID
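Standalone functions reproducing the three documented ID formats. They follow the format strings listed under NodeID rather than the library's source. Note that for_partition_operation takes partition_id first, even though the partition number appears last in the resulting ID.

```python
def for_operation(op_idx: int, op_name: str) -> str:
    # "op_{idx:03d}_{name}": index zero-padded to three digits
    return f"op_{op_idx:03d}_{op_name}"


def for_partition_operation(partition_id: int, op_idx: int, op_name: str) -> str:
    # "op_{idx:03d}_{name}_partition_{pid}": the operation ID plus a partition suffix
    return f"{for_operation(op_idx, op_name)}_partition_{partition_id}"


def for_scatter_gather(op_idx: int, op_name: str) -> str:
    # "sg_{idx:03d}_{name}"
    return f"sg_{op_idx:03d}_{op_name}"


print(for_operation(0, "mapper"))               # op_000_mapper
print(for_partition_operation(3, 1, "mapper"))  # op_001_mapper_partition_3
print(for_scatter_gather(2, "deduplicator"))    # sg_002_deduplicator
```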
- static parse(node_id: str) → Dict[str, Any] | None
Parse node ID into components.
- Parameters:
node_id – The node ID to parse
- Returns:
Dictionary with node type and components, or None if invalid format
Example
>>> NodeID.parse("op_001_mapper_partition_0")
{'type': DAGNodeType.PARTITION_OPERATION, 'partition_id': 0, 'operation_index': 1, 'operation_name': 'mapper'}
>>> NodeID.parse("sg_002_deduplicator")
{'type': DAGNodeType.SCATTER_GATHER, 'operation_index': 2, 'operation_name': 'deduplicator'}
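One way to invert those formats is with regular expressions, sketched below with a stand-in DAGNodeType enum. The patterns, including accepting indices wider than three digits, are assumptions. The partition pattern must be tried before the plain operation pattern, because "op_001_mapper_partition_0" also matches the latter.

```python
import re
from enum import Enum
from typing import Any, Dict, Optional


class DAGNodeType(Enum):
    OPERATION = "operation"
    PARTITION_OPERATION = "partition_operation"
    SCATTER_GATHER = "scatter_gather"


# Most specific pattern first, so a partition suffix is not absorbed into the name.
_PARTITION_RE = re.compile(r"^op_(\d{3,})_(.+)_partition_(\d+)$")
_SCATTER_GATHER_RE = re.compile(r"^sg_(\d{3,})_(.+)$")
_OPERATION_RE = re.compile(r"^op_(\d{3,})_(.+)$")


def parse(node_id: str) -> Optional[Dict[str, Any]]:
    m = _PARTITION_RE.match(node_id)
    if m:
        return {
            "type": DAGNodeType.PARTITION_OPERATION,
            "partition_id": int(m.group(3)),
            "operation_index": int(m.group(1)),
            "operation_name": m.group(2),
        }
    for pattern, node_type in (
        (_SCATTER_GATHER_RE, DAGNodeType.SCATTER_GATHER),
        (_OPERATION_RE, DAGNodeType.OPERATION),
    ):
        m = pattern.match(node_id)
        if m:
            return {"type": node_type, "operation_index": int(m.group(1)), "operation_name": m.group(2)}
    return None  # not a recognized node ID format


print(parse("op_001_mapper_partition_0"))
print(parse("not_a_node"))  # None
```

Under the documented "op_{idx:03d}_..." format, the prefix op_001 decodes to operation index 1.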
- class data_juicer.core.executor.dag_execution_strategies.DAGExecutionStrategy
Bases: ABC
Abstract base class for DAG execution strategies.
- abstractmethod generate_dag_nodes(operations: List, **kwargs) → Dict[str, Any]
Generate DAG nodes according to the execution strategy.
- abstractmethod get_dag_node_id(op_name: str, op_idx: int, **kwargs) → str
Get the DAG node ID for an operation under this strategy.
- abstractmethod build_dependencies(nodes: Dict[str, Any], operations: List, **kwargs) → None
Build dependencies between nodes according to the strategy.
- class data_juicer.core.executor.dag_execution_strategies.NonPartitionedDAGStrategy
Bases: DAGExecutionStrategy
Strategy for non-partitioned executors (default, ray).
- generate_dag_nodes(operations: List, **kwargs) → Dict[str, Any]
Generate DAG nodes for non-partitioned execution.
- get_dag_node_id(op_name: str, op_idx: int, **kwargs) → str
Get the DAG node ID for a non-partitioned operation.
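A sketch of how a concrete strategy might satisfy the three abstract methods. LinearDAGStrategy is hypothetical: it assumes that, without partitioning, each operation becomes one node that depends only on the operation before it, and it uses plain operation-name strings and a made-up node payload ({"operation_index", "operation_name", "dependencies"}) in place of the real structures.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class DAGExecutionStrategy(ABC):
    @abstractmethod
    def generate_dag_nodes(self, operations: List, **kwargs) -> Dict[str, Any]: ...

    @abstractmethod
    def get_dag_node_id(self, op_name: str, op_idx: int, **kwargs) -> str: ...

    @abstractmethod
    def build_dependencies(self, nodes: Dict[str, Any], operations: List, **kwargs) -> None: ...


class LinearDAGStrategy(DAGExecutionStrategy):
    """Hypothetical non-partitioned strategy: one node per op, chained in order."""

    def get_dag_node_id(self, op_name: str, op_idx: int, **kwargs) -> str:
        return f"op_{op_idx:03d}_{op_name}"

    def generate_dag_nodes(self, operations: List, **kwargs) -> Dict[str, Any]:
        # Operations are plain name strings here; real ops are objects.
        return {
            self.get_dag_node_id(name, idx): {"operation_index": idx, "operation_name": name, "dependencies": []}
            for idx, name in enumerate(operations)
        }

    def build_dependencies(self, nodes: Dict[str, Any], operations: List, **kwargs) -> None:
        # Each operation waits for the one immediately before it.
        for idx in range(1, len(operations)):
            node_id = self.get_dag_node_id(operations[idx], idx)
            nodes[node_id]["dependencies"].append(self.get_dag_node_id(operations[idx - 1], idx - 1))


ops = ["mapper", "filter", "deduplicator"]
strategy = LinearDAGStrategy()
nodes = strategy.generate_dag_nodes(ops)
strategy.build_dependencies(nodes, ops)
print(nodes["op_002_deduplicator"]["dependencies"])  # ['op_001_filter']
```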
- class data_juicer.core.executor.dag_execution_strategies.PartitionedDAGStrategy(num_partitions: int)
Bases: DAGExecutionStrategy
Strategy for partitioned executors (ray_partitioned).
- generate_dag_nodes(operations: List, **kwargs) → Dict[str, Any]
Generate DAG nodes for partitioned execution using the scatter-gather pattern.
- get_dag_node_id(op_name: str, op_idx: int, partition_id: int = None, **kwargs) → str
Get the DAG node ID for a partitioned operation.
- build_dependencies(nodes: Dict[str, Any], operations: List, **kwargs) → None
Build dependencies for partitioned execution using the scatter-gather pattern:
  - Partition operations depend on the previous operation in the same partition.
  - Scatter-gather nodes depend on ALL partitions of the previous operation.
  - Partition operations that follow a scatter-gather node depend on that node.
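The three dependency rules can be sketched as a single pass over the operation list, tracking the node that last produced each partition's data. The function name, the returned {node_id: [dependency ids]} layout, and the is_global predicate passed in are hypothetical; node IDs follow the NodeID formats documented above.

```python
from typing import Callable, Dict, List, Optional


def build_partitioned_dag(
    operations: List[str],
    num_partitions: int,
    is_global: Callable[[str], bool],
) -> Dict[str, List[str]]:
    """Map each node ID to the node IDs it depends on (hypothetical layout)."""
    deps: Dict[str, List[str]] = {}
    # Node that most recently produced each partition's data (None before the first op).
    prev: List[Optional[str]] = [None] * num_partitions
    for idx, name in enumerate(operations):
        if is_global(name):
            sg_id = f"sg_{idx:03d}_{name}"
            # Rule 2: a scatter-gather node waits for ALL partitions of the previous op.
            deps[sg_id] = sorted({p for p in prev if p is not None})
            # Rule 3: every partition's next op will wait on this node.
            prev = [sg_id] * num_partitions
        else:
            for pid in range(num_partitions):
                node_id = f"op_{idx:03d}_{name}_partition_{pid}"
                # Rule 1 (or rule 3 after a scatter-gather): depend on what last ran here.
                deps[node_id] = [prev[pid]] if prev[pid] else []
                prev[pid] = node_id
    return deps


deps = build_partitioned_dag(["mapper", "deduplicator", "filter"], 2, lambda n: n.endswith("deduplicator"))
for node_id, parents in deps.items():
    print(node_id, parents)
```

With these inputs, sg_001_deduplicator waits on both mapper partitions, and each filter partition waits only on sg_001_deduplicator.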
- data_juicer.core.executor.dag_execution_strategies.is_global_operation(operation) → bool
Check whether an operation is a global operation that requires convergence.
Global operations need to see all data at once (e.g., deduplication, global sorting), so they cannot be partitioned and require a scatter-gather pattern.
Detection priority:
  1. An explicit is_global_operation flag on the operation
  2. Base class inheritance (Deduplicator)
  3. Operation name pattern (fallback for unknown operations)
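A sketch of the three-step detection order, using a stand-in Deduplicator class. Two details are assumptions: an explicit flag is taken to win whether it is True or False, and the name fallback simply looks for "deduplicator" in the lowercased class name, since the real pattern list is not documented on this page.

```python
class Deduplicator:
    """Stand-in for Data-Juicer's Deduplicator base class."""


# Hypothetical fallback patterns; the real list is not documented here.
_GLOBAL_NAME_HINTS = ("deduplicator",)


def is_global_operation(operation) -> bool:
    # 1. An explicit flag on the operation wins (assumed: either value).
    flag = getattr(operation, "is_global_operation", None)
    if flag is not None:
        return bool(flag)
    # 2. Anything derived from the Deduplicator base class is global.
    if isinstance(operation, Deduplicator):
        return True
    # 3. Fall back to matching the (assumed) class-name pattern.
    name = type(operation).__name__.lower()
    return any(hint in name for hint in _GLOBAL_NAME_HINTS)


class ExactDedup(Deduplicator): ...
class Mapper: ...
class SortAll:
    is_global_operation = True  # explicit opt-in for a non-dedup global op


print(is_global_operation(ExactDedup()))  # True (base class)
print(is_global_operation(Mapper()))      # False
print(is_global_operation(SortAll()))     # True (explicit flag)
```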