@@ -50,28 +50,34 @@ async def generate_tokens(self, prompt, sampling_params, request_id):
         gen = self.engine_client.generate(prompt, sampling_params, request_id)

         num_output_tokens_so_far = 0
-        async for res in gen:
-            # res is vllm's RequestOutput
-
-            # This is the expected way for a request to end.
-            # The new token ID will be eos, don't forward it.
-            if res.finished:
-                yield {"finish_reason": "stop", "token_ids": []}
-                break
-
-            if not res.outputs:
-                yield {"finish_reason": "error", "token_ids": []}
-                break
-
-            output = res.outputs[0]
-            next_total_toks = len(output.token_ids)
-            out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
-            if output.finish_reason:
-                out["finish_reason"] = output.finish_reason
-            if output.stop_reason:
-                out["stop_reason"] = output.stop_reason
-            yield out
-            num_output_tokens_so_far = next_total_toks
+        try:
+            async for res in gen:
+                # res is vllm's RequestOutput
+
+                # This is the expected way for a request to end.
+                # The new token ID will be eos, don't forward it.
+                if res.finished:
+                    yield {"finish_reason": "stop", "token_ids": []}
+                    break
+
+                if not res.outputs:
+                    yield {"finish_reason": "error", "token_ids": []}
+                    break
+
+                output = res.outputs[0]
+                next_total_toks = len(output.token_ids)
+                out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
+                if output.finish_reason:
+                    out["finish_reason"] = output.finish_reason
+                if output.stop_reason:
+                    out["stop_reason"] = output.stop_reason
+                yield out
+                num_output_tokens_so_far = next_total_toks
+        except asyncio.CancelledError:
+            # Raise GeneratorExit when the engine exits so that the frontend can migrate the request.
+            raise GeneratorExit(
+                "Decode engine was shut down during token generation"
+            ) from None


 class DecodeWorkerHandler(BaseWorkerHandler):
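The `try/except` added above converts the `asyncio.CancelledError` that surfaces when the engine client dies mid-stream into a `GeneratorExit` carrying a message, so the frontend iterating this stream can tell an engine shutdown apart from a normal `finish_reason` and migrate the request to another decode worker. Below is a minimal, self-contained sketch of that contract; `fake_engine_step`, `stream_tokens`, and `consume_with_migration` are hypothetical stand-ins for the real engine client and frontend, not Dynamo or vLLM APIs.

import asyncio


async def fake_engine_step(delay: float = 0.01) -> int:
    # Stand-in for one decode step on the engine; the await is where a
    # cancellation from an engine shutdown lands.
    await asyncio.sleep(delay)
    return 42


async def stream_tokens(engine_alive: asyncio.Event, num_tokens: int = 5):
    # Mirrors generate_tokens() above: stream until done, and convert a
    # CancelledError from the dying engine into GeneratorExit for the caller.
    try:
        for _ in range(num_tokens):
            step = asyncio.create_task(fake_engine_step())
            if not engine_alive.is_set():
                step.cancel()  # simulate the engine shutting down mid-request
            yield {"token_ids": [await step]}
    except asyncio.CancelledError:
        raise GeneratorExit("engine was shut down during token generation") from None


async def consume_with_migration():
    engine_alive = asyncio.Event()
    engine_alive.set()
    for engine in ("engine-0", "engine-1"):  # hypothetical migration targets
        try:
            received = 0
            async for _out in stream_tokens(engine_alive):
                received += 1
                if engine == "engine-0" and received == 3:
                    engine_alive.clear()  # "engine-0" dies after three tokens
            print(f"{engine} finished the request ({received} tokens)")
            return
        except GeneratorExit:
            print(f"{engine} shut down; migrating request to the next engine")
            engine_alive.set()


asyncio.run(consume_with_migration())

Raising `GeneratorExit` from inside the generator body propagates out of the consumer's `async for`, so the consumer can catch the shutdown signal without confusing it with a cancellation of its own task.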
@@ -173,15 +179,21 @@ async def generate(self, request):
         gen = self.engine_client.generate(prompt, sampling_params, request_id)

         # Generate only 1 token in prefill
-        async for res in gen:
-            logger.debug(f"kv transfer params: {res.kv_transfer_params}")
-            yield MyRequestOutput(
-                request_id=res.request_id,
-                prompt=res.prompt,
-                prompt_token_ids=res.prompt_token_ids,
-                prompt_logprobs=res.prompt_logprobs,
-                outputs=res.outputs,
-                finished=res.finished,
-                metrics=res.metrics,
-                kv_transfer_params=res.kv_transfer_params,
-            ).model_dump_json()
+        try:
+            async for res in gen:
+                logger.debug(f"kv transfer params: {res.kv_transfer_params}")
+                yield MyRequestOutput(
+                    request_id=res.request_id,
+                    prompt=res.prompt,
+                    prompt_token_ids=res.prompt_token_ids,
+                    prompt_logprobs=res.prompt_logprobs,
+                    outputs=res.outputs,
+                    finished=res.finished,
+                    metrics=res.metrics,
+                    kv_transfer_params=res.kv_transfer_params,
+                ).model_dump_json()
+        except asyncio.CancelledError:
+            # Raise the error because we cannot migrate prefill requests.
+            raise GeneratorExit(
+                "Prefill engine was shut down during token generation"
+            ) from None
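On the prefill side, each vLLM `RequestOutput` is re-wrapped as a `MyRequestOutput` pydantic model and serialized with `model_dump_json()` before being yielded over the wire; unlike decode, a shutdown here is fatal because a half-finished prefill cannot be migrated. A rough sketch of the serialization round trip, using a stripped-down stand-in model (`MiniRequestOutput` is hypothetical; the real `MyRequestOutput` carries more fields and richer types, and the `kv_transfer_params` payload here is purely illustrative):

from typing import Any, Optional

from pydantic import BaseModel


class MiniRequestOutput(BaseModel):
    # Hypothetical subset of MyRequestOutput's fields.
    request_id: str
    prompt: Optional[str] = None
    prompt_token_ids: Optional[list[int]] = None
    finished: bool = False
    kv_transfer_params: Optional[dict[str, Any]] = None


# Prefill worker side: serialize one response to JSON before yielding it.
wire = MiniRequestOutput(
    request_id="req-1",
    prompt_token_ids=[1, 2, 3],
    finished=True,
    kv_transfer_params={"remote_block_ids": [0, 1]},  # illustrative payload
).model_dump_json()

# Receiving side: reconstruct the object from the JSON payload.
resp = MiniRequestOutput.model_validate_json(wire)
assert resp.kv_transfer_params == {"remote_block_ids": [0, 1]}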