23 changes: 19 additions & 4 deletions vllm/engine/llm_engine.py
@@ -5,7 +5,7 @@
from collections import Counter as collectionsCounter
from collections import deque
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from functools import partial
from typing import (TYPE_CHECKING, Callable, ClassVar, Deque, Dict, Iterable,
List, Mapping, NamedTuple, Optional)
@@ -45,7 +45,8 @@
from vllm.pooling_params import PoolingParams
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import RequestOutputKind, SamplingParams
from vllm.sequence import (ExecuteModelRequest, ParallelSampleSequenceGroup,
from vllm.sequence import (ExecuteModelRequest, InbandEngineStats,
ParallelSampleSequenceGroup,
PoolingSequenceGroupOutput, Sequence, SequenceGroup,
SequenceGroupBase, SequenceGroupMetadata,
SequenceGroupOutput, SequenceStatus)
@@ -222,7 +223,6 @@ def __init__(
"This should not happen. As a workaround, try using "
"LLMEngine.from_vllm_config(...) or explicitly set "
"VLLM_USE_V1=0 or 1 and report this issue on Github.")

self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.cache_config = vllm_config.cache_config
@@ -237,7 +237,6 @@ def __init__(
self.prompt_adapter_config = vllm_config.prompt_adapter_config # noqa
self.observability_config = vllm_config.observability_config or ObservabilityConfig( # noqa
)

logger.info(
"Initializing a V0 LLM engine (v%s) with config: %s, "
"use_cached_outputs=%s, ",
@@ -1013,6 +1012,14 @@ def _update_num_computed_tokens_for_multi_step_prefill(
seq_group.update_num_computed_tokens(
seq_group_meta.token_chunk_size)

def _get_inband_engine_stats(self, cur_stats: Stats) -> InbandEngineStats:
# Project the full Stats snapshot down to the fields declared on
# InbandEngineStats so the subset can travel with request outputs.
stats_dict = asdict(cur_stats)
inband_fields = {
field_name: stats_dict[field_name]
for field_name in InbandEngineStats.__annotations__
}
return InbandEngineStats(**inband_fields)

def _process_model_outputs(self,
ctx: SchedulerContext,
request_id: Optional[str] = None) -> None:
@@ -1156,6 +1163,10 @@ def _process_model_outputs(self,
seq_group.maybe_set_first_token_time(now)
if not seq_group.is_prefill():
seq_group.set_last_token_time(now)
stats_snapshot = self._get_stats(scheduler_outputs, outputs,
Collaborator:

nit: create a helper that wraps the `_get_stats` then `set_inband_engine_stats` logic, since it repeats.
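A minimal sketch of what that helper could look like (the method name `_set_inband_engine_stats` is an assumption, not part of this PR), wrapping the two calls that repeat in both branches of `_process_model_outputs`:

```python
# Hypothetical helper sketch; assumes the LLMEngine internals shown in this
# diff (_get_stats, _get_inband_engine_stats, SequenceGroup.set_inband_engine_stats).
def _set_inband_engine_stats(self, seq_group, scheduler_outputs, outputs,
                             finished_before, skip) -> None:
    # Snapshot the engine-level stats for this step and attach the
    # in-band subset to the sequence group.
    stats_snapshot = self._get_stats(scheduler_outputs, outputs,
                                     finished_before, skip)
    inband_stats = self._get_inband_engine_stats(stats_snapshot)
    seq_group.set_inband_engine_stats(inband_stats)
```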

finished_before, skip)
inband_stats = self._get_inband_engine_stats(stats_snapshot)
seq_group.set_inband_engine_stats(inband_stats)
request_output = RequestOutputFactory.create(
seq_group,
self.seq_id_to_seq_group,
@@ -1199,6 +1210,10 @@ def _process_model_outputs(self,
seq_group = scheduled_seq_group.seq_group
seq_group.maybe_set_first_token_time(now)
if not seq_group.is_prefill():
stats_snapshot = self._get_stats(scheduler_outputs, outputs,
finished_before, skip)
inband_stats = self._get_inband_engine_stats(stats_snapshot)
seq_group.set_inband_engine_stats(inband_stats)
seq_group.set_last_token_time(now)
request_output = RequestOutputFactory.create(
seq_group,
24 changes: 18 additions & 6 deletions vllm/entrypoints/openai/api_server.py
@@ -42,12 +42,11 @@
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.cli_args import (make_arg_parser,
validate_parsed_serve_args)
from vllm.entrypoints.openai.orca_metrics import metrics_header
# yapf conflicts with isort for this block
# yapf: disable
from vllm.entrypoints.openai.protocol import (ChatCompletionRequest,
ChatCompletionResponse,
CompletionRequest,
CompletionResponse,
DetokenizeRequest,
DetokenizeResponse,
EmbeddingChatRequest,
@@ -99,6 +98,8 @@
# Cannot use __name__ (https://github.com/vllm-project/vllm/pull/4765)
logger = init_logger('vllm.entrypoints.openai.api_server')

ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL = "endpoint-load-metrics-format"

_running_tasks: set[asyncio.Task] = set()


@@ -455,6 +456,8 @@ async def show_version():
@load_aware_call
async def create_chat_completion(request: ChatCompletionRequest,
raw_request: Request):
metrics_header_format = raw_request.headers.get(
ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, "")
handler = chat(raw_request)
if handler is None:
return base(raw_request).create_error_response(
@@ -466,8 +469,11 @@ async def create_chat_completion(request: ChatCompletionRequest,
return JSONResponse(content=generator.model_dump(),
status_code=generator.code)

elif isinstance(generator, ChatCompletionResponse):
return JSONResponse(content=generator.model_dump())
# Tuple[ChatCompletionResponse, Optional[InbandEngineStats]]
elif isinstance(generator, tuple):
return JSONResponse(content=generator[0].model_dump(),
headers=metrics_header(generator[1],
metrics_header_format))

return StreamingResponse(content=generator, media_type="text/event-stream")

@@ -476,6 +482,8 @@ async def create_chat_completion(request: ChatCompletionRequest,
@with_cancellation
@load_aware_call
async def create_completion(request: CompletionRequest, raw_request: Request):
metrics_header_format = raw_request.headers.get(
ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, "")
handler = completion(raw_request)
if handler is None:
return base(raw_request).create_error_response(
@@ -485,8 +493,12 @@ async def create_completion(request: CompletionRequest, raw_request: Request):
if isinstance(generator, ErrorResponse):
return JSONResponse(content=generator.model_dump(),
status_code=generator.code)
elif isinstance(generator, CompletionResponse):
return JSONResponse(content=generator.model_dump())

# Tuple[CompletionResponse, Optional[InbandEngineStats]]
elif isinstance(generator, tuple):
return JSONResponse(content=generator[0].model_dump(),
Collaborator:

Also check that len(generator) == 2 and the corresponding element types? Maybe create a helper function.
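A minimal sketch of such a helper (the name `json_response_with_metrics` and its placement are assumptions, not part of this PR), validating the tuple shape before attaching the ORCA header:

```python
# Hypothetical helper sketch; assumes the (response, stats) tuple shape
# returned by the serving layer in this PR.
from fastapi.responses import JSONResponse

from vllm.entrypoints.openai.orca_metrics import metrics_header
from vllm.entrypoints.openai.protocol import (ChatCompletionResponse,
                                              CompletionResponse)
from vllm.sequence import InbandEngineStats


def json_response_with_metrics(result: tuple,
                               metrics_header_format: str) -> JSONResponse:
    # Guard against unexpected shapes before unpacking.
    assert len(result) == 2, "expected (response, inband_engine_stats)"
    response, stats = result
    assert isinstance(response, (ChatCompletionResponse, CompletionResponse))
    assert stats is None or isinstance(stats, InbandEngineStats)
    return JSONResponse(content=response.model_dump(),
                        headers=metrics_header(stats, metrics_header_format))
```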

headers=metrics_header(generator[1],
metrics_header_format))

return StreamingResponse(content=generator, media_type="text/event-stream")

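For context, a hedged client-side sketch of how the new `endpoint-load-metrics-format` request header could be exercised; the server URL, model name, and payload below are assumptions:

```python
# Hypothetical client sketch: request ORCA load metrics in TEXT format from a
# locally running vLLM OpenAI-compatible server (URL and model are assumed).
import requests

resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={"model": "my-model", "prompt": "Hello", "max_tokens": 8},
    headers={"endpoint-load-metrics-format": "text"},
)
# The ORCA load report is returned as a response header.
print(resp.headers.get("endpoint-load-metrics"))
# Expected shape: "TEXT named_metrics.<name>=<value>, ..."
```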
85 changes: 85 additions & 0 deletions vllm/entrypoints/openai/orca_metrics.py
@@ -0,0 +1,85 @@
# SPDX-License-Identifier: Apache-2.0
Collaborator:

Add some unit tests?
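A minimal pytest-style sketch of what such tests could cover (test names are assumptions), limited to the `create_orca_header` behavior visible in this diff:

```python
# Hypothetical tests for the new orca_metrics helpers; not part of this PR.
import json

from vllm.entrypoints.openai.orca_metrics import create_orca_header


def test_text_format_header():
    header = create_orca_header("text", [("kv_cache_utilization", 0.4)])
    assert header == {
        "endpoint-load-metrics": "TEXT named_metrics.kv_cache_utilization=0.4"
    }


def test_json_format_header():
    header = create_orca_header("json", [("kv_cache_utilization", 0.4)])
    prefix, _, payload = header["endpoint-load-metrics"].partition(" ")
    assert prefix == "JSON"
    assert json.loads(payload) == {
        "named_metrics": {"kv_cache_utilization": 0.4}
    }


def test_unsupported_format_returns_none():
    assert create_orca_header("bin", [("kv_cache_utilization", 0.4)]) is None
```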

"""
This file contains the command line arguments for the vLLM's
OpenAI-compatible server. It is kept in a separate file for documentation
purposes.
"""

import json
from collections.abc import Mapping
from typing import Optional

from vllm.logger import init_logger
from vllm.sequence import InbandEngineStats

logger = init_logger(__name__)


def create_orca_header(metrics_format: str,
named_metrics: list[tuple[str, float]],
metadata_fields=None) -> Optional[Mapping[str, str]]:
"""
Creates an ORCA header named 'endpoint-load-metrics' in the specified format,
carrying the given named_metrics as custom metrics.
ORCA header format description: https://docs.google.com/document/d/1C1ybMmDKJIVlrbOLbywhu9iRYo4rilR-cT50OTtOFTs/edit?tab=t.0
ORCA proto: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto
Parameters:
- metrics_format (str): The format of the header ('BIN', 'TEXT', 'JSON').
- named_metrics (list[tuple[str, float]]): List of tuples with metric names
and their corresponding float values.
- metadata_fields (list): List of additional metadata fields
(currently unsupported).
Returns:
- Optional[Mapping[str, str]]: A dictionary with the header key
'endpoint-load-metrics' and a value consisting of the format prefix
followed by the serialized named_metrics data.
"""

if metadata_fields:
logger.warning("Warning: `metadata_fields` are not supported in the"
"ORCA response header yet.")

if metrics_format.lower() not in ["text", "json"]:
logger.warning(
"Warning: `%s` format is not supported in the ORCA response header",
format,
)
return None

header = {}
orca_report = {
"named_metrics": {
metric_name: value
for metric_name, value in named_metrics
if isinstance(metric_name, str) and isinstance(value, float)
}
}
# output example:
# endpoint-load-metrics: TEXT named_metrics.kv_cache_utilization=0.4
if metrics_format.lower() == "text":
native_http_header = ", ".join([
f"named_metrics.{metric_name}={value}"
for metric_name, value in named_metrics
if isinstance(metric_name, str) and isinstance(value, float)
])
header["endpoint-load-metrics"] = f"TEXT {native_http_header}"

# output example:
# endpoint-load-metrics: JSON {"named_metrics": {"custom-metric-util": 0.4}}
elif metrics_format.lower() == "json":
header["endpoint-load-metrics"] = f"JSON {json.dumps(orca_report)}"

return header


def metrics_header(m: Optional[InbandEngineStats],
metrics_format: str) -> Optional[Mapping[str, str]]:
if not m or not metrics_format:
return None
named_metrics: list[tuple[str, float]] = []
for metric, val in vars(m).items():
if isinstance(val, float) and metric != "now":
named_metrics.append((str(metric), float(val)))
return create_orca_header(metrics_format, named_metrics)
12 changes: 6 additions & 6 deletions vllm/entrypoints/openai/serving_chat.py
@@ -34,7 +34,7 @@
from vllm.logger import init_logger
from vllm.outputs import CompletionOutput, RequestOutput
from vllm.sampling_params import BeamSearchParams, SamplingParams
from vllm.sequence import Logprob
from vllm.sequence import InbandEngineStats, Logprob
from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer
from vllm.transformers_utils.tokenizers import (maybe_serialize_tool_calls,
truncate_tool_call_ids)
@@ -119,7 +119,8 @@ async def create_chat_completion(
self,
request: ChatCompletionRequest,
raw_request: Optional[Request] = None,
) -> Union[AsyncGenerator[str, None], ChatCompletionResponse,
) -> Union[AsyncGenerator[str, None], tuple[ChatCompletionResponse,
Optional[InbandEngineStats]],
ErrorResponse]:
"""
Chat Completion API similar to OpenAI's API.
@@ -769,11 +770,11 @@ async def chat_completion_full_generator(
conversation: list[ConversationMessage],
tokenizer: AnyTokenizer,
request_metadata: RequestResponseMetadata,
) -> Union[ErrorResponse, ChatCompletionResponse]:
) -> Union[ErrorResponse, tuple[ChatCompletionResponse,
Optional[InbandEngineStats]]]:

created_time = int(time.time())
final_res: Optional[RequestOutput] = None

try:
async for res in result_generator:
final_res = res
@@ -786,7 +787,6 @@ async def chat_completion_full_generator(
assert final_res is not None

choices: list[ChatCompletionResponseChoice] = []

role = self.get_chat_request_role(request)
for output in final_res.outputs:
token_ids = output.token_ids
@@ -949,7 +949,7 @@ async def chat_completion_full_generator(
choices=choices,
usage=usage,
prompt_logprobs=clamp_prompt_logprobs(final_res.prompt_logprobs),
)
), final_res.inband_engine_stats

return response

37 changes: 22 additions & 15 deletions vllm/entrypoints/openai/serving_completion.py
@@ -30,7 +30,7 @@
from vllm.logger import init_logger
from vllm.outputs import RequestOutput
from vllm.sampling_params import BeamSearchParams, SamplingParams
from vllm.sequence import Logprob
from vllm.sequence import InbandEngineStats, Logprob
from vllm.transformers_utils.tokenizer import AnyTokenizer
from vllm.utils import merge_async_iterators

@@ -65,7 +65,8 @@ async def create_completion(
self,
request: CompletionRequest,
raw_request: Optional[Request] = None,
) -> Union[AsyncGenerator[str, None], CompletionResponse, ErrorResponse]:
) -> Union[AsyncGenerator[str, None], tuple[
CompletionResponse, Optional[InbandEngineStats]], ErrorResponse]:
"""Completion API similar to OpenAI's API.

See https://platform.openai.com/docs/api-reference/completions/create
@@ -216,15 +217,16 @@ async def create_completion(
final_res_batch_checked = cast(list[RequestOutput],
final_res_batch)

response = self.request_output_to_completion_response(
final_res_batch_checked,
request,
request_id,
created_time,
model_name,
tokenizer,
request_metadata,
)
response, inband_engine_stats = (
self.request_output_to_completion_response(
final_res_batch_checked,
request,
request_id,
created_time,
model_name,
tokenizer,
request_metadata,
))
except asyncio.CancelledError:
return self.create_error_response("Client disconnected")
except ValueError as e:
@@ -242,7 +244,7 @@ async def fake_stream_generator() -> AsyncGenerator[str, None]:

return fake_stream_generator()

return response
return response, inband_engine_stats

async def completion_stream_generator(
self,
@@ -400,16 +402,21 @@ def request_output_to_completion_response(
model_name: str,
tokenizer: AnyTokenizer,
request_metadata: RequestResponseMetadata,
) -> CompletionResponse:
) -> tuple[CompletionResponse, Optional[InbandEngineStats]]:
choices: list[CompletionResponseChoice] = []
num_prompt_tokens = 0
num_generated_tokens = 0

# choose latest stats from the batch of request outputs
latest_engine_stats: Optional[InbandEngineStats] = None
for final_res in final_res_batch:
prompt_token_ids = final_res.prompt_token_ids
assert prompt_token_ids is not None
prompt_logprobs = clamp_prompt_logprobs(final_res.prompt_logprobs)
prompt_text = final_res.prompt
if not latest_engine_stats or (
final_res.inband_engine_stats and latest_engine_stats.now
< final_res.inband_engine_stats.now):
latest_engine_stats = final_res.inband_engine_stats

token_ids: GenericSequence[int]
out_logprobs: Optional[GenericSequence[Optional[dict[int,
Expand Down Expand Up @@ -482,7 +489,7 @@ def request_output_to_completion_response(
model=model_name,
choices=choices,
usage=usage,
)
), latest_engine_stats

def _create_completion_logprobs(
self,