data_juicer.ops.op_env module#

data_juicer.ops.op_env.parse_single_requirement(req_str: str)[source]#
data_juicer.ops.op_env.parse_requirements_list(req_list: List[str])[source]#
data_juicer.ops.op_env.parse_requirements_file(req_file: str)[source]#
class data_juicer.ops.op_env.Requirement(name: str | None = None, version: SpecifierSet | str | None = None, extras: List[str] = None, markers: str | None = None, url: str | None = None, is_editable: bool = False, is_local: bool = False, path: str | None = None)[source]#

Bases: object

A requirement for an operator.

name: str | None = None#
version: SpecifierSet | str | None = None#
extras: List[str] = None#
markers: str | None = None#
url: str | None = None#
is_editable: bool = False#
is_local: bool = False#
path: str | None = None#
__init__(name: str | None = None, version: SpecifierSet | str | None = None, extras: List[str] = None, markers: str | None = None, url: str | None = None, is_editable: bool = False, is_local: bool = False, path: str | None = None) None#
class data_juicer.ops.op_env.OPEnvSpec(pip_pkgs: List[str] | str | None = None, env_vars: Dict[str, str] | None = None, working_dir: str | None = None, backend: str = 'uv', extra_env_params: Dict | None = None, parsed_requirements: Dict[str, Requirement] | None = None)[source]#

Bases: object

Specification of the environment dependencies for an operator.

__init__(pip_pkgs: List[str] | str | None = None, env_vars: Dict[str, str] | None = None, working_dir: str | None = None, backend: str = 'uv', extra_env_params: Dict | None = None, parsed_requirements: Dict[str, Requirement] | None = None)[source]#

Initialize an OPEnvSpec instance.

Parameters:
  • pip_pkgs – Pip packages to install, default is None. Can be either a list of requirement strings or a string path to a requirements file

  • env_vars – Dictionary of environment variables, default is None

  • working_dir – Path to the working directory, default is None

  • backend – Package management backend, default is “uv”. Should be one of [“pip”, “uv”].

  • extra_env_params – Additional parameters dictionary passed to the ray runtime environment, default is None

  • parsed_requirements – A resolved version of the requirements: a dict mapping each requirement name to its resolved info, where the parsed package info includes version/url/…

to_dict()[source]#

Convert the OPEnvSpec instance to a dictionary.

Returns:

Dictionary representation of the OPEnvSpec instance

get_hash()[source]#
get_requirement_name_list()[source]#
data_juicer.ops.op_env.op_requirements_to_op_env_spec(op_name: str, requirements: List[str] | str | None = None, auto_recommended_requirements: List[str] | None = None) OPEnvSpec[source]#
class data_juicer.ops.op_env.ConflictResolveStrategy(value)[source]#

Bases: Enum

SPLIT = 'split'#
OVERWRITE = 'overwrite'#
LATEST = 'latest'#
class data_juicer.ops.op_env.OPEnvManager(min_common_dep_num_to_combine: int | None = -1, conflict_resolve_strategy: ConflictResolveStrategy | str = ConflictResolveStrategy.SPLIT)[source]#

Bases: object

OPEnvManager is a class that manages the environment dependencies for operators, including recording OP dependencies, resolving dependency conflicts, merging OP environments, and so on.

__init__(min_common_dep_num_to_combine: int | None = -1, conflict_resolve_strategy: ConflictResolveStrategy | str = ConflictResolveStrategy.SPLIT)[source]#

Initialize OPEnvManager instance.

Parameters:
  • min_common_dep_num_to_combine – The minimum number of common dependencies required to determine whether to merge two operation environment specifications. If set to -1, operation environments are never combined.

  • conflict_resolve_strategy – Strategy for resolving dependency conflicts, default is SPLIT strategy. SPLIT: Keep the two specs split when there is a conflict. OVERWRITE: Overwrite the existing dependency with one from the later OP. LATEST: Use the latest version of all specified dependency versions.

print_the_current_states()[source]#

Get the current states of OPEnvManager, including the number of recorded OPs, the number of used env specs, and which OPs share the same env spec.

Returns:

A dictionary containing the current states of OPEnvManager

record_op_env_spec(op_name: str, op_env_spec: OPEnvSpec)[source]#

Record the OP environment specification for an operator.

Parameters:
  • op_name – Name of the operator

  • op_env_spec – OP environment specification

merge_op_env_specs(new_env_spec: OPEnvSpec)[source]#

Merge the OP environment specification for an operator with the existing OP environment specification.

Parameters:

new_env_spec – OP environment specification

can_combine_op_env_specs(first_env_spec: OPEnvSpec, second_env_spec: OPEnvSpec) bool[source]#

Check if two OP environment specifications can be combined.

Parameters:
  • first_env_spec – Existing OP environment specification

  • second_env_spec – New OP environment specification

Returns:

True if the two specifications can be combined, False otherwise

try_to_combine_op_env_specs(first_env_spec: OPEnvSpec, second_env_spec: OPEnvSpec)[source]#

Try to combine the OP environment specification for an operator with the existing OP environment specification.

Parameters:
  • first_env_spec – Existing OP environment specification

  • second_env_spec – New OP environment specification

Returns:

True if the two specifications can be combined, False otherwise

get_op_env_spec(op_name: str) OPEnvSpec[source]#

Get the OP environment specification for an operator.

Parameters:

op_name – Name of the operator

Returns:

OP environment specification

data_juicer.ops.op_env.analyze_lazy_loaded_requirements_for_code_file(code_file: str) List[str][source]#
data_juicer.ops.op_env.analyze_lazy_loaded_requirements(code_content: str) List[str][source]#