data_juicer.ops
- data_juicer.ops.load_ops(process_list, op_env_manager=None)
Load the op list according to the process list from the config file.
- Parameters:
process_list -- A process list. Each item is an op name and its arguments.
op_env_manager -- The OPEnvManager used to try to merge the environment specs of different OPs that have common dependencies. Only available when min_common_dep_num_to_combine >= 0.
- Returns:
The op instance list.
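To make the shape of a process list concrete, here is a minimal sketch. It does not call data_juicer: `ToyLengthFilter`, `TOY_REGISTRY`, and `toy_load_ops` are hypothetical stand-ins, and the single-key-dict layout of each item is an assumption based on the description above ("an op name and its arguments").

```python
# Hypothetical stand-in for an OP class; not a data_juicer class.
class ToyLengthFilter:
    def __init__(self, min_len=0, max_len=100):
        self.min_len = min_len
        self.max_len = max_len


# Hypothetical name -> class registry, assumed for this sketch only.
TOY_REGISTRY = {"toy_length_filter": ToyLengthFilter}


def toy_load_ops(process_list):
    """Instantiate one op per item; each item is {op_name: args_dict}."""
    ops = []
    for item in process_list:
        # Each item is assumed to hold exactly one op name and its arguments.
        (op_name, args), = item.items()
        ops.append(TOY_REGISTRY[op_name](**(args or {})))
    return ops


process_list = [{"toy_length_filter": {"min_len": 10, "max_len": 50}}]
ops = toy_load_ops(process_list)
```

The result mirrors the documented return value: a list of op instances, in the same order as the process list.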
- class data_juicer.ops.Filter(*args, **kwargs)
Bases: OP
- __init__(*args, **kwargs)
Base class that removes specific info.
- Parameters:
text_key -- the key name of field that stores sample texts to be processed
image_key -- the key name of field that stores sample image list to be processed
audio_key -- the key name of field that stores sample audio list to be processed
video_key -- the key name of field that stores sample video list to be processed
image_bytes_key -- the key name of field that stores sample image bytes list to be processed
query_key -- the key name of field that stores sample queries
response_key -- the key name of field that stores responses
history_key -- the key name of field that stores history of queries and responses
min_closed_interval -- whether the min_val of the specified filter range is a closed interval. It's True by default.
max_closed_interval -- whether the max_val of the specified filter range is a closed interval. It's True by default.
reversed_range -- whether to reverse the target range [min_val, max_val] to (-∞, min_val) or (max_val, +∞). It's False by default.
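The interplay of the three range flags can be sketched as a small standalone function. This is an illustration of the semantics described above, not the library's implementation; the function name `keep_value` is hypothetical.

```python
def keep_value(value, min_val, max_val,
               min_closed_interval=True,
               max_closed_interval=True,
               reversed_range=False):
    """Return True if `value` falls in the target range."""
    # A closed bound includes the endpoint; an open bound excludes it.
    above_min = value >= min_val if min_closed_interval else value > min_val
    below_max = value <= max_val if max_closed_interval else value < max_val
    in_range = above_min and below_max
    # Reversing the range keeps exactly the values outside the original one,
    # so [min_val, max_val] becomes (-inf, min_val) or (max_val, +inf).
    return not in_range if reversed_range else in_range
```

With the defaults, `keep_value(5, 5, 10)` keeps the boundary value, while passing `reversed_range=True` rejects it because the reversed bounds are open.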
- compute_stats_single(sample, context=False)
Compute stats for the sample; the stats serve as the metric for deciding whether to filter this sample.
- Parameters:
sample -- input sample.
context -- whether to store context information of intermediate vars in the sample temporarily.
- Returns:
sample with computed stats
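A minimal sketch of the compute-then-decide pattern follows. It is a plain function rather than a Filter subclass, and the field names `"stats"`, `"context"`, and `"num_words"` are hypothetical choices for this example, not the keys data_juicer uses.

```python
def compute_stats_single(sample, context=False):
    """Attach a word-count stat to the sample and return the sample."""
    # "stats" and "context" are hypothetical field names for this sketch.
    stats = sample.setdefault("stats", {})
    if "num_words" in stats:
        return sample  # stat already computed, nothing to do
    words = sample["text"].split()
    if context:
        # Temporarily keep the intermediate variable so it can be reused.
        sample.setdefault("context", {})["words"] = words
    stats["num_words"] = len(words)
    return sample


def keep(sample, min_val=1, max_val=20):
    """Decide from the computed stat whether the sample is kept."""
    return min_val <= sample["stats"]["num_words"] <= max_val
```

Separating the stat computation from the keep/drop decision means the stat can be inspected or reused without recomputing it.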
- class data_juicer.ops.Mapper(*args, **kwargs)
Bases: OP
- __init__(*args, **kwargs)
Base class that conducts data editing.
- Parameters:
text_key -- the key name of field that stores sample texts to be processed.
image_key -- the key name of field that stores sample image list to be processed
audio_key -- the key name of field that stores sample audio list to be processed
video_key -- the key name of field that stores sample video list to be processed
image_bytes_key -- the key name of field that stores sample image bytes list to be processed
query_key -- the key name of field that stores sample queries
response_key -- the key name of field that stores responses
history_key -- the key name of field that stores history of queries and responses
- class data_juicer.ops.Deduplicator(*args, **kwargs)
Bases: OP
- __init__(*args, **kwargs)
Base class that conducts deduplication.
- Parameters:
text_key -- the key name of field that stores sample texts to be processed
image_key -- the key name of field that stores sample image list to be processed
audio_key -- the key name of field that stores sample audio list to be processed
video_key -- the key name of field that stores sample video list to be processed
image_bytes_key -- the key name of field that stores sample image bytes list to be processed
query_key -- the key name of field that stores sample queries
response_key -- the key name of field that stores responses
history_key -- the key name of field that stores history of queries and responses
- compute_hash(sample)
Compute hash values for the sample.
- Parameters:
sample -- input sample
- Returns:
sample with computed hash value.
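As an illustration of hash-based deduplication, the sketch below hashes each sample's text and keeps the first sample per hash. The SHA-256 choice, the `"hash"` field name, and the `deduplicate` helper are assumptions made for this example; the library's deduplicators may use other hashing schemes.

```python
import hashlib


def compute_hash(sample, text_key="text"):
    """Return the sample with a hash of its text attached."""
    digest = hashlib.sha256(sample[text_key].encode("utf-8")).hexdigest()
    sample["hash"] = digest  # "hash" is a hypothetical field name
    return sample


def deduplicate(samples):
    """Keep the first sample seen for each distinct hash value."""
    seen, kept = set(), []
    for sample in map(compute_hash, samples):
        if sample["hash"] not in seen:
            seen.add(sample["hash"])
            kept.append(sample)
    return kept
```

Exact-match hashing like this only removes byte-identical texts; near-duplicate detection needs a different hash design.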
- class data_juicer.ops.Selector(*args, **kwargs)
Bases: OP
- __init__(*args, **kwargs)
Base class that conducts selection at the dataset level.
- Parameters:
text_key -- the key name of field that stores sample texts to be processed
image_key -- the key name of field that stores sample image list to be processed
audio_key -- the key name of field that stores sample audio list to be processed
video_key -- the key name of field that stores sample video list to be processed
image_bytes_key -- the key name of field that stores sample image bytes list to be processed
query_key -- the key name of field that stores sample queries
response_key -- the key name of field that stores responses
history_key -- the key name of field that stores history of queries and responses
- class data_juicer.ops.Grouper(*args, **kwargs)
Bases: OP
- __init__(*args, **kwargs)
Base class that groups samples.
- Parameters:
text_key -- the key name of field that stores sample texts to be processed
image_key -- the key name of field that stores sample image list to be processed
audio_key -- the key name of field that stores sample audio list to be processed
video_key -- the key name of field that stores sample video list to be processed
image_bytes_key -- the key name of field that stores sample image bytes list to be processed
query_key -- the key name of field that stores sample queries
response_key -- the key name of field that stores responses
history_key -- the key name of field that stores history of queries and responses
- class data_juicer.ops.Aggregator(*args, **kwargs)
Bases: OP
- __init__(*args, **kwargs)
Base class that aggregates samples.
- Parameters:
text_key -- the key name of field that stores sample texts to be processed
image_key -- the key name of field that stores sample image list to be processed
audio_key -- the key name of field that stores sample audio list to be processed
video_key -- the key name of field that stores sample video list to be processed
image_bytes_key -- the key name of field that stores sample image bytes list to be processed
query_key -- the key name of field that stores sample queries
response_key -- the key name of field that stores responses
history_key -- the key name of field that stores history of queries and responses
- class data_juicer.ops.Pipeline(*args, **kwargs)
Bases: OP
Base class for Operators that represent a data processing pipeline.
- __init__(*args, **kwargs)
Base class of operators.
- Parameters:
text_key -- the key name of field that stores sample texts to be processed.
image_key -- the key name of field that stores sample image list to be processed
audio_key -- the key name of field that stores sample audio list to be processed
video_key -- the key name of field that stores sample video list to be processed
image_bytes_key -- the key name of field that stores sample image bytes list to be processed
query_key -- the key name of field that stores sample queries
response_key -- the key name of field that stores responses
history_key -- the key name of field that stores history of queries and responses
index_key -- if not None, index the samples before processing
batch_size -- the batch size for processing
- class data_juicer.ops.OPEnvSpec(pip_pkgs: List[str] | str | None = None, env_vars: Dict[str, str] | None = None, working_dir: str | None = None, backend: str = 'uv', extra_env_params: Dict | None = None, parsed_requirements: Dict[str, Requirement] | None = None)
Bases: object
Specification of the environment dependencies for an operator.
- __init__(pip_pkgs: List[str] | str | None = None, env_vars: Dict[str, str] | None = None, working_dir: str | None = None, backend: str = 'uv', extra_env_params: Dict | None = None, parsed_requirements: Dict[str, Requirement] | None = None)
Initialize an OPEnvSpec instance.
- Parameters:
pip_pkgs -- Pip packages to install, default is None. Can be a list of packages or a str path to the requirements file.
env_vars -- Dictionary of environment variables, default is None
working_dir -- Path to the working directory, default is None
backend -- Package management backend, default is "uv". Should be one of ["pip", "uv"].
extra_env_params -- Dictionary of additional parameters passed to the Ray runtime environment, default is None
parsed_requirements -- A resolved version of the requirements: a dict mapping each requirement name to its resolved info, where the parsed package info includes version/url/...
- data_juicer.ops.op_requirements_to_op_env_spec(op_name: str, requirements: List[str] | str | None = None, auto_recommended_requirements: List[str] | None = None) -> OPEnvSpec
- class data_juicer.ops.OPEnvManager(min_common_dep_num_to_combine: int | None = -1, conflict_resolve_strategy: ConflictResolveStrategy | str = ConflictResolveStrategy.SPLIT)
Bases: object
OPEnvManager is a class that manages the environment dependencies for operators, including recording OP dependencies, resolving dependency conflicts, merging OP environments, and so on.
- __init__(min_common_dep_num_to_combine: int | None = -1, conflict_resolve_strategy: ConflictResolveStrategy | str = ConflictResolveStrategy.SPLIT)
Initialize an OPEnvManager instance.
- Parameters:
min_common_dep_num_to_combine -- The minimum number of common dependencies two OP environment specifications must share before they are considered for merging. If set to -1, OP environments are never combined.
conflict_resolve_strategy -- Strategy for resolving dependency conflicts, default is SPLIT. SPLIT: keep the two specs split when there is a conflict. OVERWRITE: overwrite the existing dependency with the one from the later OP. LATEST: use the latest version among all specified dependency versions.
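The three strategies can be illustrated on plain `{package: version}` dicts. This is a sketch of the behaviour described above, not the OPEnvManager code: `resolve_conflict` and `version_key` are hypothetical helpers, strategies are passed as plain strings, and versions are assumed to be simple dotted numbers such as "1.26.0".

```python
def version_key(version):
    """Turn a dotted numeric version like '1.26.0' into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))


def resolve_conflict(existing, new, strategy="SPLIT"):
    """Merge two {package: version} dicts, or return None to keep them split."""
    # Packages pinned in both specs with different versions are conflicts.
    conflicts = {p for p in existing.keys() & new.keys()
                 if existing[p] != new[p]}
    if strategy == "SPLIT":
        # Keep the two specs separate whenever any version disagrees.
        return None if conflicts else {**existing, **new}
    if strategy == "OVERWRITE":
        # The later OP's version replaces the existing one.
        return {**existing, **new}
    if strategy == "LATEST":
        merged = {**existing, **new}
        for pkg in conflicts:
            merged[pkg] = max(existing[pkg], new[pkg], key=version_key)
        return merged
    raise ValueError(f"unknown strategy: {strategy}")
```

Versions are compared as integer tuples rather than strings so that "1.26.0" correctly ranks above "1.9.0".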
- print_the_current_states()
Get the current states of OPEnvManager, including the number of recorded OPs, the number of used env specs, and which OPs share the same env spec.
- Returns:
A dictionary containing the current states of OPEnvManager
- record_op_env_spec(op_name: str, op_env_spec: OPEnvSpec)
Record the OP environment specification for an operator.
- Parameters:
op_name -- Name of the operator
op_env_spec -- OP environment specification
- merge_op_env_specs(new_env_spec: OPEnvSpec)
Merge the OP environment specification for an operator with the existing OP environment specification.
- Parameters:
new_env_spec -- OP environment specification
- can_combine_op_env_specs(first_env_spec: OPEnvSpec, second_env_spec: OPEnvSpec) -> bool
Check if two OP environment specifications can be combined.
- Parameters:
first_env_spec -- Existing OP environment specification
second_env_spec -- New OP environment specification
- Returns:
True if the two specifications can be combined, False otherwise
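One plausible reading of the combination check, based only on the description of min_common_dep_num_to_combine, is a threshold on shared package names. The sketch below is an assumption-laden illustration: `can_combine` is a hypothetical helper working on plain lists of package names, and treating None like -1 (combination disabled) is a guess, not documented behaviour.

```python
def can_combine(first_pkgs, second_pkgs, min_common_dep_num_to_combine=-1):
    """Return True if the two package lists share enough dependencies."""
    threshold = min_common_dep_num_to_combine
    # Assumption: None or a negative threshold disables combination.
    if threshold is None or threshold < 0:
        return False
    common = set(first_pkgs) & set(second_pkgs)
    return len(common) >= threshold
```

A real check would presumably also consider version conflicts and the chosen conflict-resolution strategy; this sketch covers only the count of common dependencies.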
- try_to_combine_op_env_specs(first_env_spec: OPEnvSpec, second_env_spec: OPEnvSpec)
Try to combine the OP environment specification for an operator with the existing OP environment specification.
- Parameters:
first_env_spec -- Existing OP environment specification
second_env_spec -- New OP environment specification
- Returns:
True if the two specifications can be combined, False otherwise