data_juicer.core.executor.dag_execution_strategies module#

class data_juicer.core.executor.dag_execution_strategies.DAGNodeType(value)[source]#

Bases: Enum

Types of DAG nodes.

OPERATION = 'operation'#
PARTITION_OPERATION = 'partition_operation'#
SCATTER_GATHER = 'scatter_gather'#
class data_juicer.core.executor.dag_execution_strategies.DAGNodeStatusTransition[source]#

Bases: object

Validates DAG node status transitions.

Uses DAGNodeStatus enum for type safety. Valid transitions:

  • PENDING -> RUNNING (node starts execution)

  • PENDING -> COMPLETED (skipped - already done in previous run)

  • RUNNING -> COMPLETED (node finishes successfully)

  • RUNNING -> FAILED (node fails)

  • FAILED -> RUNNING (node retries)

  • COMPLETED is terminal (no transitions out)

VALID_TRANSITIONS = {DAGNodeStatus.COMPLETED: {}, DAGNodeStatus.FAILED: {DAGNodeStatus.RUNNING}, DAGNodeStatus.PENDING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.RUNNING}, DAGNodeStatus.RUNNING: {DAGNodeStatus.COMPLETED, DAGNodeStatus.FAILED}}#
classmethod is_valid(from_status: str | DAGNodeStatus, to_status: str | DAGNodeStatus) bool[source]#

Check if a status transition is valid.

Parameters:
  • from_status – Current status (string or enum)

  • to_status – Target status (string or enum)

Returns:

True if transition is valid, False otherwise

classmethod validate_and_log(node_id: str, from_status: str | DAGNodeStatus, to_status: str | DAGNodeStatus) bool[source]#

Validate transition and log warning if invalid.

Parameters:
  • node_id – Node identifier for logging

  • from_status – Current status (string or enum)

  • to_status – Target status (string or enum)

Returns:

True if transition is valid, False otherwise

class data_juicer.core.executor.dag_execution_strategies.ScatterGatherNode(operation_index: int, operation_name: str, input_partitions: List[int], output_partitions: List[int])[source]#

Bases: object

Represents a scatter-gather operation in partitioned execution.

Encapsulates the complete scatter-gather pattern:

  1. Convergence: All partitions complete their work and converge

  2. Global Operation: A single operation runs on the gathered data

  3. Redistribution: Results are redistributed back to partitions

operation_index: int#
operation_name: str#
input_partitions: List[int]#
output_partitions: List[int]#
property node_id: str#

Generate unique node ID for scatter-gather operation.

__init__(operation_index: int, operation_name: str, input_partitions: List[int], output_partitions: List[int]) None#
class data_juicer.core.executor.dag_execution_strategies.NodeID[source]#

Bases: object

Utility for creating and parsing standardized node IDs.

Node ID formats:

  • Operation: “op_{idx:03d}_{name}”

  • Partition Operation: “op_{idx:03d}_{name}_partition_{pid}”

  • Scatter-Gather: “sg_{idx:03d}_{name}”

static for_operation(op_idx: int, op_name: str) str[source]#

Create node ID for global operation.

Parameters:
  • op_idx – Operation index (0-based)

  • op_name – Operation name

Returns:

Standardized node ID

static for_partition_operation(partition_id: int, op_idx: int, op_name: str) str[source]#

Create node ID for partition operation.

Parameters:
  • partition_id – Partition ID

  • op_idx – Operation index (0-based)

  • op_name – Operation name

Returns:

Standardized node ID

static for_scatter_gather(op_idx: int, op_name: str) str[source]#

Create node ID for scatter-gather operation.

Parameters:
  • op_idx – Operation index (0-based)

  • op_name – Operation name

Returns:

Standardized node ID

static parse(node_id: str) Dict[str, Any] | None[source]#

Parse node ID into components.

Parameters:

node_id – The node ID to parse

Returns:

Dictionary with node type and components, or None if invalid format

Example

>>> NodeID.parse("op_001_mapper_partition_0")
{'type': DAGNodeType.PARTITION_OPERATION, 'partition_id': 0,
 'operation_index': 0, 'operation_name': 'mapper'}
>>> NodeID.parse("sg_002_deduplicator")
{'type': DAGNodeType.SCATTER_GATHER, 'operation_index': 2,
 'operation_name': 'deduplicator'}
class data_juicer.core.executor.dag_execution_strategies.DAGExecutionStrategy[source]#

Bases: ABC

Abstract base class for different DAG execution strategies.

abstractmethod generate_dag_nodes(operations: List, **kwargs) Dict[str, Any][source]#

Generate DAG nodes based on execution strategy.

abstractmethod get_dag_node_id(op_name: str, op_idx: int, **kwargs) str[source]#

Get DAG node ID for operation based on strategy.

abstractmethod build_dependencies(nodes: Dict[str, Any], operations: List, **kwargs) None[source]#

Build dependencies between nodes based on strategy.

abstractmethod can_execute_node(node_id: str, nodes: Dict[str, Any], completed_nodes: set) bool[source]#

Check if a node can be executed based on strategy.

validate_dag(nodes: Dict[str, Any]) bool[source]#

Validate that the DAG has no cycles, using DFS.

Returns:

True if DAG is valid (no cycles), False otherwise

class data_juicer.core.executor.dag_execution_strategies.NonPartitionedDAGStrategy[source]#

Bases: DAGExecutionStrategy

Strategy for non-partitioned executors (default, ray).

generate_dag_nodes(operations: List, **kwargs) Dict[str, Any][source]#

Generate DAG nodes for non-partitioned execution.

get_dag_node_id(op_name: str, op_idx: int, **kwargs) str[source]#

Get DAG node ID for non-partitioned operation.

build_dependencies(nodes: Dict[str, Any], operations: List, **kwargs) None[source]#

Build sequential dependencies for non-partitioned execution.

can_execute_node(node_id: str, nodes: Dict[str, Any], completed_nodes: set) bool[source]#

Check if a node can be executed (all dependencies completed).

class data_juicer.core.executor.dag_execution_strategies.PartitionedDAGStrategy(num_partitions: int)[source]#

Bases: DAGExecutionStrategy

Strategy for partitioned executors (ray_partitioned).

__init__(num_partitions: int)[source]#
generate_dag_nodes(operations: List, **kwargs) Dict[str, Any][source]#

Generate DAG nodes for partitioned execution using scatter-gather pattern.

get_dag_node_id(op_name: str, op_idx: int, partition_id: int = None, **kwargs) str[source]#

Get DAG node ID for partitioned operation.

build_dependencies(nodes: Dict[str, Any], operations: List, **kwargs) None[source]#

Build dependencies for partitioned execution using scatter-gather pattern.

  • Partition operations depend on previous operation in same partition

  • Scatter-gather nodes depend on ALL partitions from previous op

  • Post-scatter-gather partition ops depend on the scatter-gather node

can_execute_node(node_id: str, nodes: Dict[str, Any], completed_nodes: set) bool[source]#

Check if a node can be executed (all dependencies completed).

data_juicer.core.executor.dag_execution_strategies.is_global_operation(operation) bool[source]#

Check if an operation is a global operation that requires convergence.

Global operations need to see all data at once (e.g., deduplication, global sorting). They cannot be partitioned and require a scatter-gather pattern.

Detection priority:

  1. Explicit is_global_operation flag on the operation

  2. Base class inheritance (Deduplicator)

  3. Operation name pattern (fallback for unknown operations)