1313# See the License for the specific language governing permissions and
1414# limitations under the License.
1515
16+ import asyncio
17+ import json
1618import logging
19+ import os
1720import socket
1821import sys
22+ import time
1923from typing import Optional
2024
2125from vllm .config import KVTransferConfig
3034DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
3135
3236
def find_free_port() -> int:
    """Return a TCP port number that the OS handed out for a bind to port 0.

    A throwaway IPv4 stream socket is bound to port 0 on all interfaces and
    the port the OS picked is read back. The socket is closed before the
    function returns, so the port is not held for the caller.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.bind(("", 0))
        # getsockname() on an AF_INET socket yields (host, port).
        _host, free_port = probe.getsockname()
    return free_port
39-
40-
4137class Config :
4238 """Command line parameters or defaults"""
4339
4440 # dynamo specific
4541 namespace : str
4642 component : str
4743 endpoint : str
48- kv_events_port : int
4944 is_prefill_worker : bool
45+ kv_port : Optional [int ] = None
46+ side_channel_port : Optional [int ] = None
5047
5148 # mirror vLLM
5249 model : str
@@ -56,38 +53,6 @@ class Config:
5653 engine_args : AsyncEngineArgs
5754
5855
def overwrite_args(config):
    """Force the Dynamo-required vLLM settings onto ``config.engine_args``.

    Every entry is applied unconditionally: the decision was made to always
    overwrite, because respecting the user's original command line args at
    all costs would require a lot of extra argument-parsing work.

    Raises:
        ValueError: If ``config.engine_args`` has no attribute for one of the
            settings (settings earlier in the list have already been applied).
    """
    # Always set up KV Events for routing
    kv_events = KVEventsConfig(
        enable_kv_cache_events=True,
        publisher="zmq",
        endpoint=f"tcp://*:{config.kv_events_port}",
    )
    # Always setting up kv transfer for disagg
    kv_transfer = KVTransferConfig(kv_connector="NixlConnector", kv_role="kv_both")

    forced_settings = (
        ("task", "generate"),
        ("skip_tokenizer_init", True),
        ("disable_log_requests", True),
        ("enable_prefix_caching", True),
        # KV routing relies on logging KV metrics
        ("disable_log_stats", False),
        ("kv_events_config", kv_events),
        ("kv_transfer_config", kv_transfer),
    )

    logger.debug("Setting Dynamo defaults for vLLM")
    engine_args = config.engine_args
    for name, forced in forced_settings:
        if not hasattr(engine_args, name):
            raise ValueError(f"{name} not found in AsyncEngineArgs from vLLM.")
        setattr(engine_args, name, forced)
        logger.debug(f" engine_args.{name} = {forced}")
89-
90-
9156def parse_args () -> Config :
9257 parser = FlexibleArgumentParser (
9358 description = "vLLM server integrated with Dynamo LLM."
@@ -103,12 +68,6 @@ def parse_args() -> Config:
10368 action = "store_true" ,
10469 help = "Enable prefill functionality for this worker. Currently overwrites the --endpoint to be a specially chosen dyn://dynamo.prefill.generate" ,
10570 )
106- parser .add_argument (
107- "--kv-events-port" ,
108- type = int ,
109- default = find_free_port (),
110- help = "Endpoint where vLLM publishes metrics for dynamo. For DP, we handle the port iteration." ,
111- )
11271
11372 parser = AsyncEngineArgs .add_cli_args (parser )
11473 args = parser .parse_args ()
@@ -143,14 +102,160 @@ def parse_args() -> Config:
143102 config .endpoint = parsed_endpoint_name
144103 config .engine_args = engine_args
145104 config .is_prefill_worker = args .is_prefill_worker
146- config .kv_events_port = args .kv_events_port
147105
148106 if config .engine_args .block_size is None :
149107 config .engine_args .block_size = 16
150108 logger .debug (
151109 f"Setting reasonable default of { config .engine_args .block_size } for block_size"
152110 )
153111
154- overwrite_args (config )
155-
156112 return config
113+
114+
async def allocate_and_reserve_port(
    namespace,
    etcd_client,
    worker_id: str,
    reason: str,
    max_attempts: int = 100,
) -> int:
    """
    Get an OS-assigned port and atomically reserve it in ETCD.
    Retries until successful or max_attempts reached.

    The reservation is written under the key
    ``dyn://{namespace}/ports/{hostname}/{port}`` with a JSON record holding
    ``worker_id``, ``reason``, ``reserved_at`` and ``pid``, attached to the
    client's primary lease. The probe socket is only held while the ETCD
    write is attempted; it is closed before this function returns, so the
    caller still has to bind the port itself.

    Args:
        namespace: Namespace component of the ETCD reservation key.
        etcd_client: Client providing an awaitable
            ``kv_create(key=..., value=..., lease_id=...)`` and
            ``primary_lease_id()``. Presumably ``kv_create`` fails when the
            key already exists -- verify against the client implementation.
        worker_id: Worker identifier stored in the reservation record.
        reason: Purpose of the port, stored in the reservation record.
        max_attempts: Maximum number of ports to try (default: 100)

    Returns:
        The port number that was reserved.

    Raises:
        RuntimeError: If unable to reserve a port within max_attempts. The
            exception from the last failed attempt (if any) is chained as
            ``__cause__`` so persistent ETCD failures are diagnosable.
        OSError: If unable to create sockets (system resource issues)
    """

    node_name = socket.gethostname()
    # Remember the most recent failure so the final error can explain *why*
    # every attempt failed (collision vs. e.g. ETCD being unreachable).
    last_error = None

    for attempt in range(1, max_attempts + 1):
        # Hold socket open just long enough to reserve in ETCD
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(("", 0))
            port = sock.getsockname()[1]

            # Reserve in ETCD while holding the socket
            key = f"dyn://{namespace}/ports/{node_name}/{port}"
            value = {
                "worker_id": worker_id,
                "reason": reason,
                "reserved_at": time.time(),
                "pid": os.getpid(),
            }

            try:
                await etcd_client.kv_create(
                    key=key,
                    value=json.dumps(value).encode(),
                    lease_id=etcd_client.primary_lease_id(),
                )
                logger.debug(f"Reserved OS-assigned port {port} for {worker_id}")
                return port

            except Exception as e:
                # Deliberately broad: any failure to create the key means this
                # port is not ours; keep the error and try another port.
                last_error = e
                logger.debug(
                    f"Port {port} on {node_name} was already reserved (attempt {attempt}): {e}"
                )

        # Brief pause between attempts; skipped after the final one.
        if attempt < max_attempts:
            await asyncio.sleep(0.01)

    raise RuntimeError(
        f"Failed to allocate and reserve a port after {max_attempts} attempts"
    ) from last_error
172+
173+
async def configure_ports_with_etcd(config: Config, etcd_client):
    """Reserve the ports this worker needs via ETCD and record them on ``config``.

    Two ports are reserved through ``allocate_and_reserve_port``: one for the
    ZMQ KV events publisher and one for the NIXL side channel. Both are stored
    on ``config`` (``kv_port`` and ``side_channel_port``) only after both
    reservations have succeeded.
    """
    rank = config.engine_args.data_parallel_rank or 0
    owner = f"vllm-{config.component}-dp{rank}"

    async def _reserve(purpose: str) -> int:
        # Same namespace, client and worker id for every port; only the reason differs.
        return await allocate_and_reserve_port(
            namespace=config.namespace,
            etcd_client=etcd_client,
            worker_id=owner,
            reason=purpose,
        )

    events_port = await _reserve("zmq_kv_event_port")
    nixl_port = await _reserve("nixl_side_channel_port")

    # Publish the results together once both reservations are in place.
    config.kv_port = events_port
    config.side_channel_port = nixl_port
200+
201+
def overwrite_args(config):
    """Set vLLM defaults for Dynamo.

    Requires ``config.kv_port`` and ``config.side_channel_port`` to have been
    populated already (see ``configure_ports_with_etcd``). Exports the NIXL
    side channel host/port via ``set_side_channel_host_and_port`` and then
    unconditionally overwrites the Dynamo-required fields on
    ``config.engine_args``.

    Raises:
        AssertionError: If ``kv_port`` or ``side_channel_port`` is still None.
        ValueError: If ``config.engine_args`` lacks one of the fields being
            set (fields earlier in the mapping have already been applied).
    """
    assert (
        config.kv_port is not None
    ), "Must set the kv_port, use configure_ports_with_etcd"
    # The message names the attribute that is actually being checked.
    assert (
        config.side_channel_port is not None
    ), "Must set the side_channel_port, use configure_ports_with_etcd"

    dp_rank = config.engine_args.data_parallel_rank or 0

    defaults = {
        "task": "generate",
        "skip_tokenizer_init": True,
        "disable_log_requests": True,
        "enable_prefix_caching": True,
        # KV routing relies on logging KV metrics
        "disable_log_stats": False,
        # Always setting up kv transfer for disagg
        "kv_transfer_config": KVTransferConfig(
            kv_connector="NixlConnector", kv_role="kv_both"
        ),
        "kv_events_config": KVEventsConfig(
            enable_kv_cache_events=True,
            publisher="zmq",
            # vLLM will iterate dp_rank for us, so we need to subtract it out
            # TODO: fix in vLLM
            endpoint=f"tcp://*:{config.kv_port - dp_rank}",
        ),
    }

    set_side_channel_host_and_port(config)

    logger.debug("Setting Dynamo defaults for vLLM")
    for key, value in defaults.items():
        if hasattr(config.engine_args, key):
            setattr(config.engine_args, key, value)
            logger.debug(f" engine_args.{key} = {value}")
        else:
            raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")
240+
241+
def set_side_channel_host_and_port(config: Config, hostname: Optional[str] = None):
    """Export the NIXL side channel host and port through environment variables.

    vLLM V1 NixlConnector creates a side channel to exchange metadata with
    other NIXL connectors. This writes ``VLLM_NIXL_SIDE_CHANNEL_HOST`` and
    ``VLLM_NIXL_SIDE_CHANNEL_PORT`` (from ``config.side_channel_port``).

    When ``hostname`` is not given, the machine's hostname is used if a test
    socket can bind to it; otherwise ``127.0.0.1`` is used. An explicitly
    supplied ``hostname`` is used as-is.
    """
    host = hostname
    if host is None:
        host = socket.gethostname()
        # Probe the hostname with a throwaway bind to see whether it is usable.
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.bind((host, 0))
        except OSError:
            # socket.error and socket.gaierror are both OSError; on failure
            # fall back to localhost.
            logger.warning(
                f"Hostname '{host}' is not usable, falling back to '127.0.0.1'"
            )
            host = "127.0.0.1"

    env = os.environ
    env["VLLM_NIXL_SIDE_CHANNEL_HOST"] = host
    env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(config.side_channel_port)
    logger.debug(f"Set NIXL side channel to {host}:{config.side_channel_port}")
0 commit comments