data_juicer.core.executor.ray_executor_partitioned module#

Simplified Partitioned Ray Executor for Large Dataset Processing

This module implements a streamlined partitioned execution strategy for Ray mode that:

1. Loads the full dataset with a single DatasetBuilder
2. Splits the dataset into manageable partitions using Ray’s .split() method
3. Processes each partition independently with Ray tasks
4. Merges results back into a single dataset for export
5. Supports convergence points for global operations (like deduplicators)
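
A minimal sketch of this split/process/merge flow on a plain Ray Data pipeline; the dataset, partition count, and identity operator below are illustrative stand-ins, not the executor's internals:

   import ray

   ray.init(ignore_reinit_error=True)

   ds = ray.data.range(10_000)           # stand-in for the loaded dataset
   partitions = ds.split(4)              # Ray's .split() -> list of dataset shards

   def process(partition):
       # stand-in for applying the configured operators to one partition
       return partition.map_batches(lambda batch: batch)

   processed = [process(p) for p in partitions]
   merged = processed[0].union(*processed[1:])   # merge results for export
   print(merged.count())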

class data_juicer.core.executor.ray_executor_partitioned.TempDirManager(tmp_dir)[source]#

Bases: object

Context manager for temporary directory cleanup.

__init__(tmp_dir)[source]#
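
A hypothetical usage sketch; based on the description above, tmp_dir is assumed to be removed when the with-block exits:

   import os
   import tempfile

   tmp_dir = tempfile.mkdtemp(prefix="dj_partition_")
   with TempDirManager(tmp_dir):
       # write intermediate partition artifacts into the managed directory
       with open(os.path.join(tmp_dir, "part_0.jsonl"), "w") as f:
           f.write('{"text": "example"}\n')
   # tmp_dir is expected to be cleaned up once the block exits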
class data_juicer.core.executor.ray_executor_partitioned.PartitionResult(partition_id: int, dataset: Any | None = None, success: bool = False, error: str | None = None)[source]#

Bases: object

Simple result container for partition processing.

partition_id: int#
dataset: Any | None = None#
success: bool = False#
error: str | None = None#
__init__(partition_id: int, dataset: Any | None = None, success: bool = False, error: str | None = None) None#
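
An illustrative construction of results for one successful and one failed partition; processed_ds stands in for a processed Ray dataset:

   ok = PartitionResult(partition_id=0, dataset=processed_ds, success=True)
   bad = PartitionResult(partition_id=1, success=False, error="worker ran out of memory")

   failed_ids = [r.partition_id for r in (ok, bad) if not r.success]   # -> [1]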
class data_juicer.core.executor.ray_executor_partitioned.PartitionMetadata(partition_id: int, row_count: int, first_row_hash: str, last_row_hash: str)[source]#

Bases: object

Metadata for a single partition to enable validation on resume.

Stores information about each partition that can be used to verify that re-partitioning produces the same result on job resumption.

partition_id: int#
row_count: int#
first_row_hash: str#
last_row_hash: str#
to_dict() Dict[source]#
classmethod from_dict(data: Dict) PartitionMetadata[source]#
__init__(partition_id: int, row_count: int, first_row_hash: str, last_row_hash: str) None#
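
A round-trip sketch, assuming default dataclass equality; the hash values are placeholders:

   meta = PartitionMetadata(
       partition_id=0,
       row_count=250_000,
       first_row_hash="a1b2c3",
       last_row_hash="d4e5f6",
   )
   restored = PartitionMetadata.from_dict(meta.to_dict())
   assert restored == meta   # all fields survive the dict round trip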
class data_juicer.core.executor.ray_executor_partitioned.PartitioningInfo(num_partitions: int, total_rows: int, partitions: List[PartitionMetadata] = <factory>, deterministic: bool = True)[source]#

Bases: object

Complete partitioning information for a job.

Stored alongside checkpoints to enable validation that re-partitioning on resume produces identical partitions.

num_partitions: int#
total_rows: int#
partitions: List[PartitionMetadata]#
deterministic: bool = True#
to_dict() Dict[source]#
classmethod from_dict(data: Dict) PartitioningInfo[source]#
save(path: str) None[source]#

Save partitioning info to JSON file.

classmethod load(path: str) PartitioningInfo | None[source]#

Load partitioning info from JSON file.

__init__(num_partitions: int, total_rows: int, partitions: List[PartitionMetadata] = <factory>, deterministic: bool = True) None#
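
A sketch of persisting partitioning info alongside checkpoints and validating it on resume; the file name, hash values, and strict dict comparison are illustrative choices, not the executor's policy:

   meta = PartitionMetadata(
       partition_id=0, row_count=250_000,
       first_row_hash="a1b2c3", last_row_hash="d4e5f6",
   )
   info = PartitioningInfo(
       num_partitions=1,
       total_rows=250_000,
       partitions=[meta],
       deterministic=True,
   )
   info.save("partitioning_info.json")

   resumed = PartitioningInfo.load("partitioning_info.json")
   if resumed is None or resumed.to_dict() != info.to_dict():
       raise RuntimeError("re-partitioning does not match the saved layout")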
class data_juicer.core.executor.ray_executor_partitioned.PartitionedRayExecutor(cfg: Namespace | None = None)[source]#

Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin

Simplified Ray executor with dataset partitioning using .split().

Features:

• Single DatasetBuilder loads the full dataset
• Uses Ray’s .split() method for partitioning
• Processes partitions in parallel with Ray tasks
• Supports convergence points for global operations (see the sketch below)
• Merges results back into a single dataset
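
A conceptual sketch of a convergence point, assuming the partitions behave like Ray datasets and the operator callables are placeholders for the configured per-partition and global (e.g. deduplication) stages:

   from typing import Any, Callable, List

   def run_with_convergence(
       partitions: List[Any],
       per_partition_op: Callable[[Any], Any],
       global_op: Callable[[Any], Any],
   ) -> Any:
       # Per-partition phase: each split is processed independently
       # (parallel Ray tasks in the real executor).
       processed = [per_partition_op(p) for p in partitions]
       # Convergence point: a global operator such as a deduplicator must
       # see all rows at once, so partitions are merged before it runs.
       merged = processed[0].union(*processed[1:])
       return global_op(merged)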

__init__(cfg: Namespace | None = None)[source]#

Initialize the partitioned Ray executor.

run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)[source]#

Run the simplified partitioned dataset processing pipeline.

Parameters:
  • load_data_np – Number of workers for loading the dataset

  • skip_return – Whether to skip returning the dataset

  • job_id – Optional job ID to resume from checkpoints

Returns:

Processed dataset
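
A hedged end-to-end sketch, assuming Data-Juicer's standard init_configs helper; the config path is a placeholder and the exact options this executor expects may differ:

   from data_juicer.config import init_configs
   from data_juicer.core.executor.ray_executor_partitioned import PartitionedRayExecutor

   cfg = init_configs(["--config", "path/to/process.yaml"])
   executor = PartitionedRayExecutor(cfg)
   processed = executor.run()   # splits, processes in parallel, merges for export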

cleanup_temp_files()[source]#

Manually clean up temporary files from previous runs.