data_juicer.core.executor
- class data_juicer.core.executor.ExecutorFactory
Bases: object
- static create_executor(executor_type: str) → ExecutorBase
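The factory's job is to map an executor_type string to a concrete executor instance. A minimal sketch of that pattern follows. The Toy* classes and the "default"/"ray" keys are hypothetical stand-ins, because this reference does not list which type strings the real create_executor accepts.

```python
from abc import ABC, abstractmethod


class ToyExecutorBase(ABC):
    """Hypothetical stand-in for ExecutorBase."""

    @abstractmethod
    def run(self, dataset):
        ...


class ToyLocalExecutor(ToyExecutorBase):
    def run(self, dataset):
        # Trivial local "processing": materialize the samples as a list.
        return list(dataset)


class ToyRayExecutor(ToyExecutorBase):
    def run(self, dataset):
        raise NotImplementedError("would dispatch work to a Ray cluster")


class ToyExecutorFactory:
    # Hypothetical registry; the real accepted type strings may differ.
    _registry = {
        "default": ToyLocalExecutor,
        "ray": ToyRayExecutor,
    }

    @staticmethod
    def create_executor(executor_type: str) -> ToyExecutorBase:
        try:
            cls = ToyExecutorFactory._registry[executor_type]
        except KeyError:
            raise ValueError(f"Unsupported executor type: {executor_type!r}") from None
        return cls()
```

Keeping the mapping in a dict means adding a new backend is a one-line registry change rather than another if/elif branch.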
- class data_juicer.core.executor.DefaultExecutor(cfg: Namespace | None = None)
Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin
This executor processes a specific dataset.
It loads the dataset, unifies its format, applies all ops in the config file in order, and produces the processed dataset.
- __init__(cfg: Namespace | None = None)
Initialization method.
- Parameters:
cfg -- optional jsonargparse Namespace.
- run(dataset: Dataset | NestedDataset = None, load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_export: bool = False, skip_return: bool = False)
Run the dataset processing pipeline.
- Parameters:
dataset -- a Dataset object to process.
load_data_np -- number of workers when loading the dataset.
skip_export -- whether to skip exporting the results to disk.
skip_return -- whether to skip returning the processed dataset (for API calls).
- Returns:
The processed dataset.
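The flow described for DefaultExecutor (load, unify the format, apply every op from the config in order) and the effect of skip_export and skip_return can be illustrated with a plain-Python sketch. Everything here is hypothetical: samples are dicts, ops are list-to-list callables, unify_format only guarantees a "text" field, and export_fn stands in for writing to disk. It is not Data-Juicer's implementation.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical shapes, for illustration only.
Sample = Dict[str, object]
Op = Callable[[List[Sample]], List[Sample]]


def make_mapper(fn: Callable[[Sample], Sample]) -> Op:
    # A mapper transforms every sample.
    return lambda samples: [fn(s) for s in samples]


def make_filter(pred: Callable[[Sample], bool]) -> Op:
    # A filter keeps only the samples for which pred is True.
    return lambda samples: [s for s in samples if pred(s)]


def unify_format(raw: List[dict], text_key: str = "text") -> List[Sample]:
    # Hypothetical "unify" step: guarantee every sample has a string text field.
    return [{**r, text_key: str(r.get(text_key, ""))} for r in raw]


def run(
    raw: List[dict],
    ops: List[Op],
    export_fn: Callable[[List[Sample]], None],
    skip_export: bool = False,
    skip_return: bool = False,
) -> Optional[List[Sample]]:
    dataset = unify_format(raw)
    for op in ops:  # apply ops strictly in config order
        dataset = op(dataset)
    if not skip_export:
        export_fn(dataset)  # stand-in for exporting to disk
    return None if skip_return else dataset


# Usage: strip whitespace, then drop empty samples.
exported: List[List[Sample]] = []
ops = [
    make_mapper(lambda s: {**s, "text": str(s["text"]).strip()}),
    make_filter(lambda s: len(str(s["text"])) > 0),
]
result = run([{"text": " hi "}, {"text": "   "}, {}], ops, exported.append)
```

Because the ops run in list order, swapping the mapper and the filter would change the result: the whitespace-only sample would survive the filter before being stripped.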
- sample_data(dataset_to_sample: Dataset = None, load_data_np=None, sample_ratio: float = 1.0, sample_algo: str = 'uniform', **kwargs)
Sample a subset from the given dataset. (TODO: add support for executors other than LocalExecutor.)
- Parameters:
dataset_to_sample -- Dataset to sample from. If None, the formatter linked to the executor is used. Default is None.
load_data_np -- number of workers when loading the dataset.
sample_ratio -- The ratio of the sample size to the original dataset size. Default is 1.0 (no sampling).
sample_algo -- Sampling algorithm to use. Options are "uniform", "frequency_specified_field_selector", or "topk_specified_field_selector". Default is "uniform".
- Returns:
A sampled Dataset.
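To illustrate what sample_ratio means for two of the listed algorithms, here is a hypothetical reimplementation of "uniform" and "topk_specified_field_selector" over a list of dicts. The function names, the seed parameter, rounding down with int(), and the descending sort order are assumptions, not behavior confirmed by this reference; "frequency_specified_field_selector" is not sketched.

```python
import random
from typing import List, Optional


def sample_uniform(
    samples: List[dict], sample_ratio: float = 1.0, seed: Optional[int] = None
) -> List[dict]:
    # Hypothetical "uniform" sampler: draw int(n * ratio) samples without replacement.
    if not 0.0 < sample_ratio <= 1.0:
        raise ValueError("sample_ratio must be in (0, 1]")
    if sample_ratio == 1.0:
        return list(samples)  # 1.0 means no sampling
    k = int(len(samples) * sample_ratio)  # assumption: round down
    return random.Random(seed).sample(samples, k)


def sample_topk_by_field(
    samples: List[dict], field: str, sample_ratio: float = 1.0
) -> List[dict]:
    # Hypothetical "topk_specified_field_selector": keep the samples with the
    # largest values of `field` (descending order is an assumption).
    k = int(len(samples) * sample_ratio)
    return sorted(samples, key=lambda s: s[field], reverse=True)[:k]
```

Passing a fixed seed makes the uniform sample reproducible across runs, which is usually what you want when comparing configs on the same subset.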
- class data_juicer.core.executor.RayExecutor(cfg: Namespace | None = None)
Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin
Executor based on Ray.
Runs Data-Juicer data processing on a distributed cluster.
Currently only Filter, Mapper, and Exact Deduplicator operators are supported.
Only .json files can be loaded.
Advanced features, such as checkpointing, are not supported.
- __init__(cfg: Namespace | None = None)
Initialization method.
- Parameters:
cfg -- optional config (a jsonargparse Namespace, per the signature).
- run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_export: bool = False, skip_return: bool = False)
Run the dataset processing pipeline.
- Parameters:
load_data_np -- number of workers when loading the dataset.
skip_export -- whether to skip exporting the results to disk.
skip_return -- whether to skip returning the processed dataset (for API calls).
- Returns:
The processed dataset.
- class data_juicer.core.executor.PartitionedRayExecutor(cfg: Namespace | None = None)
Bases: ExecutorBase, DAGExecutionMixin, EventLoggingMixin
Simplified Ray executor that partitions the dataset using .split().
Features:
  - A single DatasetBuilder loads the full dataset
  - Uses Ray's .split() method for partitioning
  - Processes partitions in parallel with Ray tasks
  - Supports convergence points for global operations
  - Merges results back into a single dataset
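The split, parallel-process, converge, and merge pattern listed under Features can be sketched without a cluster. In this hypothetical version, a ThreadPoolExecutor stands in for Ray tasks, split() mimics partitioning into n contiguous shards, and exact_dedup plays the role of a global operation that must see all samples at once, so it runs only after the partitions are merged. None of these names are part of Data-Juicer or Ray.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

Part = List[dict]


def split(samples: Sequence[dict], n: int) -> List[Part]:
    # Contiguous, near-equal partitions (a stand-in for Ray's Dataset.split(n)).
    if n < 1:
        raise ValueError("n must be >= 1")
    size, rem = divmod(len(samples), n)
    parts, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < rem else 0)
        parts.append(list(samples[start:end]))
        start = end
    return parts


def run_partitioned(
    samples: Sequence[dict],
    local_ops: List[Callable[[Part], Part]],
    global_op: Callable[[Part], Part],
    n_partitions: int = 4,
) -> Part:
    parts = split(samples, n_partitions)

    def process(part: Part) -> Part:
        for op in local_ops:  # per-sample ops, safe to run per partition
            part = op(part)
        return part

    # Threads stand in for Ray tasks; pool.map preserves partition order.
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        processed = list(pool.map(process, parts))
    # Convergence point: merge partitions before the global op.
    merged = [s for part in processed for s in part]
    return global_op(merged)


def exact_dedup(samples: Part) -> Part:
    # Global op: needs every sample to detect duplicates across partitions.
    seen, out = set(), []
    for s in samples:
        if s["text"] not in seen:
            seen.add(s["text"])
            out.append(s)
    return out


data = [{"text": t} for t in ["a", "B", "a", "c", "b", "C"]]
lower = lambda part: [{"text": s["text"].lower()} for s in part]
result = run_partitioned(data, [lower], exact_dedup, n_partitions=3)
```

Running exact_dedup inside each partition instead would miss duplicates that land in different shards ("b" and "c" here), which is why a global operation needs a convergence point.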
- run(load_data_np: Annotated[int, Gt(gt=0)] | None = None, skip_return=False)
Run the simplified partitioned dataset processing pipeline.
- Parameters:
load_data_np -- Number of workers for loading the dataset
skip_return -- Whether to skip returning the dataset
job_id -- Optional job ID to resume from checkpoints
- Returns:
Processed dataset