data_juicer.ops.pipeline

class data_juicer.ops.pipeline.LLMRayVLLMEnginePipeline(*args, **kwargs)

Bases: RayVLLMEnginePipeline

Pipeline that generates responses using the vLLM engine on Ray. It leverages vLLM for efficient large language model inference. More details about Ray's vLLM engine integration can be found at: https://docs.ray.io/en/latest/data/working-with-llms.html

__init__(api_or_hf_model: str = 'Qwen/Qwen2.5-7B-Instruct', is_hf_model: bool = True, *, system_prompt: str | None = None, accelerator_type: str | None = None, sampling_params: Dict | None = None, engine_kwargs: Dict | None = None, api_url: str = None, api_key: str = None, **kwargs)

Initialization method.

Parameters:
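  • api_or_hf_model – Name of the API model or Hugging Face model.

  • is_hf_model – Whether api_or_hf_model is a Hugging Face model name (True) or an API model name (False).

  • system_prompt – System prompt that guides the model's responses.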

  • accelerator_type – The type of accelerator to use (e.g., “V100”, “A100”). Defaults to None, meaning that only the CPU will be used.

  • sampling_params – Sampling parameters for text generation (e.g., {'temperature': 0.9, 'top_p': 0.95}).

  • engine_kwargs – Keyword arguments passed to the vLLM engine. See the vLLM documentation for details: https://docs.vllm.ai/en/latest/api/vllm/engine/arg_utils/#vllm.engine.arg_utils.AsyncEngineArgs.

  • api_url – Base URL of the OpenAI API.

  • api_key – API key for authentication.

  • kwargs – Extra keyword arguments.

static preprocess_fn(row: Dict, query_key: str, system_prompt: str | None, sampling_params: Dict) → Dict
static postprocess_fn(row: Dict, response_key: str, ori_columns: list) → Dict
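The two static hooks above adapt dataset rows to and from the row format consumed by Ray's vLLM engine processor. The sketch below is a hypothetical reconstruction, not the library's code: it assumes the Ray Data convention (described in the Ray LLM documentation linked above) of a `messages` list plus a `sampling_params` dict on input and a `generated_text` column on output, and it assumes `ori_columns` lists the columns to keep. The names `sketch_preprocess` and `sketch_postprocess` are illustrative only.

```python
from typing import Dict, List, Optional


def sketch_preprocess(row: Dict, query_key: str, system_prompt: Optional[str],
                      sampling_params: Dict) -> Dict:
    """Hypothetical: build a chat request row for a Ray vLLM engine processor."""
    messages: List[Dict] = []
    if system_prompt:
        # Optional system message placed before the user query.
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": row[query_key]})
    # Keep the original columns so they are still present after inference.
    return {**row, "messages": messages, "sampling_params": sampling_params}


def sketch_postprocess(row: Dict, response_key: str, ori_columns: list) -> Dict:
    """Hypothetical: keep only the original columns plus the generated text."""
    out = {col: row[col] for col in ori_columns if col in row}
    # Assumption: the engine writes its output to a "generated_text" column.
    out[response_key] = row["generated_text"]
    return out


if __name__ == "__main__":
    request = sketch_preprocess({"query": "What is Ray?"}, "query",
                                "Answer briefly.", {"temperature": 0.9})
    # Simulate the engine appending its output column to the request row.
    engine_row = {**request, "generated_text": "A distributed computing framework."}
    print(sketch_postprocess(engine_row, "response", ["query"]))
```

Dropping the intermediate `messages` and `sampling_params` columns in postprocessing keeps the output schema equal to the input schema plus one response column.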
static preprocess_fn_api(row: Dict, model: str, query_key: str, system_prompt: str | None, sampling_params: Dict | None = None) → Dict
static postprocess_fn_api(row: Dict, response_key: str, ori_columns: list) → Dict
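For the API variants, one plausible shape is sketched below, assuming an OpenAI-compatible chat-completions endpoint reached through Ray's HTTP request processor, which sends each row's `payload` and returns the decoded JSON body in an `http_response` column. Whether this pipeline actually uses that processor is an assumption, and the helper names are hypothetical.

```python
from typing import Dict, List, Optional


def sketch_preprocess_api(row: Dict, model: str, query_key: str,
                          system_prompt: Optional[str],
                          sampling_params: Optional[Dict] = None) -> Dict:
    """Hypothetical: wrap a row as an OpenAI-style chat-completions payload."""
    messages: List[Dict] = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": row[query_key]})
    # Sampling parameters (temperature, top_p, ...) become top-level request fields.
    payload = {"model": model, "messages": messages, **(sampling_params or {})}
    return {**row, "payload": payload}


def sketch_postprocess_api(row: Dict, response_key: str, ori_columns: list) -> Dict:
    """Hypothetical: extract the first choice's message text from the JSON response."""
    out = {col: row[col] for col in ori_columns if col in row}
    out[response_key] = row["http_response"]["choices"][0]["message"]["content"]
    return out
```

The `choices[0].message.content` path matches the OpenAI chat-completions response format; a server that returns a different schema would need a different extraction step.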
run(dataset, *, exporter=None, tracer=None, reduce=True)
class data_juicer.ops.pipeline.RayRepartitionPipeline(*args, **kwargs)

Bases: Pipeline

Repartition a Ray Dataset into a target number of blocks.

This operator performs dataset-level block repartitioning through the Ray Dataset repartition API. It is intended only for Ray executor pipelines, because local datasets do not expose Ray Dataset blocks.

__init__(num_blocks: int = 1, shuffle: bool = False, *args, **kwargs)

Initialization method.

Parameters:
  • num_blocks – Target number of Ray Dataset blocks.

  • shuffle – Whether to shuffle records during repartitioning.
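In Ray Data, this operator presumably comes down to a call such as `dataset.repartition(num_blocks, shuffle=shuffle)`. To show what the two parameters mean without a Ray cluster, here is a pure-Python illustration of the semantics: records are optionally shuffled, then split into `num_blocks` blocks of near-equal size. It is not Ray's implementation, which moves blocks between distributed workers, and the `seed` parameter is a hypothetical addition for reproducibility.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def repartition(records: Sequence[T], num_blocks: int, shuffle: bool = False,
                seed: int = 0) -> List[List[T]]:
    """Split records into num_blocks contiguous blocks of near-equal size."""
    if num_blocks < 1:
        raise ValueError("num_blocks must be >= 1")
    items = list(records)
    if shuffle:
        # Randomly reorder records before splitting them into blocks.
        random.Random(seed).shuffle(items)
    size, extra = divmod(len(items), num_blocks)
    blocks, start = [], 0
    for i in range(num_blocks):
        # The first `extra` blocks receive one additional record.
        end = start + size + (1 if i < extra else 0)
        blocks.append(items[start:end])
        start = end
    return blocks


print(repartition(range(10), 3))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Without `shuffle`, record order is preserved and only block boundaries move; with it, records are also redistributed across blocks at the cost of extra data movement.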

run(dataset, *, exporter=None, tracer=None)
class data_juicer.ops.pipeline.VLMRayVLLMEnginePipeline(*args, **kwargs)

Bases: RayVLLMEnginePipeline

Pipeline that generates responses using the vLLM engine on Ray. It leverages vLLM for efficient large vision-language model inference. More details about Ray's vLLM engine integration can be found at: https://docs.ray.io/en/latest/data/working-with-llms.html

__init__(api_or_hf_model: str = 'Qwen/Qwen2.5-7B-Instruct', is_hf_model: bool = True, *, system_prompt: str | None = None, accelerator_type: str | None = None, sampling_params: Dict | None = None, engine_kwargs: Dict | None = None, **kwargs)

Initialization method.

Parameters:
  • api_or_hf_model – Name of the API model or Hugging Face model.

  • system_prompt – System prompt that guides the model's responses.

  • accelerator_type – The type of accelerator to use (e.g., “V100”, “A100”). Defaults to None, meaning that only the CPU will be used.

  • sampling_params – Sampling parameters for text generation (e.g., {'temperature': 0.9, 'top_p': 0.95}).

  • engine_kwargs – Keyword arguments passed to the vLLM engine. See the vLLM documentation for details: https://docs.vllm.ai/en/latest/api/vllm/engine/arg_utils/#vllm.engine.arg_utils.AsyncEngineArgs.

  • kwargs – Extra keyword arguments.

static vision_preprocess(row: dict, query_key: str, image_key: str, system_prompt: str | None, sampling_params: Dict) → dict

Preprocessing function for vision-language model inputs.
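A vision-language preprocessing step typically pairs the text query with its image(s) inside a single user message. The sketch below assumes the multimodal chat format shown in Ray's vision-language batch inference examples, where each content part is either `{"type": "text", ...}` or `{"type": "image", "image": ...}`; how this pipeline actually loads and encodes images is not stated here, so the images are passed through unchanged. The helper name is hypothetical.

```python
from typing import Dict, List, Optional


def sketch_vision_preprocess(row: Dict, query_key: str, image_key: str,
                             system_prompt: Optional[str],
                             sampling_params: Dict) -> Dict:
    """Hypothetical: pair a text query with its image(s) in one user message."""
    images = row[image_key]
    if not isinstance(images, list):
        images = [images]  # Accept either a single image or a list of images.
    content: List[Dict] = [{"type": "text", "text": row[query_key]}]
    # Assumption: images are forwarded as-is (e.g. paths, URLs, or PIL images).
    content += [{"type": "image", "image": img} for img in images]
    messages: List[Dict] = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": content})
    return {**row, "messages": messages, "sampling_params": sampling_params}
```

Normalizing a single image to a one-element list lets rows with one or several images share the same downstream handling.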

static postprocess_fn(row: Dict, response_key: str, ori_columns: list) → Dict
run(dataset, *, exporter=None, tracer=None, reduce=True)