data_juicer.ops.base_op module#

data_juicer.ops.base_op.convert_list_dict_to_dict_list(samples)[source]#
data_juicer.ops.base_op.convert_dict_list_to_list_dict(samples)[source]#
data_juicer.ops.base_op.convert_arrow_to_python(method)[source]#
data_juicer.ops.base_op.catch_map_batches_exception(method, skip_op_error=False, op_name=None)[source]#

For batched-map sample-level fault tolerance.

data_juicer.ops.base_op.catch_map_single_exception(method, return_sample=True, skip_op_error=False, op_name=None)[source]#

For single-map sample-level fault tolerance. The input sample is expected to have batch_size = 1.
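Both helpers implement the same idea: catch exceptions raised while an op maps a sample (or a batch of samples), log them, and keep processing instead of aborting the whole run. A minimal illustrative sketch of the single-sample pattern (not the library's actual implementation; the return-value handling in particular is simplified):

```python
import traceback
from functools import wraps


def catch_single_exception_sketch(method, return_sample=True, skip_op_error=False):
    """Illustrative stand-in for sample-level fault tolerance."""

    @wraps(method)
    def wrapper(sample, *args, **kwargs):
        try:
            return method(sample, *args, **kwargs)
        except Exception:
            if not skip_op_error:
                raise
            # Log the failure and keep going; the real wrappers also
            # normalize the return value according to return_sample.
            traceback.print_exc()
            return sample if return_sample else False

    return wrapper
```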

class data_juicer.ops.base_op.OP(*args, **kwargs)[source]#

Bases: object

__init__(*args, **kwargs)[source]#

Base class of operators.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

  • index_key – the key name of field that stores the sample index; if not None, samples are indexed before processing

  • system_key – the key name of field that stores system prompts

  • instruction_key – the key name of field that stores instructions

  • batch_size – the batch size for processing

  • work_dir – the working directory for this operator

  • skip_op_error – whether to skip errors raised while processing samples

Ray-related parameters:
  • num_cpus – number of CPUs required for this operator, only used when running in Ray mode

  • num_gpus – number of GPUs required for this operator, only used when running in Ray mode

  • memory – memory size required for this operator, only used when running in Ray mode

  • runtime_env – runtime environment for this operator, only used when running in Ray mode. More details can be found in the Ray documentation.

  • ray_execution_mode – execution mode in Ray; can be "actor", "task", or None. If None, the "actor" mode is used when the operator is a CUDA operator, and the "task" mode is used when it is a CPU operator.

A construction sketch using several of these parameters follows this list.
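All of these keys and resource hints are plain keyword arguments forwarded to OP.__init__ by concrete operator subclasses. A minimal sketch using a hypothetical TrimTextMapper (not a built-in op); num_cpus only takes effect when running in Ray mode:

```python
from data_juicer.ops.base_op import Mapper


class TrimTextMapper(Mapper):
    """Hypothetical op used only to illustrate the base-class kwargs."""

    def __init__(self, *args, **kwargs):
        # text_key, batch_size, num_cpus, etc. are handled by OP.__init__,
        # which stores them on the instance (e.g. self.text_key).
        super().__init__(*args, **kwargs)

    def process_single(self, sample):
        sample[self.text_key] = sample[self.text_key].strip()
        return sample


op = TrimTextMapper(text_key='text', batch_size=32, num_cpus=1)
```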

use_auto_proc()[source]#
is_batched_op()[source]#
use_ray_actor()[source]#
process(*args, **kwargs)[source]#
use_cuda()[source]#
runtime_np()[source]#
remove_extra_parameters(param_dict, keys=None)[source]#

At the beginning of the __init__ of a mapper op, call self.remove_extra_parameters(locals()) to conveniently get the op's init parameter dict.
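A sketch of that pattern with a hypothetical mapper whose only extra init parameter is lang; the init_params attribute name is illustrative:

```python
from data_juicer.ops.base_op import Mapper


class LangTagMapper(Mapper):
    """Hypothetical mapper illustrating remove_extra_parameters."""

    def __init__(self, lang: str = 'en', *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Trim locals() down to this op's own init parameters and keep
        # them for later reuse.
        self.init_params = self.remove_extra_parameters(locals())
        self.lang = lang
```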

add_parameters(init_parameter_dict, **extra_param_dict)[source]#

Add parameters for each sample; extra_param_dict and init_parameter_dict need to be kept unchanged.

run(dataset)[source]#
empty_history()[source]#
class data_juicer.ops.base_op.Mapper(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that conducts data editing.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_batched(samples, *args, **kwargs)[source]#
process_single(sample)[source]#

For sample level, sample --> sample

Parameters:

sample – sample to process

Returns:

processed sample
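For instance, a minimal hypothetical mapper that lower-cases the configured text field:

```python
from data_juicer.ops.base_op import Mapper


class LowercaseMapper(Mapper):
    """Hypothetical mapper: sample in, edited sample out."""

    def process_single(self, sample):
        # self.text_key is the key configured via __init__ (default: 'text').
        sample[self.text_key] = sample[self.text_key].lower()
        return sample


print(LowercaseMapper().process_single({'text': 'Hello WORLD'}))
# {'text': 'hello world'}
```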

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.base_op.Filter(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that removes specific info.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

  • min_closed_interval – whether the min_val of the specified filter range is a closed interval. It's True by default.

  • max_closed_interval – whether the max_val of the specified filter range is a closed interval. It's True by default.

  • reversed_range – whether to reverse the target range [min_val, max_val] to (-∞, min_val) or (max_val, +∞). It's False by default.

get_keep_boolean(val, min_val=None, max_val=None)[source]#
compute_stats_batched(samples, *args, **kwargs)[source]#
process_batched(samples)[source]#
compute_stats_single(sample, context=False)[source]#

Compute stats for the sample which is used as a metric to decide whether to filter this sample.

Parameters:
  • sample – input sample.

  • context – whether to store context information of intermediate vars in the sample temporarily.

Returns:

sample with computed stats

process_single(sample)[source]#

For sample level, sample --> Boolean.

Parameters:

sample – sample to decide whether to filter

Returns:

true for keeping and false for filtering
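Concrete filters split the work between these two methods: compute_stats_single records a metric on the sample, and process_single turns that metric into a keep/drop decision, typically via get_keep_boolean. A sketch with a hypothetical character-count filter; the '__stats__' key used here is purely illustrative (built-in filters write to the library's dedicated stats field):

```python
from data_juicer.ops.base_op import Filter


class CharCountFilter(Filter):
    """Hypothetical filter keeping samples within a character-count range."""

    def __init__(self, min_len: int = 10, max_len: int = 10000, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.min_len = min_len
        self.max_len = max_len

    def compute_stats_single(self, sample, context=False):
        # Record the metric on the sample (illustrative stats container).
        sample.setdefault('__stats__', {})['char_count'] = len(sample[self.text_key])
        return sample

    def process_single(self, sample):
        # True keeps the sample, False filters it out; range semantics follow
        # min_closed_interval / max_closed_interval / reversed_range.
        return self.get_keep_boolean(
            sample['__stats__']['char_count'], self.min_len, self.max_len)
```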

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]#
class data_juicer.ops.base_op.Deduplicator(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that conducts deduplication.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

compute_hash(sample)[source]#

Compute hash values for the sample.

Parameters:

sample – input sample

Returns:

sample with computed hash value.
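For example, a hypothetical exact-text deduplicator could hash the raw text of each sample; the 'hash' key name here is illustrative:

```python
import hashlib

from data_juicer.ops.base_op import Deduplicator


class ExactTextDeduplicator(Deduplicator):
    """Hypothetical deduplicator hashing each sample's raw text."""

    def compute_hash(self, sample):
        # Samples with identical text receive identical hashes, so they can
        # be collapsed later during process().
        text = sample[self.text_key]
        sample['hash'] = hashlib.md5(text.encode('utf-8')).hexdigest()
        return sample
```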

process(dataset, show_num=0)[source]#

For doc-level, dataset --> dataset.

Parameters:
  • dataset – input dataset

  • show_num – number of traced samples used when tracer is open.

Returns:

deduplicated dataset and the sampled duplicate pairs.

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]#
class data_juicer.ops.base_op.Selector(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that conducts selection at the dataset level.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]#

Dataset --> dataset.

Parameters:

dataset – input dataset

Returns:

selected dataset.
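For instance, a hypothetical selector that keeps only the first k samples, assuming a Hugging Face-style dataset that supports len() and .select():

```python
from data_juicer.ops.base_op import Selector


class HeadSelector(Selector):
    """Hypothetical selector keeping the first k samples of the dataset."""

    def __init__(self, k: int = 100, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.k = k

    def process(self, dataset):
        # Dataset in, (smaller) dataset out.
        return dataset.select(range(min(self.k, len(dataset))))
```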

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.base_op.Grouper(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that groups samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]#

Dataset --> dataset.

Parameters:

dataset – input dataset

Returns:

dataset of batched samples.
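A grouper turns individual samples into batched samples, i.e. dicts whose values are per-key lists (the layout that convert_list_dict_to_dict_list, documented at the top of this module, is intended to produce). A sketch of a hypothetical grouper that merges the whole dataset into a single batched sample, returned as a plain list for illustration:

```python
from data_juicer.ops.base_op import Grouper


class WholeDatasetGrouper(Grouper):
    """Hypothetical grouper merging all samples into one batched sample."""

    def process(self, dataset):
        # A batched sample maps each key to the list of that key's values
        # across all grouped samples.
        keys = dataset[0].keys()
        batched = {key: [sample[key] for sample in dataset] for key in keys}
        return [batched]
```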

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.base_op.Aggregator(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that aggregates batched samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_single(sample)[source]#

For sample level, batched sample --> sample. The input must be the output of some Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample
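For example, a hypothetical aggregator that concatenates the grouped texts of a batched sample back into a single text field:

```python
from data_juicer.ops.base_op import Aggregator


class ConcatTextAggregator(Aggregator):
    """Hypothetical aggregator collapsing a batched sample into one sample."""

    def process_single(self, sample):
        # `sample` is a batched sample (dict of lists) produced by a Grouper;
        # other fields are dropped here purely for brevity.
        return {self.text_key: '\n'.join(sample[self.text_key])}
```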

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.base_op.Pipeline(*args, **kwargs)[source]#

Bases: OP

Base class for Operators that represent a data processing pipeline.

__init__(*args, **kwargs)[source]#

Base class of operators.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

  • index_key – the key name of field that stores the sample index; if not None, samples are indexed before processing

  • batch_size – the batch size for processing

run(dataset)[source]#