3 changes: 2 additions & 1 deletion vllm/v1/engine/async_llm.py
@@ -120,7 +120,8 @@ def __init__(
executor_class=executor_class,
log_stats=self.log_stats,
)

for stat_logger in self.stat_loggers[0]:
stat_logger.log_engine_initialized()
self.output_handler: Optional[asyncio.Task] = None
try:
# Start output handler eagerly if we are in the asyncio eventloop.
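The loop added above notifies every stat logger of the first engine once the engine core is initialized. A minimal standalone sketch of the new hook contract, assuming only the abstract interface shown in the loggers.py diff below (PrintingStatLogger and its message are illustrative, not part of the PR):

from abc import ABC, abstractmethod

class StatLoggerBase(ABC):
    """Stand-in for vllm.v1.metrics.loggers.StatLoggerBase (see diff below)."""

    @abstractmethod
    def record(self, scheduler_stats, iteration_stats):
        ...

    @abstractmethod
    def log_engine_initialized(self):
        ...

class PrintingStatLogger(StatLoggerBase):
    """Illustrative logger implementing the new startup hook."""

    def record(self, scheduler_stats, iteration_stats):
        pass  # per-iteration stats would be handled here

    def log_engine_initialized(self):
        print("engine core initialized; cache config is now populated")

# Mirrors the async_llm.py call site: stat_loggers[0] holds the
# loggers attached to the first engine.
stat_loggers = [[PrintingStatLogger()]]
for stat_logger in stat_loggers[0]:
    stat_logger.log_engine_initialized()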
9 changes: 8 additions & 1 deletion vllm/v1/engine/core.py
@@ -1,4 +1,5 @@
# SPDX-License-Identifier: Apache-2.0
import json
import os
import queue
import signal
@@ -116,6 +117,7 @@ def __init__(self,
logger.info("Batch queue is enabled with size %d",
self.batch_queue_size)
self.batch_queue = queue.Queue(self.batch_queue_size)
self.vllm_config = vllm_config

def _initialize_kv_caches(
self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
@@ -507,7 +509,12 @@ def process_input_socket(self, input_path: str, engine_index: int):
bind=False) as socket:

# Send ready message to front-end once input socket is connected.
socket.send(b'READY')
message_dict = {
'type': 'READY',
'num_gpu_blocks': self.vllm_config.cache_config.num_gpu_blocks,
}
message = json.dumps(message_dict).encode('utf-8')
socket.send(message)

while True:
# (RequestType, RequestData)
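With this change the READY handshake carries a JSON payload instead of the bare b'READY' bytes, so the engine can report its profiled KV-cache size at startup. A minimal round-trip sketch of the message format (8192 is a made-up example value; in the PR the count comes from cache_config.num_gpu_blocks):

import json

# Engine side: announce readiness together with the KV-cache size.
ready = json.dumps({"type": "READY", "num_gpu_blocks": 8192}).encode("utf-8")

# Client side: check the type field before trusting the payload.
message = json.loads(ready.decode("utf-8"))
assert message["type"] == "READY"
assert message["num_gpu_blocks"] == 8192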
13 changes: 10 additions & 3 deletions vllm/v1/engine/core_client.py
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
import asyncio
import contextlib
import json
import queue
import uuid
import weakref
@@ -362,6 +363,7 @@ def __init__(
executor_class: type[Executor],
log_stats: bool,
):
self.vllm_config = vllm_config
# Serialization setup.
self.encoder = MsgpackEncoder()
self.decoder = MsgpackDecoder(EngineCoreOutputs)
@@ -430,14 +432,19 @@ def _wait_for_engine_startup(self):
raise RuntimeError("Engine core initialization failed. "
"See root cause above.")

eng_id_bytes, msg = sync_input_socket.recv_multipart()
eng_id_bytes, data = sync_input_socket.recv_multipart()
eng_id = int.from_bytes(eng_id_bytes, byteorder="little")
if eng_id not in identities:
raise RuntimeError(f"Unexpected or duplicate engine: {eng_id}")
if msg != b'READY':
raise RuntimeError(f"Engine {eng_id} failed: {msg.decode()}")
message_dict = json.loads(data.decode('utf-8'))
if message_dict['type'] != 'READY':
raise RuntimeError(f"Engine {eng_id} failed: {data.decode()}")
logger.info("Core engine process %d ready.", eng_id)
identities.discard(eng_id)
# Set up the KV cache config with initialization state from the
# engine core process.
self.vllm_config.cache_config.num_gpu_blocks = message_dict[
'num_gpu_blocks']

def _init_core_engines(
self,
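The client-side half of the handshake, shown above, validates the type field and then copies num_gpu_blocks into its local cache config. The same parsing logic as a standalone helper (parse_ready_message is a hypothetical name, not part of the PR):

import json

def parse_ready_message(data: bytes) -> int:
    """Validate a READY frame and return the advertised num_gpu_blocks."""
    message = json.loads(data.decode("utf-8"))
    if message["type"] != "READY":
        raise RuntimeError(f"Engine failed: {data.decode()}")
    return message["num_gpu_blocks"]

# The client would then do something like:
#   vllm_config.cache_config.num_gpu_blocks = parse_ready_message(data)
print(parse_ready_message(b'{"type": "READY", "num_gpu_blocks": 4096}'))  # 4096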
23 changes: 17 additions & 6 deletions vllm/v1/metrics/loggers.py
@@ -39,6 +39,10 @@ def record(self, scheduler_stats: SchedulerStats,
iteration_stats: Optional[IterationStats]):
...

@abstractmethod
def log_engine_initialized(self):
...

def log(self): # noqa
pass

@@ -47,6 +51,7 @@ class LoggingStatLogger(StatLoggerBase):

def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
self.engine_index = engine_index
self.vllm_config = vllm_config
self._reset(time.monotonic())
self.last_scheduler_stats = SchedulerStats()
# Prefix cache metrics. This cannot be reset.
@@ -127,12 +132,19 @@ def log(self):
if scheduler_stats.spec_decoding_stats is not None:
self.spec_decoding_logging.log(log_fn=log_fn)

def log_engine_initialized(self):
logger.info(
"vllm cache_config_info with initialization " \
"after num_gpu_blocks is: %d",
self.vllm_config.cache_config.num_gpu_blocks)


class PrometheusStatLogger(StatLoggerBase):

def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
self._unregister_vllm_metrics()

self.vllm_config = vllm_config
self.engine_index = engine_index
# Use this flag to hide metrics that were deprecated in
# a previous release and which will be removed in a future release.
self.show_hidden_metrics = \
@@ -342,13 +354,9 @@ def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
self.labelname_running_lora_adapters,
])

#
# Cache config info metric
#
self.log_metrics_info("cache_config", vllm_config.cache_config)

def log_metrics_info(self, type: str, config_obj: SupportsMetricsInfo):
metrics_info = config_obj.metrics_info()
metrics_info["engine"] = self.engine_index

name, documentation = None, None
if type == "cache_config":
@@ -442,6 +450,9 @@ def _unregister_vllm_metrics():
if hasattr(collector, "_name") and "vllm" in collector._name:
prometheus_client.REGISTRY.unregister(collector)

def log_engine_initialized(self):
self.log_metrics_info("cache_config", self.vllm_config.cache_config)


def build_buckets(mantissa_lst: list[int], max_value: int) -> list[int]:
"""
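build_buckets's body is collapsed in this view. As a reference point only, a sketch of how a mantissa-based bucket builder with this signature might work, assuming each mantissa is scaled by successive powers of 10 up to max_value (an assumption about the collapsed body, not the PR's code):

def build_buckets_sketch(mantissa_lst: list[int], max_value: int) -> list[int]:
    """Multiply each mantissa by growing powers of 10 until max_value is hit."""
    exponent = 0
    buckets: list[int] = []
    while True:
        for m in mantissa_lst:
            value = m * 10**exponent
            if value <= max_value:
                buckets.append(value)
            else:
                return buckets
        exponent += 1

print(build_buckets_sketch([1, 2, 5], 100))  # [1, 2, 5, 10, 20, 50, 100]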