data_juicer.core.executor#

class data_juicer.core.executor.ExecutorBase(cfg: Namespace | None = None)[source]#

Bases: ABC

abstractmethod __init__(cfg: Namespace | None = None)[source]#

abstractmethod run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)[source]#

Abstract method that each concrete executor must implement to run its processing pipeline.
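The abstract-base contract can be sketched with the standard library alone. The block below is a minimal illustration, not the library's implementation: `SketchExecutorBase` and `EchoExecutor` are hypothetical names, the config is a plain dict rather than a jsonargparse Namespace, and the positive-integer constraint on `load_data_np` is checked by hand as an assumption about what `Gt(gt=0)` expresses.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class SketchExecutorBase(ABC):
    """Hypothetical stand-in for ExecutorBase; not the library class."""

    @abstractmethod
    def __init__(self, cfg: Optional[dict] = None):
        # Abstract methods may still carry a body that subclasses reuse.
        self.cfg = cfg or {}

    @abstractmethod
    def run(self, load_data_np: Optional[int] = None, skip_return: bool = False) -> Any:
        """Subclasses implement the actual processing pipeline here."""


class EchoExecutor(SketchExecutorBase):
    """Toy subclass that 'processes' a list stored in the config."""

    def __init__(self, cfg: Optional[dict] = None):
        super().__init__(cfg)

    def run(self, load_data_np: Optional[int] = None, skip_return: bool = False) -> Any:
        # Assumption: mirror the "greater than zero" constraint manually.
        if load_data_np is not None and load_data_np <= 0:
            raise ValueError("load_data_np must be a positive integer")
        processed = list(self.cfg.get("samples", []))
        return None if skip_return else processed
```

Instantiating `SketchExecutorBase` directly raises `TypeError`, because both abstract methods are unimplemented. `EchoExecutor` can be created and run because it overrides both.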

class data_juicer.core.executor.ExecutorFactory[source]#

Bases: object

static create_executor(executor_type: str) → ExecutorBase[source]#
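A string-keyed factory of this kind can be sketched as follows. This is an illustration of the pattern only. The registry keys (`"local"`, `"ray"`), the class names, and the choice to return an instance and raise `ValueError` for unknown types are all assumptions, not the documented behaviour of `ExecutorFactory.create_executor`.

```python
from typing import Dict


class LocalSketchExecutor:
    """Hypothetical executor that would run on a single machine."""


class RaySketchExecutor:
    """Hypothetical executor that would run on a cluster."""


# Hypothetical registry: the keys are illustrative, not the library's accepted values.
_REGISTRY: Dict[str, type] = {
    "local": LocalSketchExecutor,
    "ray": RaySketchExecutor,
}


def create_executor_sketch(executor_type: str) -> object:
    """Look up executor_type and return an instance of the matching class."""
    try:
        executor_cls = _REGISTRY[executor_type]
    except KeyError:
        raise ValueError(f"Unsupported executor type: {executor_type!r}") from None
    return executor_cls()
```

Keeping the mapping in one dictionary means a new executor type needs only one added registry entry, while callers keep passing a plain string.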

class data_juicer.core.executor.DefaultExecutor(cfg: Namespace | None = None)[source]#

Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin

This Executor class is used to process a specific dataset.

It loads the dataset, unifies its format, applies all the ops listed in the config file in order, and generates a processed dataset.

__init__(cfg: Namespace | None = None)[source]#

Initialization method.

Parameters:

cfg – optional jsonargparse Namespace.

run(dataset: Dataset | NestedDataset = None, load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_export: bool = False, skip_return: bool = False)[source]#

Run the dataset processing pipeline.

Parameters:
  • dataset – a Dataset object to process.

  • load_data_np – number of workers when loading the dataset.

  • skip_export – whether to skip exporting the results to disk.

  • skip_return – whether to skip returning the processed dataset (for API calls).

Returns:

processed dataset.
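The flow described for `run` (apply the ops in order, optionally export, optionally return) can be sketched without the library. Everything below is hypothetical: the dataset is a plain list of dicts, ops are plain callables, and `exporter` is an illustrative parameter standing in for whatever writes results to disk. The real method takes its ops and export settings from the config.

```python
from typing import Callable, List, Optional

Sample = dict
Op = Callable[[List[Sample]], List[Sample]]


def lowercase_mapper(samples: List[Sample]) -> List[Sample]:
    """Mapper-style op: rewrites each sample and keeps the sample count."""
    return [{**s, "text": s["text"].lower()} for s in samples]


def min_length_filter(samples: List[Sample]) -> List[Sample]:
    """Filter-style op: drops samples whose text is shorter than 5 characters."""
    return [s for s in samples if len(s["text"]) >= 5]


def run_pipeline_sketch(
    dataset: List[Sample],
    ops: List[Op],
    exporter: Optional[Callable[[List[Sample]], None]] = None,
    skip_export: bool = False,
    skip_return: bool = False,
) -> Optional[List[Sample]]:
    # Apply every op in the order given, feeding each result to the next op.
    for op in ops:
        dataset = op(dataset)
    # Export unless the caller asked to skip it (or no exporter was supplied).
    if not skip_export and exporter is not None:
        exporter(dataset)
    # Returning None lets callers avoid holding the result when they do not need it.
    return None if skip_return else dataset
```

In this sketch, `skip_export=True` together with `skip_return=True` runs the ops purely for their side effects, and the other combinations export, return, or do both.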

sample_data(dataset_to_sample: Dataset = None, load_data_np=None, sample_ratio: float = 1.0, sample_algo: str = 'uniform', **kwargs)[source]#

Sample a subset from the given dataset. TODO: add support for executors other than LocalExecutor.

Parameters:
  • dataset_to_sample – Dataset to sample from. If None, the formatter linked to the executor is used. Default is None.

  • load_data_np – number of workers when loading the dataset.

  • sample_ratio – The ratio of the sample size to the original dataset size. Default is 1.0 (no sampling).

  • sample_algo – Sampling algorithm to use. Options are “uniform”, “frequency_specified_field_selector”, or “topk_specified_field_selector”. Default is “uniform”.

Returns:

A sampled Dataset.
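To make the meaning of `sample_ratio` concrete, here is a minimal sketch of uniform sampling over a plain sequence. It is not the library's sampler. The function name, the `seed` parameter, the restriction of the ratio to the range 0 to 1, the truncation of the sample size, and the preservation of original order are all assumptions made for illustration.

```python
import random
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def uniform_sample_sketch(
    dataset: Sequence[T],
    sample_ratio: float = 1.0,
    seed: Optional[int] = None,
) -> List[T]:
    """Return roughly sample_ratio * len(dataset) items chosen uniformly."""
    # Assumption: ratios outside [0, 1] are rejected.
    if not 0.0 <= sample_ratio <= 1.0:
        raise ValueError("sample_ratio must be between 0 and 1")
    if sample_ratio == 1.0:
        # A ratio of 1.0 means no sampling: keep everything.
        return list(dataset)
    # Assumption: the sample size is truncated to an integer.
    k = int(len(dataset) * sample_ratio)
    rng = random.Random(seed)
    # Sample indices without replacement, then restore the original order.
    picked = sorted(rng.sample(range(len(dataset)), k))
    return [dataset[i] for i in picked]
```

With 100 items and `sample_ratio=0.25`, the sketch returns 25 distinct items. The other algorithms named above select by a specified field and are not covered here.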

class data_juicer.core.executor.RayExecutor(cfg: Namespace | None = None)[source]#

Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin

Executor based on Ray.

Run Data-Juicer data processing in a distributed cluster.

  1. Supports Filter, Mapper, and Exact Deduplicator operators for now.

  2. Only supports loading .json files.

  3. Advanced functions, such as checkpointing, are not supported.

__init__(cfg: Namespace | None = None)[source]#

Initialization method.

Parameters:

cfg – optional config Namespace.

run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_export: bool = False, skip_return: bool = False)[source]#

Run the dataset processing pipeline.

Parameters:
  • load_data_np – number of workers when loading the dataset.

  • skip_export – whether to skip exporting the results to disk.

  • skip_return – whether to skip returning the processed dataset (for API calls).

Returns:

processed dataset.

class data_juicer.core.executor.PartitionedRayExecutor(cfg: Namespace | None = None)[source]#

Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin

Simplified Ray executor with dataset partitioning using .split().

Features:

  • Single DatasetBuilder loads the full dataset

  • Uses Ray’s .split() method for partitioning

  • Processes partitions in parallel with Ray tasks

  • Supports convergence points for global operations

  • Merges results back into a single dataset
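The split, process, converge, and merge flow listed above can be illustrated with the standard library. This sketch makes several assumptions: a thread pool stands in for Ray tasks, `split_sketch` is a hypothetical stand-in for Ray's `.split()`, and the "global operation" is an order-preserving deduplication chosen only as an example of a step that must see all partitions at once.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def split_sketch(items: Sequence[str], num_partitions: int) -> List[List[str]]:
    """Split items into contiguous partitions of near-equal size."""
    base, extra = divmod(len(items), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        size = base + (1 if i < extra else 0)
        parts.append(list(items[start:start + size]))
        start += size
    return parts


def run_partitioned_sketch(
    items: Sequence[str],
    per_partition_op: Callable[[List[str]], List[str]],
    global_op: Callable[[List[str]], List[str]],
    num_partitions: int = 2,
) -> List[str]:
    partitions = split_sketch(items, num_partitions)
    # Process partitions in parallel; map() keeps results in partition order.
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        processed = list(pool.map(per_partition_op, partitions))
    # Convergence point: merge partitions so the global op sees all data.
    merged = [x for part in processed for x in part]
    return global_op(merged)


def lowercase_all(part: List[str]) -> List[str]:
    """Per-partition op: needs only its own partition."""
    return [s.lower() for s in part]


def dedup_keep_order(merged: List[str]) -> List[str]:
    """Global op: duplicates may sit in different partitions."""
    return list(dict.fromkeys(merged))
```

Calling `run_partitioned_sketch(["a", "B", "a", "c", "C"], lowercase_all, dedup_keep_order)` lowercases each partition independently and then removes duplicates over the merged result. The deduplication has to run after the merge because the two `"c"` values could otherwise end up in different partitions.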

__init__(cfg: Namespace | None = None)[source]#

Initialize the partitioned Ray executor.

run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)[source]#

Run the simplified partitioned dataset processing pipeline.

Parameters:
  • load_data_np – Number of workers for loading the dataset

  • skip_return – Whether to skip returning the dataset

  • job_id – Optional job ID to resume from checkpoints

Returns:

Processed dataset

cleanup_temp_files()[source]#

Manually clean up temporary files from previous runs.