diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index 1334fb789aa4..cf3b2944bc3a 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -120,7 +120,8 @@ def __init__(
             executor_class=executor_class,
             log_stats=self.log_stats,
         )
-
+        for stat_logger in self.stat_loggers[0]:
+            stat_logger.log_engine_initialized()
         self.output_handler: Optional[asyncio.Task] = None
         try:
             # Start output handler eagerly if we are in the asyncio eventloop.
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index 80807665e779..5912318f19ff 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -1,4 +1,5 @@
 # SPDX-License-Identifier: Apache-2.0
+import json
 import os
 import queue
 import signal
@@ -116,6 +117,7 @@ def __init__(self,
             logger.info("Batch queue is enabled with size %d",
                         self.batch_queue_size)
             self.batch_queue = queue.Queue(self.batch_queue_size)
+        self.vllm_config = vllm_config
 
     def _initialize_kv_caches(
             self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
@@ -507,7 +509,12 @@ def process_input_socket(self, input_path: str, engine_index: int):
                             bind=False) as socket:
 
             # Send ready message to front-end once input socket is connected.
-            socket.send(b'READY')
+            message_dict = {
+                'type': 'READY',
+                'num_gpu_blocks': self.vllm_config.cache_config.num_gpu_blocks,
+            }
+            message = json.dumps(message_dict).encode('utf-8')
+            socket.send(message)
 
             while True:
                 # (RequestType, RequestData)
diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py
index dd5190996196..0d5d92f72537 100644
--- a/vllm/v1/engine/core_client.py
+++ b/vllm/v1/engine/core_client.py
@@ -1,6 +1,7 @@
 # SPDX-License-Identifier: Apache-2.0
 import asyncio
 import contextlib
+import json
 import queue
 import uuid
 import weakref
@@ -362,6 +363,7 @@ def __init__(
         executor_class: type[Executor],
         log_stats: bool,
     ):
+        self.vllm_config = vllm_config
         # Serialization setup.
         self.encoder = MsgpackEncoder()
         self.decoder = MsgpackDecoder(EngineCoreOutputs)
@@ -430,14 +432,19 @@ def _wait_for_engine_startup(self):
                     raise RuntimeError("Engine core initialization failed. "
                                        "See root cause above.")
 
-            eng_id_bytes, msg = sync_input_socket.recv_multipart()
+            eng_id_bytes, data = sync_input_socket.recv_multipart()
             eng_id = int.from_bytes(eng_id_bytes, byteorder="little")
             if eng_id not in identities:
                 raise RuntimeError(f"Unexpected or duplicate engine: {eng_id}")
-            if msg != b'READY':
-                raise RuntimeError(f"Engine {eng_id} failed: {msg.decode()}")
+            message_dict = json.loads(data.decode('utf-8'))
+            if message_dict['type'] != 'READY':
+                raise RuntimeError(f"Engine {eng_id} failed: {data.decode()}")
             logger.info("Core engine process %d ready.", eng_id)
             identities.discard(eng_id)
+            # Setup KV cache config with initialization state from
+            # engine core process.
+            self.vllm_config.cache_config.num_gpu_blocks = message_dict[
+                'num_gpu_blocks']
 
     def _init_core_engines(
         self,
diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py
index 7051c681b1a0..9109bdcf42f2 100644
--- a/vllm/v1/metrics/loggers.py
+++ b/vllm/v1/metrics/loggers.py
@@ -39,6 +39,10 @@ def record(self, scheduler_stats: SchedulerStats,
                iteration_stats: Optional[IterationStats]):
         ...
 
+    @abstractmethod
+    def log_engine_initialized(self):
+        ...
+
     def log(self):  # noqa
         pass
 
@@ -47,6 +51,7 @@ class LoggingStatLogger(StatLoggerBase):
 
     def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
         self.engine_index = engine_index
+        self.vllm_config = vllm_config
         self._reset(time.monotonic())
         self.last_scheduler_stats = SchedulerStats()
         # Prefix cache metrics. This cannot be reset.
@@ -127,12 +132,19 @@ def log(self):
         if scheduler_stats.spec_decoding_stats is not None:
             self.spec_decoding_logging.log(log_fn=log_fn)
 
+    def log_engine_initialized(self):
+        logger.info(
+            "vllm cache_config_info with initialization " \
+            "after num_gpu_blocks is: %d",
+            self.vllm_config.cache_config.num_gpu_blocks)
+
 
 class PrometheusStatLogger(StatLoggerBase):
 
     def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
         self._unregister_vllm_metrics()
-
+        self.vllm_config = vllm_config
+        self.engine_index = engine_index
         # Use this flag to hide metrics that were deprecated in
         # a previous release and which will be removed future
         self.show_hidden_metrics = \
@@ -342,13 +354,9 @@ def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
             self.labelname_running_lora_adapters,
         ])
 
-        #
-        # Cache config info metric
-        #
-        self.log_metrics_info("cache_config", vllm_config.cache_config)
-
     def log_metrics_info(self, type: str, config_obj: SupportsMetricsInfo):
         metrics_info = config_obj.metrics_info()
+        metrics_info["engine"] = self.engine_index
 
         name, documentation = None, None
         if type == "cache_config":
@@ -442,6 +450,9 @@ def _unregister_vllm_metrics():
             if hasattr(collector, "_name") and "vllm" in collector._name:
                 prometheus_client.REGISTRY.unregister(collector)
 
+    def log_engine_initialized(self):
+        self.log_metrics_info("cache_config", self.vllm_config.cache_config)
+
 
 def build_buckets(mantissa_lst: list[int], max_value: int) -> list[int]:
     """
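---

Note for downstream stat loggers: this patch makes log_engine_initialized()
an @abstractmethod on StatLoggerBase, so any out-of-tree subclass must now
implement it or it will fail to instantiate. Below is a minimal sketch of a
conforming subclass; the ExampleStatLogger name is hypothetical, while the
method names and import paths are taken from the files touched above (adjust
the imports if your vLLM version lays these modules out differently):

    from typing import Optional

    from vllm.config import VllmConfig
    from vllm.v1.metrics.loggers import StatLoggerBase
    from vllm.v1.metrics.stats import IterationStats, SchedulerStats


    class ExampleStatLogger(StatLoggerBase):
        """Hypothetical logger showing the newly required hook."""

        def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
            self.vllm_config = vllm_config
            self.engine_index = engine_index

        def record(self, scheduler_stats: SchedulerStats,
                   iteration_stats: Optional[IterationStats]):
            # Per-iteration stats would be accumulated here.
            pass

        def log_engine_initialized(self):
            # By the time AsyncLLM.__init__ invokes this hook,
            # _wait_for_engine_startup() has copied num_gpu_blocks from the
            # engine core's READY message into the front-end cache_config,
            # so the value is safe to read here.
            print(f"engine {self.engine_index}: num_gpu_blocks="
                  f"{self.vllm_config.cache_config.num_gpu_blocks}")

The hook exists because the front-end process no longer computes the KV cache
sizing itself: it only learns num_gpu_blocks via the READY handshake, so
per-engine initialization logging has to run after startup completes rather
than in the logger's constructor.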