data_juicer.ops.pipeline.llm_inference_with_ray_vllm_pipeline module

class data_juicer.ops.pipeline.llm_inference_with_ray_vllm_pipeline.LLMRayVLLMEnginePipeline(api_or_hf_model: str = 'Qwen/Qwen2.5-7B-Instruct', is_hf_model: bool = True, *, system_prompt: str | None = None, accelerator_type: str | None = None, sampling_params: Dict | None = None, engine_kwargs: Dict | None = None, api_url: str = None, api_key: str = None, **kwargs)

Bases: RayVLLMEnginePipeline

Pipeline that generates responses with the vLLM engine on Ray, using vLLM for efficient large language model inference. For more details about the Ray vLLM engine, see: https://docs.ray.io/en/latest/data/working-with-llms.html

__init__(api_or_hf_model: str = 'Qwen/Qwen2.5-7B-Instruct', is_hf_model: bool = True, *, system_prompt: str | None = None, accelerator_type: str | None = None, sampling_params: Dict | None = None, engine_kwargs: Dict | None = None, api_url: str = None, api_key: str = None, **kwargs)

Initialization method.

Parameters:
  • api_or_hf_model – Name of the API model or Hugging Face model.

  • is_hf_model – Whether api_or_hf_model refers to a Hugging Face model (True, the default) rather than an API model.

  • system_prompt – System prompt used to guide the model's responses.

  • accelerator_type – The type of accelerator to use (e.g., "V100", "A100"). Defaults to None, meaning that only the CPU will be used.

  • sampling_params – Sampling parameters for text generation (e.g., {'temperature': 0.9, 'top_p': 0.95}).

  • engine_kwargs – The kwargs to pass to the vLLM engine. See documentation for details: https://docs.vllm.ai/en/latest/api/vllm/engine/arg_utils/#vllm.engine.arg_utils.AsyncEngineArgs.

  • api_url – Base URL of the OpenAI API.

  • api_key – API key for authentication.

  • kwargs – Extra keyword arguments.
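
The parameters above can be collected into keyword-argument dictionaries before the pipeline is constructed. The following is a minimal sketch rather than an example from the library: the endpoint URL, the my-hosted-model name, the MY_API_KEY environment variable, and the unknown_names helper are all hypothetical, and it assumes that is_hf_model=False selects the API path. The constructor call is left commented out because it requires data_juicer, Ray, and vLLM to be installed.

```python
import os

# Parameter names copied from the documented __init__ signature.
DOCUMENTED_PARAMS = {
    "api_or_hf_model", "is_hf_model", "system_prompt", "accelerator_type",
    "sampling_params", "engine_kwargs", "api_url", "api_key",
}


def unknown_names(kwargs):
    """Hypothetical helper: list names that are not documented parameters."""
    return sorted(set(kwargs) - DOCUMENTED_PARAMS)


# Local inference with a Hugging Face model.
local_kwargs = {
    "api_or_hf_model": "Qwen/Qwen2.5-7B-Instruct",
    "is_hf_model": True,
    "system_prompt": "You are a concise assistant.",
    "accelerator_type": "A100",
    "sampling_params": {"temperature": 0.9, "top_p": 0.95},
    "engine_kwargs": {"max_model_len": 4096},  # a vLLM engine argument
}

# Remote inference through an API (assumed to be selected by is_hf_model=False).
api_kwargs = {
    "api_or_hf_model": "my-hosted-model",        # hypothetical model name
    "is_hf_model": False,
    "api_url": "https://api.example.com/v1",     # hypothetical endpoint
    "api_key": os.environ.get("MY_API_KEY", ""),  # hypothetical variable name
}

for label, kwargs in (("local", local_kwargs), ("api", api_kwargs)):
    print(label, "unknown names:", unknown_names(kwargs))

# With the dependencies installed, construction would look like:
# from data_juicer.ops.pipeline.llm_inference_with_ray_vllm_pipeline import (
#     LLMRayVLLMEnginePipeline,
# )
# pipeline = LLMRayVLLMEnginePipeline(**local_kwargs)
```

Because the constructor also accepts **kwargs, a misspelled parameter name would not necessarily raise an error, so a check like unknown_names can catch typos before a job is launched.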

static preprocess_fn(row: Dict, query_key: str, system_prompt: str | None, sampling_params: Dict) Dict

static postprocess_fn(row: Dict, response_key: str, ori_columns: list) Dict
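
The signatures of preprocess_fn and postprocess_fn suggest a per-row transformation before and after the engine call. The sketch below illustrates one way such hooks could behave, following the row layout used in the Ray Data LLM documentation (a messages list plus sampling_params going in, a generated_text field coming out). It is an assumption about the behavior, not the library's implementation; the sketch_ function names and the sample row are hypothetical.

```python
from typing import Dict, List, Optional


def sketch_preprocess(row: Dict, query_key: str, system_prompt: Optional[str],
                      sampling_params: Dict) -> Dict:
    """Build a chat-style request row from a dataset row (assumed layout)."""
    messages: List[Dict] = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": row[query_key]})
    return {"messages": messages, "sampling_params": sampling_params}


def sketch_postprocess(row: Dict, response_key: str, ori_columns: list) -> Dict:
    """Keep the original columns and store the generated text under response_key."""
    out = {col: row[col] for col in ori_columns if col in row}
    out[response_key] = row["generated_text"]  # assumed engine output field
    return out


row = {"query": "What is Ray?", "id": 7}
request = sketch_preprocess(row, "query", "Answer briefly.", {"temperature": 0.9})

# Simulated engine output: the original columns plus the generated text.
engine_output = {**row, "generated_text": "A distributed computing framework."}
result = sketch_postprocess(engine_output, "response", ["query", "id"])

print(request["messages"])
print(result)
```

Passing ori_columns explicitly lets the postprocessing step drop intermediate fields such as messages, so the output row contains only the original columns and the new response column.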

static preprocess_fn_api(row: Dict, model: str, query_key: str, system_prompt: str | None, sampling_params: Dict | None = None) Dict

static postprocess_fn_api(row: Dict, response_key: str, ori_columns: list) Dict
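
The API variants take an extra model argument, which suggests they build an OpenAI-style chat completion request per row. The sketch below shows one plausible shape, assuming the payload and http_response field names from the Ray Data HTTP request processor examples and the standard chat completions response layout. It is not the library's implementation; the sketch_ function names, the model name, and the simulated response are hypothetical.

```python
from typing import Dict, List, Optional


def sketch_preprocess_api(row: Dict, model: str, query_key: str,
                          system_prompt: Optional[str],
                          sampling_params: Optional[Dict] = None) -> Dict:
    """Build an OpenAI-style chat completion payload (assumed layout)."""
    messages: List[Dict] = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": row[query_key]})
    # Sampling parameters are merged into the top level of the request body.
    payload = {"model": model, "messages": messages, **(sampling_params or {})}
    return {"payload": payload}


def sketch_postprocess_api(row: Dict, response_key: str, ori_columns: list) -> Dict:
    """Extract the first choice's message content from the HTTP response."""
    out = {col: row[col] for col in ori_columns if col in row}
    out[response_key] = row["http_response"]["choices"][0]["message"]["content"]
    return out


row = {"query": "What is vLLM?"}
api_request = sketch_preprocess_api(row, "my-hosted-model", "query", None,
                                    {"temperature": 0.2})

# Simulated API response in the chat completions format.
api_output = {
    **row,
    "http_response": {
        "choices": [{"message": {"role": "assistant",
                                 "content": "An LLM inference engine."}}]
    },
}
api_result = sketch_postprocess_api(api_output, "response", ["query"])

print(api_request["payload"])
print(api_result)
```

The main difference from the local hooks in this sketch is that the model name travels inside each request, since the remote endpoint rather than a local engine decides which model serves it.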

run(dataset, *, exporter=None, tracer=None, reduce=True)