Source code for data_juicer.ops.pipeline.ray_vllm_pipeline

from typing import Optional

from data_juicer.ops.base_op import Pipeline
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.ray_utils import is_ray_mode

ray = LazyLoader("ray")


class RayVLLMEnginePipeline(Pipeline):
    """Pipeline for Ray vLLM engine."""

    _accelerator = "cuda"

    def __init__(
        self,
        accelerator_type: Optional[str] = None,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param accelerator_type: The type of accelerator to use
            (e.g., "V100", "A100"). When None (the default) or empty, no
            accelerator-type validation is performed.
            NOTE(review): the previous docstring said None means that only
            the CPU will be used; nothing in this class shows that -- confirm
            against the code that consumes ``self.accelerator_type``.
        :param args: extra args
        :param kwargs: extra args
        :raises AssertionError: if not running in Ray mode, or if
            ``accelerator_type`` is set but is not one of the ``GPUType``
            values.
        """
        super().__init__(*args, **kwargs)
        self.accelerator_type = accelerator_type

        # Explicit raise instead of an `assert` statement: asserts are removed
        # under `python -O`, which would silently skip this check. The
        # exception type and message are kept unchanged for callers.
        if not is_ray_mode():
            raise AssertionError("Ray vLLM engine only works in Ray mode.")

        # Imported at call time (after the Ray-mode check) rather than at
        # module top -- presumably so this module can be imported without
        # the Ray LLM extras installed; verify.
        from ray.llm._internal.serve.core.configs.llm_config import GPUType

        if self.accelerator_type:
            all_accelerator_types = [t.value for t in GPUType]
            if self.accelerator_type not in all_accelerator_types:
                raise AssertionError(
                    f"Unsupported accelerator type: {self.accelerator_type}. "
                    f"Supported types are: {all_accelerator_types}"
                )

    def run(self, dataset: ray.data.Dataset, *, exporter=None, tracer=None, reduce=True) -> ray.data.Dataset:
        """
        Run the pipeline on a dataset.

        This class provides no implementation; the method always raises
        ``NotImplementedError`` (presumably subclasses override it).

        :param dataset: the input Ray dataset.
        :param exporter: keyword-only; unused in this implementation.
        :param tracer: keyword-only; unused in this implementation.
        :param reduce: keyword-only; unused in this implementation.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError