From 09712dcca84c71dfd50361e0ce403357ed49047f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Sch=C3=B6nnenbeck?= Date: Thu, 26 Sep 2024 14:51:51 +0200 Subject: [PATCH 1/7] Priority in async engine --- vllm/engine/async_llm_engine.py | 212 ++++++++++++++++++++------------ 1 file changed, 134 insertions(+), 78 deletions(-) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 34e7e05341f02..a4e4d8c2cbadf 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -2,13 +2,31 @@ import time import weakref from functools import partial -from typing import (Any, AsyncGenerator, Callable, Dict, Iterable, List, - Mapping, Optional, Set, Tuple, Type, Union) +from typing import ( + Any, + AsyncGenerator, + Callable, + Dict, + Iterable, + List, + Mapping, + Optional, + Set, + Tuple, + Type, + Union, +) from weakref import ReferenceType import vllm.envs as envs -from vllm.config import (DecodingConfig, EngineConfig, LoRAConfig, ModelConfig, - ParallelConfig, SchedulerConfig) +from vllm.config import ( + DecodingConfig, + EngineConfig, + LoRAConfig, + ModelConfig, + ParallelConfig, + SchedulerConfig, +) from vllm.core.scheduler import SchedulerOutputs from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_timeout import asyncio_timeout @@ -98,7 +116,7 @@ def finished(self) -> bool: return self._finished async def generator( - self + self, ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]: try: while True: @@ -114,9 +132,9 @@ async def generator( @staticmethod def _is_raisable(value: Any): - return isinstance(value, BaseException) or \ - (isinstance(value, type) and \ - issubclass(value, BaseException)) + return isinstance( + value, BaseException) or (isinstance(value, type) + and issubclass(value, BaseException)) class RequestTracker: @@ -126,7 +144,7 @@ def __init__(self) -> None: self._request_streams: Dict[str, AsyncStream] = {} self._aborted_requests: asyncio.Queue[str] = asyncio.Queue() self._new_requests: asyncio.Queue[Tuple[AsyncStream, - dict]] = asyncio.Queue() + dict]] = (asyncio.Queue()) self.new_requests_event = asyncio.Event() def __contains__(self, item): @@ -148,11 +166,12 @@ def propagate_exception(self, for rid in tuple(self._request_streams.keys()): self.abort_request(rid, exception=exc) - def process_request_output(self, - request_output: Union[RequestOutput, - EmbeddingRequestOutput], - *, - verbose: bool = False) -> None: + def process_request_output( + self, + request_output: Union[RequestOutput, EmbeddingRequestOutput], + *, + verbose: bool = False, + ) -> None: """Process a request output from the engine.""" request_id = request_output.request_id finished = request_output.finished @@ -171,21 +190,25 @@ def process_request_output(self, if verbose and finished: logger.info("Finished request %s.", request_id) - def process_exception(self, - request_id: str, - exception: BaseException, - *, - verbose: bool = False) -> None: + def process_exception( + self, + request_id: str, + exception: BaseException, + *, + verbose: bool = False, + ) -> None: """Propagate an exception from the engine.""" if verbose: logger.info("Finished request %s.", request_id) self.abort_request(request_id, exception=exception) - def add_request(self, - request_id: str, - *, - verbose: bool = False, - **engine_add_request_kwargs) -> AsyncStream: + def add_request( + self, + request_id: str, + *, + verbose: bool = False, + **engine_add_request_kwargs, + ) -> AsyncStream: """Add a request to be sent to the 
engine on the next background loop iteration.""" if request_id in self._request_streams: @@ -205,12 +228,13 @@ def add_request(self, return stream - def abort_request(self, - request_id: str, - *, - exception: Optional[Union[BaseException, - Type[BaseException]]] = None, - verbose: bool = False) -> None: + def abort_request( + self, + request_id: str, + *, + exception: Optional[Union[BaseException, Type[BaseException]]] = None, + verbose: bool = False, + ) -> None: """Abort a request during next background loop iteration.""" if verbose: logger.info("Aborted request %s.", request_id) @@ -287,11 +311,12 @@ async def step_async( # This ensures that the scheduler is only called again when the current # batch has completed. if not self._has_remaining_steps(seq_group_metadata_list): - # Schedule iteration - (seq_group_metadata_list, scheduler_outputs, - allow_async_output_proc - ) = self.scheduler[virtual_engine].schedule() + ( + seq_group_metadata_list, + scheduler_outputs, + allow_async_output_proc, + ) = self.scheduler[virtual_engine].schedule() ctx.seq_group_metadata_list = seq_group_metadata_list ctx.scheduler_outputs = scheduler_outputs @@ -305,8 +330,11 @@ async def step_async( # cache the scheduler outputs for the next iteration if we have # lookahead slots self._cache_scheduler_outputs_for_multi_step( - virtual_engine, seq_group_metadata_list, scheduler_outputs, - allow_async_output_proc) + virtual_engine, + seq_group_metadata_list, + scheduler_outputs, + allow_async_output_proc, + ) assert seq_group_metadata_list is not None assert scheduler_outputs is not None @@ -319,8 +347,8 @@ async def step_async( # For supporting PP this is probably the best way to pass the # sampled_token_ids, as a separate broadcast over all the PP stages # will cause one virtual engine's microbatch to block the pipeline. - last_sampled_token_ids = \ - self._get_last_sampled_token_ids(virtual_engine) + last_sampled_token_ids = self._get_last_sampled_token_ids( + virtual_engine) execute_model_req = ExecuteModelRequest( seq_group_metadata_list=seq_group_metadata_list, @@ -333,7 +361,8 @@ async def step_async( finished_requests_ids=finished_requests_ids, # We use ExecuteModelRequest to pass the last sampled_token_ids # to each of the non-last PP stages for in-place prepare_input. 
- last_sampled_token_ids=last_sampled_token_ids) + last_sampled_token_ids=last_sampled_token_ids, + ) if allow_async_output_proc: execute_model_req.async_callback = self.async_callbacks[ @@ -360,22 +389,26 @@ async def step_async( if not self._has_remaining_steps(seq_group_metadata_list): # Clear the cache if we have finished all the steps if self.scheduler_config.is_multi_step: - self.cached_scheduler_outputs[ - virtual_engine] = SchedulerOutputState() + self.cached_scheduler_outputs[virtual_engine] = ( + SchedulerOutputState()) - ctx.append_output(outputs=outputs, - seq_group_metadata_list=seq_group_metadata_list, - scheduler_outputs=scheduler_outputs, - is_async=allow_async_output_proc, - is_last_step=True) + ctx.append_output( + outputs=outputs, + seq_group_metadata_list=seq_group_metadata_list, + scheduler_outputs=scheduler_outputs, + is_async=allow_async_output_proc, + is_last_step=True, + ) if outputs and allow_async_output_proc: - assert len( - outputs - ) == 1, "Async postprocessor expects only a single output set" + assert ( + len(outputs) == 1 + ), "Async postprocessor expects only a single output set" self._advance_to_next_step( - outputs[0], seq_group_metadata_list, - scheduler_outputs.scheduled_seq_groups) + outputs[0], + seq_group_metadata_list, + scheduler_outputs.scheduled_seq_groups, + ) if not allow_async_output_proc: self._process_model_outputs(ctx=ctx) @@ -411,11 +444,17 @@ async def add_request_async( lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, prompt_adapter_request: Optional[PromptAdapterRequest] = None, + priority: int = 0, ) -> None: """Async version of :meth:`add_request`.""" if lora_request is not None and not self.lora_config: raise ValueError(f"Got lora_request {lora_request} but LoRA is " "not enabled!") + + if priority > 0 and not self.scheduler_config.policy == "priority": + raise ValueError(f"Got priority {priority} but " + "Priority scheduling is not enabled.") + if arrival_time is None: arrival_time = time.time() @@ -435,6 +474,7 @@ async def add_request_async( lora_request=lora_request, prompt_adapter_request=prompt_adapter_request, trace_headers=trace_headers, + priority=priority, ) async def check_health_async(self) -> None: @@ -462,11 +502,13 @@ class AsyncLLMEngine: _engine_class: Type[_AsyncLLMEngine] = _AsyncLLMEngine - def __init__(self, - *args, - log_requests: bool = True, - start_engine_loop: bool = True, - **kwargs) -> None: + def __init__( + self, + *args, + log_requests: bool = True, + start_engine_loop: bool = True, + **kwargs, + ) -> None: self.log_requests = log_requests self.engine = self._engine_class(*args, **kwargs) @@ -477,8 +519,8 @@ def __init__(self, self.engine.model_config.use_async_output_proc) if self.use_process_request_outputs_callback: - self.engine.process_request_outputs_callback = \ - weak_bind(self.process_request_outputs) + self.engine.process_request_outputs_callback = weak_bind( + self.process_request_outputs) self.background_loop: Optional[asyncio.Future] = None # We need to keep a reference to unshielded @@ -509,47 +551,58 @@ def _get_executor_cls( executor_class = distributed_executor_backend elif engine_config.device_config.device_type == "neuron": from vllm.executor.neuron_executor import NeuronExecutorAsync + executor_class = NeuronExecutorAsync elif engine_config.device_config.device_type == "tpu": if distributed_executor_backend == "ray": from vllm.executor.ray_tpu_executor import RayTPUExecutorAsync + executor_class = RayTPUExecutorAsync else: assert 
distributed_executor_backend is None from vllm.executor.tpu_executor import TPUExecutorAsync + executor_class = TPUExecutorAsync elif engine_config.device_config.device_type == "cpu": from vllm.executor.cpu_executor import CPUExecutorAsync + executor_class = CPUExecutorAsync elif engine_config.device_config.device_type == "openvino": assert distributed_executor_backend is None, ( "Distributed execution is not supported with " "the OpenVINO backend.") from vllm.executor.openvino_executor import OpenVINOExecutorAsync + executor_class = OpenVINOExecutorAsync elif engine_config.device_config.device_type == "xpu": if distributed_executor_backend is None: from vllm.executor.xpu_executor import XPUExecutorAsync + executor_class = XPUExecutorAsync elif distributed_executor_backend == "ray": from vllm.executor.ray_xpu_executor import RayXPUExecutorAsync + executor_class = RayXPUExecutorAsync elif distributed_executor_backend == "mp": from vllm.executor.multiproc_xpu_executor import ( - MultiprocessingXPUExecutorAsync) + MultiprocessingXPUExecutorAsync, ) + executor_class = MultiprocessingXPUExecutorAsync else: raise RuntimeError( "Not supported distributed execution model on XPU device.") elif distributed_executor_backend == "ray": from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync + executor_class = RayGPUExecutorAsync elif distributed_executor_backend == "mp": from vllm.executor.multiproc_gpu_executor import ( - MultiprocessingGPUExecutorAsync) + MultiprocessingGPUExecutorAsync, ) + executor_class = MultiprocessingGPUExecutorAsync else: from vllm.executor.gpu_executor import GPUExecutorAsync + executor_class = GPUExecutorAsync return executor_class @@ -619,8 +672,8 @@ async def get_tokenizer( self, lora_request: Optional[LoRARequest] = None, ) -> AnyTokenizer: - return await (self.engine.get_tokenizer_group(). - get_lora_tokenizer_async(lora_request)) + return await self.engine.get_tokenizer_group( + ).get_lora_tokenizer_async(lora_request) def start_background_loop(self) -> None: """Start the background loop.""" @@ -711,8 +764,8 @@ async def run_engine_loop(engine_ref: ReferenceType): if not engine: return - pipeline_parallel_size = \ - engine.engine.parallel_config.pipeline_parallel_size + pipeline_parallel_size = ( + engine.engine.parallel_config.pipeline_parallel_size) has_requests_in_progress = [False] * pipeline_parallel_size while True: if not any(has_requests_in_progress): @@ -748,16 +801,15 @@ async def run_engine_loop(engine_ref: ReferenceType): async with asyncio_timeout(ENGINE_ITERATION_TIMEOUT_S): done, _ = await asyncio.wait( requests_in_progress, - return_when=asyncio.FIRST_COMPLETED) + return_when=asyncio.FIRST_COMPLETED, + ) for _ in range(pipeline_parallel_size): await asyncio.sleep(0) for task in done: result = task.result() virtual_engine = requests_in_progress.index(task) - has_unfinished_requests = ( - engine.engine. 
- has_unfinished_requests_for_virtual_engine( - virtual_engine)) + has_unfinished_requests = engine.engine.has_unfinished_requests_for_virtual_engine( + virtual_engine) if result or has_unfinished_requests: requests_in_progress[virtual_engine] = ( asyncio.create_task( @@ -782,7 +834,7 @@ async def add_request( arrival_time: Optional[float] = None, lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, - prompt_adapter_request: Optional[PromptAdapterRequest] = None + prompt_adapter_request: Optional[PromptAdapterRequest] = None, ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]: if not self.is_running: if self.start_engine_loop: @@ -802,7 +854,8 @@ async def add_request( arrival_time=arrival_time or time.time(), lora_request=lora_request, trace_headers=trace_headers, - prompt_adapter_request=prompt_adapter_request) + prompt_adapter_request=prompt_adapter_request, + ) return stream.generator() @@ -813,7 +866,7 @@ async def generate( request_id: str, lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, - prompt_adapter_request: Optional[PromptAdapterRequest] = None + prompt_adapter_request: Optional[PromptAdapterRequest] = None, ) -> AsyncGenerator[RequestOutput, None]: """Generate outputs for a request. @@ -993,9 +1046,11 @@ def _abort(self, request_id: str) -> None: Args: request_id: The unique id of the request. """ - self._request_tracker.abort_request(request_id, - exception=asyncio.CancelledError, - verbose=self.log_requests) + self._request_tracker.abort_request( + request_id, + exception=asyncio.CancelledError, + verbose=self.log_requests, + ) async def get_model_config(self) -> ModelConfig: """Get the model configuration of the vLLM engine.""" @@ -1018,9 +1073,10 @@ async def get_lora_config(self) -> LoRAConfig: return self.engine.get_lora_config() async def do_log_stats( - self, - scheduler_outputs: Optional[SchedulerOutputs] = None, - model_output: Optional[List[SamplerOutput]] = None) -> None: + self, + scheduler_outputs: Optional[SchedulerOutputs] = None, + model_output: Optional[List[SamplerOutput]] = None, + ) -> None: self.engine.do_log_stats() async def check_health(self) -> None: From ed32cb9c8fcd1aa765c60bda9b5288f49acd9b2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Sch=C3=B6nnenbeck?= Date: Thu, 26 Sep 2024 14:58:40 +0200 Subject: [PATCH 2/7] Formatting --- vllm/engine/async_llm_engine.py | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index a4e4d8c2cbadf..b3eb5a7a820f1 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -2,31 +2,13 @@ import time import weakref from functools import partial -from typing import ( - Any, - AsyncGenerator, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Set, - Tuple, - Type, - Union, -) +from typing import (Any, AsyncGenerator, Callable, Dict, Iterable, List, + Mapping, Optional, Set, Tuple, Type, Union) from weakref import ReferenceType import vllm.envs as envs -from vllm.config import ( - DecodingConfig, - EngineConfig, - LoRAConfig, - ModelConfig, - ParallelConfig, - SchedulerConfig, -) +from vllm.config import (DecodingConfig, EngineConfig, LoRAConfig, ModelConfig, + ParallelConfig, SchedulerConfig) from vllm.core.scheduler import SchedulerOutputs from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_timeout import asyncio_timeout @@ -585,7 
+567,7 @@ def _get_executor_cls( executor_class = RayXPUExecutorAsync elif distributed_executor_backend == "mp": from vllm.executor.multiproc_xpu_executor import ( - MultiprocessingXPUExecutorAsync, ) + MultiprocessingXPUExecutorAsync) executor_class = MultiprocessingXPUExecutorAsync else: @@ -597,7 +579,7 @@ def _get_executor_cls( executor_class = RayGPUExecutorAsync elif distributed_executor_backend == "mp": from vllm.executor.multiproc_gpu_executor import ( - MultiprocessingGPUExecutorAsync, ) + MultiprocessingGPUExecutorAsync) executor_class = MultiprocessingGPUExecutorAsync else: From 57c7b4e4c978bbb3e5961ba60c824754a4da577e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Sch=C3=B6nnenbeck?= Date: Thu, 26 Sep 2024 15:07:34 +0200 Subject: [PATCH 3/7] More formatting --- vllm/engine/async_llm_engine.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index b3eb5a7a820f1..e5c85fa51a2c0 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -790,8 +790,10 @@ async def run_engine_loop(engine_ref: ReferenceType): for task in done: result = task.result() virtual_engine = requests_in_progress.index(task) - has_unfinished_requests = engine.engine.has_unfinished_requests_for_virtual_engine( - virtual_engine) + has_unfinished_requests = ( + engine.engine. + has_unfinished_requests_for_virtual_engine( + virtual_engine)) if result or has_unfinished_requests: requests_in_progress[virtual_engine] = ( asyncio.create_task( From feb30b9d3849de6d1408db471eb09a79349170ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Sch=C3=B6nnenbeck?= Date: Thu, 26 Sep 2024 15:18:07 +0200 Subject: [PATCH 4/7] Rollback of unnecessary formatting --- vllm/engine/async_llm_engine.py | 171 +++++++++++++------------------- 1 file changed, 68 insertions(+), 103 deletions(-) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index e5c85fa51a2c0..7cba3a05083a7 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -98,7 +98,7 @@ def finished(self) -> bool: return self._finished async def generator( - self, + self ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]: try: while True: @@ -114,9 +114,9 @@ async def generator( @staticmethod def _is_raisable(value: Any): - return isinstance( - value, BaseException) or (isinstance(value, type) - and issubclass(value, BaseException)) + return isinstance(value, BaseException) or \ + (isinstance(value, type) and \ + issubclass(value, BaseException)) class RequestTracker: @@ -126,7 +126,7 @@ def __init__(self) -> None: self._request_streams: Dict[str, AsyncStream] = {} self._aborted_requests: asyncio.Queue[str] = asyncio.Queue() self._new_requests: asyncio.Queue[Tuple[AsyncStream, - dict]] = (asyncio.Queue()) + dict]] = asyncio.Queue() self.new_requests_event = asyncio.Event() def __contains__(self, item): @@ -148,12 +148,11 @@ def propagate_exception(self, for rid in tuple(self._request_streams.keys()): self.abort_request(rid, exception=exc) - def process_request_output( - self, - request_output: Union[RequestOutput, EmbeddingRequestOutput], - *, - verbose: bool = False, - ) -> None: + def process_request_output(self, + request_output: Union[RequestOutput, + EmbeddingRequestOutput], + *, + verbose: bool = False) -> None: """Process a request output from the engine.""" request_id = request_output.request_id finished = request_output.finished @@ -172,25 +171,21 @@ def 
process_request_output( if verbose and finished: logger.info("Finished request %s.", request_id) - def process_exception( - self, - request_id: str, - exception: BaseException, - *, - verbose: bool = False, - ) -> None: + def process_exception(self, + request_id: str, + exception: BaseException, + *, + verbose: bool = False) -> None: """Propagate an exception from the engine.""" if verbose: logger.info("Finished request %s.", request_id) self.abort_request(request_id, exception=exception) - def add_request( - self, - request_id: str, - *, - verbose: bool = False, - **engine_add_request_kwargs, - ) -> AsyncStream: + def add_request(self, + request_id: str, + *, + verbose: bool = False, + **engine_add_request_kwargs) -> AsyncStream: """Add a request to be sent to the engine on the next background loop iteration.""" if request_id in self._request_streams: @@ -210,13 +205,12 @@ def add_request( return stream - def abort_request( - self, - request_id: str, - *, - exception: Optional[Union[BaseException, Type[BaseException]]] = None, - verbose: bool = False, - ) -> None: + def abort_request(self, + request_id: str, + *, + exception: Optional[Union[BaseException, + Type[BaseException]]] = None, + verbose: bool = False) -> None: """Abort a request during next background loop iteration.""" if verbose: logger.info("Aborted request %s.", request_id) @@ -293,12 +287,11 @@ async def step_async( # This ensures that the scheduler is only called again when the current # batch has completed. if not self._has_remaining_steps(seq_group_metadata_list): + # Schedule iteration - ( - seq_group_metadata_list, - scheduler_outputs, - allow_async_output_proc, - ) = self.scheduler[virtual_engine].schedule() + (seq_group_metadata_list, scheduler_outputs, + allow_async_output_proc + ) = self.scheduler[virtual_engine].schedule() ctx.seq_group_metadata_list = seq_group_metadata_list ctx.scheduler_outputs = scheduler_outputs @@ -312,11 +305,8 @@ async def step_async( # cache the scheduler outputs for the next iteration if we have # lookahead slots self._cache_scheduler_outputs_for_multi_step( - virtual_engine, - seq_group_metadata_list, - scheduler_outputs, - allow_async_output_proc, - ) + virtual_engine, seq_group_metadata_list, scheduler_outputs, + allow_async_output_proc) assert seq_group_metadata_list is not None assert scheduler_outputs is not None @@ -329,8 +319,8 @@ async def step_async( # For supporting PP this is probably the best way to pass the # sampled_token_ids, as a separate broadcast over all the PP stages # will cause one virtual engine's microbatch to block the pipeline. - last_sampled_token_ids = self._get_last_sampled_token_ids( - virtual_engine) + last_sampled_token_ids = \ + self._get_last_sampled_token_ids(virtual_engine) execute_model_req = ExecuteModelRequest( seq_group_metadata_list=seq_group_metadata_list, @@ -343,8 +333,7 @@ async def step_async( finished_requests_ids=finished_requests_ids, # We use ExecuteModelRequest to pass the last sampled_token_ids # to each of the non-last PP stages for in-place prepare_input. 
- last_sampled_token_ids=last_sampled_token_ids, - ) + last_sampled_token_ids=last_sampled_token_ids) if allow_async_output_proc: execute_model_req.async_callback = self.async_callbacks[ @@ -371,26 +360,22 @@ async def step_async( if not self._has_remaining_steps(seq_group_metadata_list): # Clear the cache if we have finished all the steps if self.scheduler_config.is_multi_step: - self.cached_scheduler_outputs[virtual_engine] = ( - SchedulerOutputState()) + self.cached_scheduler_outputs[ + virtual_engine] = SchedulerOutputState() - ctx.append_output( - outputs=outputs, - seq_group_metadata_list=seq_group_metadata_list, - scheduler_outputs=scheduler_outputs, - is_async=allow_async_output_proc, - is_last_step=True, - ) + ctx.append_output(outputs=outputs, + seq_group_metadata_list=seq_group_metadata_list, + scheduler_outputs=scheduler_outputs, + is_async=allow_async_output_proc, + is_last_step=True) if outputs and allow_async_output_proc: - assert ( - len(outputs) == 1 - ), "Async postprocessor expects only a single output set" + assert len( + outputs + ) == 1, "Async postprocessor expects only a single output set" self._advance_to_next_step( - outputs[0], - seq_group_metadata_list, - scheduler_outputs.scheduled_seq_groups, - ) + outputs[0], seq_group_metadata_list, + scheduler_outputs.scheduled_seq_groups) if not allow_async_output_proc: self._process_model_outputs(ctx=ctx) @@ -432,11 +417,9 @@ async def add_request_async( if lora_request is not None and not self.lora_config: raise ValueError(f"Got lora_request {lora_request} but LoRA is " "not enabled!") - if priority > 0 and not self.scheduler_config.policy == "priority": raise ValueError(f"Got priority {priority} but " "Priority scheduling is not enabled.") - if arrival_time is None: arrival_time = time.time() @@ -484,13 +467,11 @@ class AsyncLLMEngine: _engine_class: Type[_AsyncLLMEngine] = _AsyncLLMEngine - def __init__( - self, - *args, - log_requests: bool = True, - start_engine_loop: bool = True, - **kwargs, - ) -> None: + def __init__(self, + *args, + log_requests: bool = True, + start_engine_loop: bool = True, + **kwargs) -> None: self.log_requests = log_requests self.engine = self._engine_class(*args, **kwargs) @@ -501,8 +482,8 @@ def __init__( self.engine.model_config.use_async_output_proc) if self.use_process_request_outputs_callback: - self.engine.process_request_outputs_callback = weak_bind( - self.process_request_outputs) + self.engine.process_request_outputs_callback = \ + weak_bind(self.process_request_outputs) self.background_loop: Optional[asyncio.Future] = None # We need to keep a reference to unshielded @@ -533,58 +514,47 @@ def _get_executor_cls( executor_class = distributed_executor_backend elif engine_config.device_config.device_type == "neuron": from vllm.executor.neuron_executor import NeuronExecutorAsync - executor_class = NeuronExecutorAsync elif engine_config.device_config.device_type == "tpu": if distributed_executor_backend == "ray": from vllm.executor.ray_tpu_executor import RayTPUExecutorAsync - executor_class = RayTPUExecutorAsync else: assert distributed_executor_backend is None from vllm.executor.tpu_executor import TPUExecutorAsync - executor_class = TPUExecutorAsync elif engine_config.device_config.device_type == "cpu": from vllm.executor.cpu_executor import CPUExecutorAsync - executor_class = CPUExecutorAsync elif engine_config.device_config.device_type == "openvino": assert distributed_executor_backend is None, ( "Distributed execution is not supported with " "the OpenVINO backend.") from 
vllm.executor.openvino_executor import OpenVINOExecutorAsync - executor_class = OpenVINOExecutorAsync elif engine_config.device_config.device_type == "xpu": if distributed_executor_backend is None: from vllm.executor.xpu_executor import XPUExecutorAsync - executor_class = XPUExecutorAsync elif distributed_executor_backend == "ray": from vllm.executor.ray_xpu_executor import RayXPUExecutorAsync - executor_class = RayXPUExecutorAsync elif distributed_executor_backend == "mp": from vllm.executor.multiproc_xpu_executor import ( MultiprocessingXPUExecutorAsync) - executor_class = MultiprocessingXPUExecutorAsync else: raise RuntimeError( "Not supported distributed execution model on XPU device.") elif distributed_executor_backend == "ray": from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync - executor_class = RayGPUExecutorAsync elif distributed_executor_backend == "mp": from vllm.executor.multiproc_gpu_executor import ( MultiprocessingGPUExecutorAsync) - executor_class = MultiprocessingGPUExecutorAsync else: from vllm.executor.gpu_executor import GPUExecutorAsync - executor_class = GPUExecutorAsync return executor_class @@ -654,8 +624,8 @@ async def get_tokenizer( self, lora_request: Optional[LoRARequest] = None, ) -> AnyTokenizer: - return await self.engine.get_tokenizer_group( - ).get_lora_tokenizer_async(lora_request) + return await (self.engine.get_tokenizer_group(). + get_lora_tokenizer_async(lora_request)) def start_background_loop(self) -> None: """Start the background loop.""" @@ -746,8 +716,8 @@ async def run_engine_loop(engine_ref: ReferenceType): if not engine: return - pipeline_parallel_size = ( - engine.engine.parallel_config.pipeline_parallel_size) + pipeline_parallel_size = \ + engine.engine.parallel_config.pipeline_parallel_size has_requests_in_progress = [False] * pipeline_parallel_size while True: if not any(has_requests_in_progress): @@ -783,8 +753,7 @@ async def run_engine_loop(engine_ref: ReferenceType): async with asyncio_timeout(ENGINE_ITERATION_TIMEOUT_S): done, _ = await asyncio.wait( requests_in_progress, - return_when=asyncio.FIRST_COMPLETED, - ) + return_when=asyncio.FIRST_COMPLETED) for _ in range(pipeline_parallel_size): await asyncio.sleep(0) for task in done: @@ -818,7 +787,7 @@ async def add_request( arrival_time: Optional[float] = None, lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, - prompt_adapter_request: Optional[PromptAdapterRequest] = None, + prompt_adapter_request: Optional[PromptAdapterRequest] = None ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]: if not self.is_running: if self.start_engine_loop: @@ -838,8 +807,7 @@ async def add_request( arrival_time=arrival_time or time.time(), lora_request=lora_request, trace_headers=trace_headers, - prompt_adapter_request=prompt_adapter_request, - ) + prompt_adapter_request=prompt_adapter_request) return stream.generator() @@ -850,7 +818,7 @@ async def generate( request_id: str, lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, - prompt_adapter_request: Optional[PromptAdapterRequest] = None, + prompt_adapter_request: Optional[PromptAdapterRequest] = None ) -> AsyncGenerator[RequestOutput, None]: """Generate outputs for a request. @@ -1030,11 +998,9 @@ def _abort(self, request_id: str) -> None: Args: request_id: The unique id of the request. 
""" - self._request_tracker.abort_request( - request_id, - exception=asyncio.CancelledError, - verbose=self.log_requests, - ) + self._request_tracker.abort_request(request_id, + exception=asyncio.CancelledError, + verbose=self.log_requests) async def get_model_config(self) -> ModelConfig: """Get the model configuration of the vLLM engine.""" @@ -1057,10 +1023,9 @@ async def get_lora_config(self) -> LoRAConfig: return self.engine.get_lora_config() async def do_log_stats( - self, - scheduler_outputs: Optional[SchedulerOutputs] = None, - model_output: Optional[List[SamplerOutput]] = None, - ) -> None: + self, + scheduler_outputs: Optional[SchedulerOutputs] = None, + model_output: Optional[List[SamplerOutput]] = None) -> None: self.engine.do_log_stats() async def check_health(self) -> None: From c9adc62f0f35f03da25824130a58b753f0157f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Sch=C3=B6nnenbeck?= Date: Thu, 26 Sep 2024 15:44:52 +0200 Subject: [PATCH 5/7] Add priority to generate and add_request --- vllm/engine/async_llm_engine.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 7cba3a05083a7..d511c2cd48291 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -787,7 +787,8 @@ async def add_request( arrival_time: Optional[float] = None, lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, - prompt_adapter_request: Optional[PromptAdapterRequest] = None + prompt_adapter_request: Optional[PromptAdapterRequest] = None, + priority: int = 0, ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]: if not self.is_running: if self.start_engine_loop: @@ -807,7 +808,9 @@ async def add_request( arrival_time=arrival_time or time.time(), lora_request=lora_request, trace_headers=trace_headers, - prompt_adapter_request=prompt_adapter_request) + prompt_adapter_request=prompt_adapter_request, + priority=priority, + ) return stream.generator() @@ -818,7 +821,8 @@ async def generate( request_id: str, lora_request: Optional[LoRARequest] = None, trace_headers: Optional[Mapping[str, str]] = None, - prompt_adapter_request: Optional[PromptAdapterRequest] = None + prompt_adapter_request: Optional[PromptAdapterRequest] = None, + priority: int = 0, ) -> AsyncGenerator[RequestOutput, None]: """Generate outputs for a request. @@ -836,6 +840,8 @@ async def generate( trace_headers: OpenTelemetry trace headers. prompt_adapter_request: Prompt Adapter request to use for generation, if any. + priority: The priority of the request. + Only applicable with priority scheduling. 
Yields: The output `RequestOutput` objects from the LLMEngine @@ -891,6 +897,7 @@ async def generate( lora_request=lora_request, trace_headers=trace_headers, prompt_adapter_request=prompt_adapter_request, + priority=priority, ): yield LLMEngine.validate_output(output, RequestOutput) From 2b68c5886f13d049d33c9de6ed8b61513ac17343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Sch=C3=B6nnenbeck?= Date: Fri, 27 Sep 2024 07:20:51 +0200 Subject: [PATCH 6/7] Fixed error handling --- vllm/engine/async_llm_engine.py | 6 +++++- vllm/engine/llm_engine.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index d511c2cd48291..ef463cafb6282 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -417,7 +417,7 @@ async def add_request_async( if lora_request is not None and not self.lora_config: raise ValueError(f"Got lora_request {lora_request} but LoRA is " "not enabled!") - if priority > 0 and not self.scheduler_config.policy == "priority": + if priority != 0 and not self.scheduler_config.policy == "priority": raise ValueError(f"Got priority {priority} but " "Priority scheduling is not enabled.") if arrival_time is None: @@ -800,6 +800,10 @@ async def add_request( "error that caused the background loop to stop " "(AsyncEngineDeadError).") + if priority != 0 and not self.scheduler_config.policy == "priority": + raise ValueError(f"Got priority {priority} but " + "Priority scheduling is not enabled.") + stream = self._request_tracker.add_request( request_id, verbose=self.log_requests, diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 768ac69c3692d..06efea19efa8e 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -748,7 +748,7 @@ def add_request( raise ValueError(f"Got lora_request {lora_request} but LoRA is " "not enabled!") - if priority > 0 and not self.scheduler_config.policy == "priority": + if priority != 0 and not self.scheduler_config.policy == "priority": raise ValueError(f"Got priority {priority} but " "Priority scheduling is not enabled.") From 2e3e1852123f2244b609c3f3c40d3eefcbab0fa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Sch=C3=B6nnenbeck?= Date: Fri, 27 Sep 2024 07:34:17 +0200 Subject: [PATCH 7/7] Fixed typo --- vllm/engine/async_llm_engine.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index b63aa3623e315..6e50cc9bc933f 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -880,7 +880,8 @@ async def add_request( "error that caused the background loop to stop " "(AsyncEngineDeadError).") - if priority != 0 and not self.scheduler_config.policy == "priority": + if (priority != 0 + and not self.engine.scheduler_config.policy == "priority"): raise ValueError(f"Got priority {priority} but " "Priority scheduling is not enabled.")
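
Usage note (not part of the patch series): below is a minimal sketch of how a caller might exercise the new `priority` argument once this series lands. Only the `priority` keyword on `generate()`/`add_request()` and the `ValueError` raised when the scheduler policy is not `"priority"` come from the patches above; the `AsyncEngineArgs` field name `scheduling_policy`, the example model name, and the assumption that smaller priority values are served first are illustrative guesses, not taken from this diff.

```python
# Hypothetical usage sketch for the priority argument added in this series.
# Assumed (not from the diff): AsyncEngineArgs exposes the scheduler policy as
# `scheduling_policy` (mapping to SchedulerConfig.policy) and lower priority
# values are scheduled first; adjust to the actual vLLM version if these differ.
import asyncio

from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine


async def main() -> None:
    engine_args = AsyncEngineArgs(
        model="facebook/opt-125m",     # illustrative model choice
        scheduling_policy="priority",  # assumed field name; without a priority
                                       # policy, priority != 0 raises ValueError
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)
    sampling_params = SamplingParams(max_tokens=32)

    async def run(request_id: str, prompt: str, priority: int) -> None:
        # With the checks added in this series, a non-zero priority on an
        # engine whose scheduler policy is not "priority" fails fast before
        # the request is enqueued.
        async for output in engine.generate(
                prompt,
                sampling_params,
                request_id=request_id,
                priority=priority,
        ):
            if output.finished:
                print(request_id, output.outputs[0].text)

    await asyncio.gather(
        run("req-background", "Summarize the plot of Hamlet.", priority=10),
        run("req-urgent", "Translate 'hello' into German.", priority=0),
    )


if __name__ == "__main__":
    asyncio.run(main())
```

With the validation introduced in PATCH 1/7 and tightened in PATCH 6/7 and 7/7, passing a non-zero priority to an engine whose scheduler policy is not `"priority"` raises `ValueError("Got priority <n> but Priority scheduling is not enabled.")` instead of silently ignoring the value, both in `add_request_async` on the engine side and in `AsyncLLMEngine.add_request` on the frontend side.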