@@ -81,18 +81,40 @@ async def init(runtime: DistributedRuntime, config: Config):
8181 logging .info (f"Setting up ZMQ kv event publisher at { zmq_ep } " )
8282 kv_publisher = ZmqKvEventPublisher (component = component , config = zmq_config )
8383
84+ # Readiness gate: requests wait until model is registered
85+ ready_event = asyncio .Event ()
86+
async def gated_generate(request):
    """Stream handler output for *request* once the readiness event is set.

    Waits on ``ready_event`` first, then forwards every item produced by
    ``handler.generate(request)`` to the caller unchanged.
    """
    # Requests that arrive before the event is set park here.
    await ready_event.wait()
    stream = handler.generate(request)
    async for chunk in stream:
        yield chunk
92+
8493 handler = DecodeWorkerHandler (
8594 component , engine , config , publisher , kv_publisher , prefill_client
8695 )
8796
88- await register_llm_with_runtime_config (
89- engine , generate_endpoint , server_args , dynamo_args .migration_limit
90- )
async def register_model():
    """Register the model with the runtime and open the readiness gate.

    Awaits ``register_llm_with_runtime_config``; on a truthy result it sets
    ``ready_event`` so waiting requests can proceed.

    Raises:
        RuntimeError: if registration reports failure (after logging the
            error and calling ``runtime.shutdown()``).
    """
    registered = await register_llm_with_runtime_config(
        engine,
        generate_endpoint,
        server_args,
        dynamo_args.migration_limit,
    )

    if registered:
        # Release every request waiting on the readiness event.
        ready_event.set()
        logging.info("Model registration succeeded; processing queued requests")
        return

    # Failure path: log, shut the runtime down, then surface the error.
    logging.error("Model registration failed; shutting down")
    runtime.shutdown()
    raise RuntimeError("Model registration failed")
91111
92112 try :
93- # TODO: add in native endpoints
113+ # Start endpoint immediately and register model concurrently
114+ # Requests queue until ready_event is set
94115 await asyncio .gather (
95- generate_endpoint .serve_endpoint (handler .generate , graceful_shutdown = False ),
116+ generate_endpoint .serve_endpoint (gated_generate , graceful_shutdown = False ),
117+ register_model (),
96118 )
97119 except Exception as e :
98120 logging .error (f"Failed to serve endpoints: { e } " )
0 commit comments