data_juicer.core.executor.ray_executor_partitioned module#

Simplified Partitioned Ray Executor for Large Dataset Processing

This module implements a streamlined partitioned execution strategy for Ray mode that:

  1. Loads the full dataset with a single DatasetBuilder

  2. Splits the dataset into manageable partitions using Ray's .split() method

  3. Processes each partition independently with Ray tasks

  4. Merges results back into a single dataset for export

  5. Supports convergence points for global operations (like deduplicators)
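The split/process/merge flow above can be sketched in plain Python, using lists in place of Ray Datasets. The helper names below are illustrative, not the executor's real API; `split_into_partitions` mimics the contiguous, roughly equal-sized split that Ray's `Dataset.split(n)` performs.

```python
# Illustrative sketch of the split -> process -> merge flow; plain lists
# stand in for Ray Datasets, and function names are hypothetical.

def split_into_partitions(rows, num_partitions):
    """Split rows into roughly equal contiguous partitions,
    mirroring Ray's Dataset.split(n)."""
    size, rem = divmod(len(rows), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < rem else 0)
        partitions.append(rows[start:end])
        start = end
    return partitions

def process_partition(partition):
    # Stand-in for running the op pipeline on one partition
    # (here: drop rows with empty text).
    return [row for row in partition if row["text"]]

def run_partitioned(rows, num_partitions=4):
    partitions = split_into_partitions(rows, num_partitions)
    # In the real executor each partition is a parallel Ray task.
    results = [process_partition(p) for p in partitions]
    # Merge partition results back into one dataset for export.
    return [row for part in results for row in part]
```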

class data_juicer.core.executor.ray_executor_partitioned.TempDirManager(tmp_dir)[source]#

Bases: object

Context manager for temporary directory cleanup.

__init__(tmp_dir)[source]#
class data_juicer.core.executor.ray_executor_partitioned.PartitionResult(partition_id: int, dataset: Any | None = None, success: bool = False, error: str | None = None)[source]#
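A minimal sketch of what such a context manager could look like, assuming the documented behavior (the directory is removed on exit); the actual implementation may differ:

```python
import os
import shutil

class TempDirManager:
    """Sketch of a context manager that cleans up a temporary
    directory on exit (assumed behavior, not the exact source)."""

    def __init__(self, tmp_dir):
        self.tmp_dir = tmp_dir

    def __enter__(self):
        os.makedirs(self.tmp_dir, exist_ok=True)
        return self.tmp_dir

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Remove the directory tree even if processing failed.
        shutil.rmtree(self.tmp_dir, ignore_errors=True)
        return False  # do not suppress exceptions
```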

Bases: object

Simple result container for partition processing.

partition_id: int#
dataset: Any | None = None#
success: bool = False#
error: str | None = None#
__init__(partition_id: int, dataset: Any | None = None, success: bool = False, error: str | None = None) → None#
class data_juicer.core.executor.ray_executor_partitioned.PartitionMetadata(partition_id: int, row_count: int, first_row_hash: str, last_row_hash: str)[source]#
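A container like this lets per-partition failures be recorded instead of aborting the whole job. The dataclass below mirrors the documented fields; the `process_safely` wrapper is a hypothetical illustration of how it could be used:

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class PartitionResult:
    """Sketch of the per-partition result container."""
    partition_id: int
    dataset: Optional[Any] = None
    success: bool = False
    error: Optional[str] = None

def process_safely(partition_id, fn, data):
    # Hypothetical wrapper: capture a failure in one partition
    # rather than crashing the whole run.
    try:
        return PartitionResult(partition_id, dataset=fn(data), success=True)
    except Exception as e:
        return PartitionResult(partition_id, error=str(e))
```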

Bases: object

Metadata for a single partition to enable validation on resume.

Stores information about each partition that can be used to verify that re-partitioning produces the same result on job resumption.

partition_id: int#
row_count: int#
first_row_hash: str#
last_row_hash: str#
to_dict() → Dict[source]#
classmethod from_dict(data: Dict) → PartitionMetadata[source]#
__init__(partition_id: int, row_count: int, first_row_hash: str, last_row_hash: str) → None#
class data_juicer.core.executor.ray_executor_partitioned.PartitioningInfo(num_partitions: int, total_rows: int, partitions: List[PartitionMetadata] = <factory>, deterministic: bool = True)[source]#
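One way such metadata can validate a resumed job is to hash the first and last rows of each partition and compare against the saved values. The hashing scheme below (SHA-256 over canonical JSON) and the helper names are assumptions for illustration only:

```python
import hashlib
import json
from dataclasses import dataclass

@dataclass
class PartitionMetadata:
    partition_id: int
    row_count: int
    first_row_hash: str
    last_row_hash: str

def _row_hash(row):
    # Hypothetical scheme: hash the row's canonical JSON form.
    return hashlib.sha256(json.dumps(row, sort_keys=True).encode()).hexdigest()

def metadata_for(partition_id, rows):
    return PartitionMetadata(
        partition_id=partition_id,
        row_count=len(rows),
        first_row_hash=_row_hash(rows[0]),
        last_row_hash=_row_hash(rows[-1]),
    )

def matches(saved, rows, partition_id):
    # On resume: verify that re-partitioning reproduced this partition.
    return saved == metadata_for(partition_id, rows)
```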

Bases: object

Complete partitioning information for a job.

Stored alongside checkpoints to enable validation that re-partitioning on resume produces identical partitions.

num_partitions: int#
total_rows: int#
partitions: List[PartitionMetadata]#
deterministic: bool = True#
to_dict() → Dict[source]#
classmethod from_dict(data: Dict) → PartitioningInfo[source]#
save(path: str) → None[source]#

Save partitioning info to JSON file.

classmethod load(path: str) → PartitioningInfo | None[source]#

Load partitioning info from JSON file.

__init__(num_partitions: int, total_rows: int, partitions: List[PartitionMetadata] = <factory>, deterministic: bool = True) → None#
class data_juicer.core.executor.ray_executor_partitioned.PartitionedRayExecutor(cfg: Namespace | None = None)[source]#
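The save/load round-trip through JSON could look like the sketch below. It follows the documented interface (save to a JSON file; load returns None when nothing was saved), but the field serialization details are assumptions:

```python
import json
import os
from dataclasses import asdict, dataclass, field
from typing import List

@dataclass
class PartitionMetadata:
    partition_id: int
    row_count: int
    first_row_hash: str
    last_row_hash: str

@dataclass
class PartitioningInfo:
    num_partitions: int
    total_rows: int
    partitions: List[PartitionMetadata] = field(default_factory=list)
    deterministic: bool = True

    def save(self, path):
        # Serialize nested dataclasses to plain dicts for JSON.
        with open(path, "w") as f:
            json.dump(asdict(self), f)

    @classmethod
    def load(cls, path):
        # Mirrors the documented behavior: None when no file exists.
        if not os.path.exists(path):
            return None
        with open(path) as f:
            data = json.load(f)
        data["partitions"] = [PartitionMetadata(**p) for p in data["partitions"]]
        return cls(**data)
```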

Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin

Simplified Ray executor with dataset partitioning using .split().

Features:

  • Single DatasetBuilder loads the full dataset

  • Uses Ray's .split() method for partitioning

  • Processes partitions in parallel with Ray tasks

  • Supports convergence points for global operations

  • Merges results back into a single dataset

__init__(cfg: Namespace | None = None)[source]#

Initialize the partitioned Ray executor.

run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)[source]#

Run the simplified partitioned dataset processing pipeline.

Parameters:
  • load_data_np -- Number of workers for loading dataset

  • skip_return -- Whether to skip returning the dataset

  • job_id -- Optional job ID to resume from checkpoints

Returns:

Processed dataset
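The convergence-point behavior mentioned above can be sketched abstractly: per-partition ops run independently on each partition, but a global op (such as a deduplicator) forces all partitions to be merged first. The op representation and helper below are illustrative, not the real data_juicer API:

```python
# Sketch of convergence points: ops marked global force a merge
# ("convergence") before they run; others stay per-partition.
# Names and the (op, is_global) representation are hypothetical.

def run_with_convergence(partitions, ops):
    for op, is_global in ops:
        if is_global:
            # Convergence point: merge all partitions into one,
            # then apply the global op to the full dataset.
            merged = [row for p in partitions for row in p]
            partitions = [op(merged)]
        else:
            # Per-partition ops run independently (Ray tasks in practice).
            partitions = [op(p) for p in partitions]
    return [row for p in partitions for row in p]
```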

cleanup_temp_files()[source]#

Manually clean up temporary files from previous runs.