data_juicer.core.executor.ray_executor_partitioned module#
Simplified Partitioned Ray Executor for Large Dataset Processing
This module implements a streamlined partitioned execution strategy for Ray mode that:

1. Loads the full dataset with a single DatasetBuilder
2. Splits the dataset into manageable partitions using Ray’s .split() method
3. Processes each partition independently with Ray tasks
4. Merges results back into a single dataset for export
5. Supports convergence points for global operations (like deduplicators)
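A minimal sketch of this split/process/merge pattern with Ray Data, under stated assumptions: `process_partition` and the toy `range` dataset are illustrative stand-ins for the executor’s real operator pipeline, not code from this module.

```python
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def process_partition(partition):
    # Stand-in for the per-partition operator pipeline.
    return partition.map(lambda row: row)

dataset = ray.data.range(1_000)  # toy dataset
partitions = dataset.split(4)    # step 2: split into partitions
futures = [process_partition.remote(p) for p in partitions]  # step 3
processed = ray.get(futures)
merged = processed[0].union(*processed[1:])  # step 4: merge for export
print(merged.count())
```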
- class data_juicer.core.executor.ray_executor_partitioned.TempDirManager(tmp_dir)[source]#
Bases: object

Context manager for temporary directory cleanup.
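A minimal usage sketch, assuming the context manager removes the given directory on exit (including on error); the prefix and file names below are hypothetical.

```python
import os
import tempfile

from data_juicer.core.executor.ray_executor_partitioned import TempDirManager

# Hypothetical scratch directory for intermediate partition files.
tmp_dir = tempfile.mkdtemp(prefix="dj_partitions_")

with TempDirManager(tmp_dir):
    scratch_file = os.path.join(tmp_dir, "partition_0.parquet")
    # ... write intermediate partition outputs here ...

# Assumed behavior: the directory is gone once the block exits.
print(os.path.exists(tmp_dir))
```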
- class data_juicer.core.executor.ray_executor_partitioned.PartitionResult(partition_id: int, dataset: Any | None = None, success: bool = False, error: str | None = None)[source]#
Bases: object

Simple result container for partition processing.
- partition_id: int#
- dataset: Any | None = None#
- success: bool = False#
- error: str | None = None#
- __init__(partition_id: int, dataset: Any | None = None, success: bool = False, error: str | None = None) None#
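A short sketch of how a driver might use these result containers to separate successful partitions from failed ones; `processed_ds` and the error text are placeholders, not values produced by this module.

```python
from data_juicer.core.executor.ray_executor_partitioned import PartitionResult

processed_ds = object()  # placeholder for a processed Ray dataset handle

ok = PartitionResult(partition_id=0, dataset=processed_ds, success=True)
failed = PartitionResult(partition_id=1, success=False, error="worker OOM")

# The driver can merge successful partitions and report the failed ones.
results = [ok, failed]
failed_ids = [r.partition_id for r in results if not r.success]
print(failed_ids)  # [1]
```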
- class data_juicer.core.executor.ray_executor_partitioned.PartitionMetadata(partition_id: int, row_count: int, first_row_hash: str, last_row_hash: str)[source]#
Bases: object

Metadata for a single partition to enable validation on resume.
Stores information about each partition that can be used to verify that re-partitioning produces the same result on job resumption.
- partition_id: int#
- row_count: int#
- first_row_hash: str#
- last_row_hash: str#
- classmethod from_dict(data: Dict) PartitionMetadata[source]#
- __init__(partition_id: int, row_count: int, first_row_hash: str, last_row_hash: str) None#
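A sketch of building and round-tripping metadata for one partition. The SHA-256 row hashing shown here is an assumption for illustration (the executor’s actual hashing scheme is not documented on this page), and the equality check assumes default dataclass comparison.

```python
import hashlib
import json

from data_juicer.core.executor.ray_executor_partitioned import PartitionMetadata

def row_hash(row: dict) -> str:
    # Illustrative hashing only; the executor's real scheme may differ.
    return hashlib.sha256(json.dumps(row, sort_keys=True).encode()).hexdigest()

rows = [{"text": "a"}, {"text": "b"}, {"text": "c"}]
meta = PartitionMetadata(
    partition_id=0,
    row_count=len(rows),
    first_row_hash=row_hash(rows[0]),
    last_row_hash=row_hash(rows[-1]),
)

# Round-trip through a plain dict, as after JSON (de)serialization.
restored = PartitionMetadata.from_dict(
    {
        "partition_id": 0,
        "row_count": 3,
        "first_row_hash": meta.first_row_hash,
        "last_row_hash": meta.last_row_hash,
    }
)
print(restored == meta)  # True under dataclass field-by-field equality
```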
- class data_juicer.core.executor.ray_executor_partitioned.PartitioningInfo(num_partitions: int, total_rows: int, partitions: List[PartitionMetadata] = <factory>, deterministic: bool = True)[source]#
Bases: object

Complete partitioning information for a job.
Stored alongside checkpoints to enable validation that re-partitioning on resume produces identical partitions.
- num_partitions: int#
- total_rows: int#
- partitions: List[PartitionMetadata]#
- deterministic: bool = True#
- classmethod from_dict(data: Dict) PartitioningInfo[source]#
- classmethod load(path: str) PartitioningInfo | None[source]#
Load partitioning info from JSON file.
- __init__(num_partitions: int, total_rows: int, partitions: List[PartitionMetadata] = <factory>, deterministic: bool = True) None#
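A sketch of inspecting recorded partitioning info on resume, assuming load() returns None when the file is absent; the checkpoint path is hypothetical.

```python
from data_juicer.core.executor.ray_executor_partitioned import PartitioningInfo

# Hypothetical checkpoint location; the executor chooses the real path.
info = PartitioningInfo.load("./checkpoints/job_42/partitioning_info.json")

if info is None:
    print("no prior partitioning recorded; starting fresh")
elif not info.deterministic:
    print("partitioning was non-deterministic; skip strict validation")
else:
    print(f"{info.num_partitions} partitions, {info.total_rows} rows total")
    for p in info.partitions:
        # Compare these against a fresh re-partition to validate resume.
        print(p.partition_id, p.row_count, p.first_row_hash[:8])
```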
- class data_juicer.core.executor.ray_executor_partitioned.PartitionedRayExecutor(cfg: Namespace | None = None)[source]#
Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin

Simplified Ray executor with dataset partitioning using .split().
Features:

- Single DatasetBuilder loads the full dataset
- Uses Ray’s .split() method for partitioning
- Processes partitions in parallel with Ray tasks
- Supports convergence points for global operations
- Merges results back into a single dataset
- run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)[source]#
Run the simplified partitioned dataset processing pipeline.
- Parameters:
load_data_np – Number of workers for loading the dataset
skip_return – Whether to skip returning the processed dataset
job_id – Optional job ID to resume from checkpoints
- Returns:
Processed dataset
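A minimal end-to-end usage sketch. The config file path and its contents are assumptions, and this assumes data-juicer’s `init_configs` helper accepts an argument list, as in its demos; it is not taken from this page.

```python
from data_juicer.config import init_configs
from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor

# Hypothetical config file; it would set the dataset path, the operator
# list, and any partitioning options this executor reads from cfg.
cfg = init_configs(args=["--config", "configs/demo_ray_partitioned.yaml"])

executor = PartitionedRayExecutor(cfg)
processed = executor.run(load_data_np=8)  # merged, processed dataset
```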