data_juicer.ops#

data_juicer.ops.load_ops(process_list, op_env_manager=None)[source]#

Load the list of op instances described by the process list from the config file.

Parameters:
  • process_list – A process list. Each item is an op name and its arguments.

  • op_env_manager – The OPEnvManager used to try to merge the environment specs of different OPs that share common dependencies. Only available when min_common_dep_num_to_combine >= 0.

Returns:

The op instance list.
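
As a rough illustration of the process-list shape described above, the sketch below turns a list of single-key mappings (op name to arguments) into instances. `ToyOp`, `TOY_REGISTRY`, and `toy_load_ops` are hypothetical stand-ins written for this example and are not part of data_juicer; the assumption that each item holds exactly one op name is also ours.

```python
from typing import Any, Dict, List, Optional


class ToyOp:
    """Hypothetical stand-in for an operator class."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class ToyLengthFilter(ToyOp):
    pass


class ToyLowercaseMapper(ToyOp):
    pass


# Hypothetical registry: op name -> op class.
TOY_REGISTRY = {
    "toy_length_filter": ToyLengthFilter,
    "toy_lowercase_mapper": ToyLowercaseMapper,
}


def toy_load_ops(process_list: List[Dict[str, Optional[Dict[str, Any]]]]) -> List[ToyOp]:
    ops = []
    for item in process_list:
        # Assumption: each item holds exactly one op name and its arguments.
        op_name, op_args = next(iter(item.items()))
        ops.append(TOY_REGISTRY[op_name](**(op_args or {})))
    return ops


process_list = [
    {"toy_length_filter": {"min_len": 10, "max_len": 1000}},
    {"toy_lowercase_mapper": None},  # an op without arguments
]
ops = toy_load_ops(process_list)
```

The returned list preserves the order of the process list, so ops run in the order they are configured.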

class data_juicer.ops.Filter(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that removes specific info.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

  • min_closed_interval – whether the min_val of the specified filter range is a closed interval. It’s True by default.

  • max_closed_interval – whether the max_val of the specified filter range is a closed interval. It’s True by default.

  • reversed_range – whether to reverse the target range [min_val, max_val] to (-∞, min_val) or (max_val, +∞). It’s False by default.
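
The interaction of min_closed_interval, max_closed_interval, and reversed_range can be sketched with a small standalone function. This is a minimal illustration of the semantics described above, not the library's get_keep_boolean implementation; the function name `keep_in_range` and the treatment of a None bound as "unbounded" are assumptions.

```python
from typing import Optional


def keep_in_range(
    val: float,
    min_val: Optional[float] = None,
    max_val: Optional[float] = None,
    min_closed_interval: bool = True,
    max_closed_interval: bool = True,
    reversed_range: bool = False,
) -> bool:
    """Return True if a sample whose stat equals `val` should be kept."""
    # Assumption: a bound of None means "unbounded on that side".
    above_min = True
    if min_val is not None:
        above_min = val >= min_val if min_closed_interval else val > min_val
    below_max = True
    if max_val is not None:
        below_max = val <= max_val if max_closed_interval else val < max_val
    in_range = above_min and below_max
    # A reversed range keeps exactly the values outside the original range.
    return not in_range if reversed_range else in_range
```

With the default closed range [5, 10], the boundary value 5 is kept; with reversed_range=True the same value is dropped, because the reversed range (-∞, 5) or (10, +∞) excludes both endpoints.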

get_keep_boolean(val, min_val=None, max_val=None)[source]#
compute_stats_batched(samples, *args, **kwargs)[source]#
process_batched(samples)[source]#
compute_stats_single(sample, context=False)[source]#

Compute stats for the sample. These stats serve as the metrics used to decide whether to filter the sample out.

Parameters:
  • sample – input sample.

  • context – whether to store context information of intermediate vars in the sample temporarily.

Returns:

sample with computed stats

process_single(sample)[source]#

For sample level, sample –> Boolean.

Parameters:

sample – sample to decide whether to filter

Returns:

True to keep the sample, False to filter it out
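
The two-step pattern of compute_stats_single followed by process_single can be illustrated with a toy filter. The class `ToyTextLengthFilter`, the `"stats"` field name, and the `text_len` stat are hypothetical names chosen for this sketch; a real filter would subclass Filter and use the library's own field constants.

```python
from typing import Any, Dict

STATS_KEY = "stats"  # hypothetical field name for computed stats


class ToyTextLengthFilter:
    """Hypothetical filter that keeps samples whose text length is in range."""

    def __init__(self, min_len: int = 1, max_len: int = 100, text_key: str = "text") -> None:
        self.min_len = min_len
        self.max_len = max_len
        self.text_key = text_key

    def compute_stats_single(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        stats = sample.setdefault(STATS_KEY, {})
        # Skip recomputation if the stat is already present.
        if "text_len" not in stats:
            stats["text_len"] = len(sample[self.text_key])
        return sample

    def process_single(self, sample: Dict[str, Any]) -> bool:
        # True keeps the sample, False filters it out.
        return self.min_len <= sample[STATS_KEY]["text_len"] <= self.max_len


samples = [{"text": "hi"}, {"text": "a reasonably long sentence"}]
toy_filter = ToyTextLengthFilter(min_len=5, max_len=50)
with_stats = [toy_filter.compute_stats_single(s) for s in samples]
kept = [s for s in with_stats if toy_filter.process_single(s)]
```

Separating the stats computation from the keep decision means the stats stay attached to every sample and can be inspected even for samples that end up filtered out.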

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]#
class data_juicer.ops.Mapper(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that conducts data editing.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_batched(samples, *args, **kwargs)[source]#
process_single(sample)[source]#

For sample level, sample –> sample

Parameters:

sample – sample to process

Returns:

processed sample
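
A sample-to-sample edit of this kind might look like the following toy mapper. `ToyWhitespaceMapper` is a hypothetical class written for this sketch and does not subclass the real Mapper; it only shows the shape of a process_single method.

```python
from typing import Any, Dict


class ToyWhitespaceMapper:
    """Hypothetical mapper that collapses runs of whitespace in the text field."""

    def __init__(self, text_key: str = "text") -> None:
        self.text_key = text_key

    def process_single(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        # Work on a shallow copy so the caller's sample is left untouched.
        sample = dict(sample)
        sample[self.text_key] = " ".join(sample[self.text_key].split())
        return sample


mapper = ToyWhitespaceMapper()
result = mapper.process_single({"text": "  a   b \n c ", "meta": 1})
```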

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.Deduplicator(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that conducts deduplication.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

compute_hash(sample)[source]#

Compute hash values for the sample.

Parameters:

sample – input sample

Returns:

sample with computed hash value.

process(dataset, show_num=0)[source]#

For doc-level, dataset –> dataset.

Parameters:
  • dataset – input dataset

  • show_num – number of traced samples used when the tracer is enabled.

Returns:

deduplicated dataset and the sampled duplicate pairs.
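
The compute_hash and process steps can be sketched as exact-match deduplication over a list of dict samples. This is a simplified illustration under our own assumptions (SHA-256 over the text field, a hypothetical `"hash"` field, first occurrence wins); concrete Deduplicator subclasses may hash and compare samples differently.

```python
import hashlib
from typing import Any, Dict, List, Tuple

Sample = Dict[str, Any]
HASH_KEY = "hash"  # hypothetical field name for the computed hash


def toy_compute_hash(sample: Sample, text_key: str = "text") -> Sample:
    sample = dict(sample)
    sample[HASH_KEY] = hashlib.sha256(sample[text_key].encode("utf-8")).hexdigest()
    return sample


def toy_deduplicate(dataset: List[Sample], show_num: int = 0) -> Tuple[List[Sample], List[Tuple[Sample, Sample]]]:
    first_seen: Dict[str, Sample] = {}
    kept: List[Sample] = []
    dup_pairs: List[Tuple[Sample, Sample]] = []
    for sample in map(toy_compute_hash, dataset):
        digest = sample[HASH_KEY]
        if digest not in first_seen:
            # First occurrence of this hash: keep the sample.
            first_seen[digest] = sample
            kept.append(sample)
        elif len(dup_pairs) < show_num:
            # Record at most show_num (original, duplicate) pairs for tracing.
            dup_pairs.append((first_seen[digest], sample))
    return kept, dup_pairs


dataset = [{"text": "a"}, {"text": "b"}, {"text": "a"}]
kept, dup_pairs = toy_deduplicate(dataset, show_num=1)
```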

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]#
class data_juicer.ops.Selector(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that conducts selection at the dataset level.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]#

Dataset –> dataset.

Parameters:

dataset – input dataset

Returns:

selected dataset.
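
Unlike a filter, a dataset-level selection depends on the dataset as a whole rather than on one sample in isolation. A minimal sketch, assuming a list-of-dicts dataset and a hypothetical `toy_select_top_k` helper that keeps the k samples with the highest value of a field:

```python
from typing import Any, Dict, List


def toy_select_top_k(dataset: List[Dict[str, Any]], field: str, k: int) -> List[Dict[str, Any]]:
    # Ranking requires seeing every sample, which is why this is dataset-level.
    return sorted(dataset, key=lambda s: s[field], reverse=True)[:k]


dataset = [
    {"text": "x", "score": 0.2},
    {"text": "y", "score": 0.9},
    {"text": "z", "score": 0.5},
]
selected = toy_select_top_k(dataset, "score", k=2)
```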

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.Grouper(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that groups samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]#

Dataset –> dataset.

Parameters:

dataset – input dataset

Returns:

dataset of batched samples.

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.Aggregator(*args, **kwargs)[source]#

Bases: OP

__init__(*args, **kwargs)[source]#

Base class that aggregates batched samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_single(sample)[source]#

For sample level, batched sample –> sample. The input must be the output of a Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample
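
The relationship between a Grouper (dataset to dataset of batched samples) and an Aggregator (batched sample to sample) can be sketched as below. The helpers `toy_group_by` and `toy_aggregate` are hypothetical, and the sketch assumes that a batched sample is a dict of lists and that all samples share the same fields.

```python
from collections import defaultdict
from typing import Any, Dict, List

Sample = Dict[str, Any]


def toy_group_by(dataset: List[Sample], key: str) -> List[Dict[str, List[Any]]]:
    groups: Dict[Any, List[Sample]] = defaultdict(list)
    for sample in dataset:
        groups[sample[key]].append(sample)
    batched = []
    for members in groups.values():
        # Assumption: a batched sample stores one list per field.
        batched.append({field: [m[field] for m in members] for field in members[0]})
    return batched


def toy_aggregate(batched_sample: Dict[str, List[Any]], text_key: str = "text") -> Sample:
    # Collapse the batched texts into a single sample.
    return {text_key: " ".join(batched_sample[text_key])}


dataset = [
    {"topic": "a", "text": "first"},
    {"topic": "b", "text": "second"},
    {"topic": "a", "text": "third"},
]
batched = toy_group_by(dataset, "topic")
aggregated = [toy_aggregate(b) for b in batched]
```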

run(dataset, *, exporter=None, tracer=None)[source]#
class data_juicer.ops.Pipeline(*args, **kwargs)[source]#

Bases: OP

Base class for Operators that represent a data processing pipeline.

__init__(*args, **kwargs)[source]#

Base class of operators.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • image_bytes_key – the key name of field that stores sample image bytes list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

  • index_key – index the samples before processing if not None

  • batch_size – the batch size for processing

run(dataset)[source]#
class data_juicer.ops.OPEnvSpec(pip_pkgs: List[str] | str | None = None, env_vars: Dict[str, str] | None = None, working_dir: str | None = None, backend: str = 'uv', extra_env_params: Dict | None = None, parsed_requirements: Dict[str, Requirement] | None = None)[source]#

Bases: object

Specification of the environment dependencies for an operator.

__init__(pip_pkgs: List[str] | str | None = None, env_vars: Dict[str, str] | None = None, working_dir: str | None = None, backend: str = 'uv', extra_env_params: Dict | None = None, parsed_requirements: Dict[str, Requirement] | None = None)[source]#

Initialize an OPEnvSpec instance.

Parameters:
  • pip_pkgs – Pip packages to install, default is None. Can be a list of packages or a str path to a requirements file

  • env_vars – Dictionary of environment variables, default is None

  • working_dir – Path to the working directory, default is None

  • backend – Package management backend, default is “uv”. Should be one of [“pip”, “uv”].

  • extra_env_params – Additional parameters dictionary passed to the ray runtime environment, default is None

  • parsed_requirements – A resolved version of the requirements: a dict mapping each requirement name to its resolved info, where the parsed package info includes version/url/…

to_dict()[source]#

Convert the OPEnvSpec instance to a dictionary.

Returns:

Dictionary representation of the OPEnvSpec instance

get_hash()[source]#
get_requirement_name_list()[source]#
data_juicer.ops.op_requirements_to_op_env_spec(op_name: str, requirements: List[str] | str | None = None, auto_recommended_requirements: List[str] | None = None) → OPEnvSpec[source]#
class data_juicer.ops.OPEnvManager(min_common_dep_num_to_combine: int | None = -1, conflict_resolve_strategy: ConflictResolveStrategy | str = ConflictResolveStrategy.SPLIT)[source]#

Bases: object

OPEnvManager is a class that manages the environment dependencies for operators, including recording OP dependencies, resolving dependency conflicts, merging OP environments, and so on.

__init__(min_common_dep_num_to_combine: int | None = -1, conflict_resolve_strategy: ConflictResolveStrategy | str = ConflictResolveStrategy.SPLIT)[source]#

Initialize OPEnvManager instance.

Parameters:
  • min_common_dep_num_to_combine – The minimum number of common dependencies two OP environment specifications must share before they are considered for merging. If set to -1, OP environments are never combined.

  • conflict_resolve_strategy – Strategy for resolving dependency conflicts, default is SPLIT strategy.

      • SPLIT: Keep the two specs split when there is a conflict.

      • OVERWRITE: Overwrite the existing dependency with one from the later OP.

      • LATEST: Use the latest version of all specified dependency versions.
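
The three conflict-resolution strategies can be illustrated on plain name-to-version dicts. This is a minimal sketch under stated assumptions, not OPEnvManager's implementation: `toy_merge` and `parse_version` are hypothetical helpers, versions are simple dotted integers, and SPLIT is modelled by returning None to signal that the two specs stay separate.

```python
from typing import Dict, Optional, Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    # Assumption: versions are simple dotted integers such as "1.26.0".
    return tuple(int(part) for part in version.split("."))


def toy_merge(first: Dict[str, str], second: Dict[str, str], strategy: str = "split") -> Optional[Dict[str, str]]:
    """Merge two name -> version dicts, or return None if they stay split."""
    merged = dict(first)
    for name, version in second.items():
        if name not in merged or merged[name] == version:
            merged[name] = version
        elif strategy == "split":
            # Conflict: keep the two specs separate.
            return None
        elif strategy == "overwrite":
            # The dependency from the later spec wins.
            merged[name] = version
        elif strategy == "latest":
            # Keep whichever version is newer.
            merged[name] = max(merged[name], version, key=parse_version)
        else:
            raise ValueError(f"unknown strategy: {strategy}")
    return merged


first = {"numpy": "2.0.1", "torch": "2.1.0"}
second = {"numpy": "1.26.0", "pillow": "10.0.0"}
```

Here `numpy` conflicts: "split" returns None, "overwrite" takes 1.26.0 from the later spec, and "latest" keeps 2.0.1. Real requirement strings carry specifiers and URLs, so an actual resolver needs more than this tuple comparison.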

print_the_current_states()[source]#

Get the current states of OPEnvManager, including:

  • number of recorded OPs

  • number of used env specs

  • what OPs share the same env spec

Returns:

A dictionary containing the current states of OPEnvManager

record_op_env_spec(op_name: str, op_env_spec: OPEnvSpec)[source]#

Record the OP environment specification for an operator.

Parameters:
  • op_name – Name of the operator

  • op_env_spec – OP environment specification

merge_op_env_specs(new_env_spec: OPEnvSpec)[source]#

Merge the OP environment specification of an operator with the existing OP environment specification.

Parameters:

new_env_spec – OP environment specification

can_combine_op_env_specs(first_env_spec: OPEnvSpec, second_env_spec: OPEnvSpec) → bool[source]#

Check if two OP environment specifications can be combined.

Parameters:
  • first_env_spec – Existing OP environment specification

  • second_env_spec – New OP environment specification

Returns:

True if the two specifications can be combined, False otherwise
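
One way to picture the role of min_common_dep_num_to_combine in this check is a threshold on the number of shared requirement names. The function `toy_can_combine` below is a hypothetical sketch and only models the threshold; the real method may also take version conflicts and the conflict-resolve strategy into account.

```python
from typing import Iterable


def toy_can_combine(
    first_names: Iterable[str],
    second_names: Iterable[str],
    min_common_dep_num_to_combine: int = -1,
) -> bool:
    # A negative threshold (the default -1) disables combination entirely.
    if min_common_dep_num_to_combine < 0:
        return False
    common = set(first_names) & set(second_names)
    return len(common) >= min_common_dep_num_to_combine
```

For example, two specs sharing only `numpy` combine under a threshold of 1 but not under a threshold of 2.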

try_to_combine_op_env_specs(first_env_spec: OPEnvSpec, second_env_spec: OPEnvSpec)[source]#

Try to combine a new OP environment specification with an existing OP environment specification.

Parameters:
  • first_env_spec – Existing OP environment specification

  • second_env_spec – New OP environment specification

Returns:

True if the two specifications can be combined, False otherwise

get_op_env_spec(op_name: str) → OPEnvSpec[source]#

Get the OP environment specification for an operator.

Parameters:

op_name – Name of the operator

Returns:

OP environment specification

data_juicer.ops.analyze_lazy_loaded_requirements(code_content: str) → List[str][source]#
data_juicer.ops.analyze_lazy_loaded_requirements_for_code_file(code_file: str) → List[str][source]#