@@ -205,23 +205,18 @@ def step_with_batch_queue(self) -> Optional[EngineCoreOutputs]:
                 self.batch_queue.put_nowait(
                     (future, scheduler_output))  # type: ignore

-        # If all requests are scheduled or the job queue is full,
+        scheduled_batch = (scheduler_output is not None
+                           and scheduler_output.total_num_scheduled_tokens > 0)
+
+        # If no more requests can be scheduled and the job queue is not empty,
         # block until the first batch in the job queue is finished.
-        if (scheduler_output is None
-                or scheduler_output.total_num_scheduled_tokens == 0):
-            try:
-                future, scheduler_output = self.batch_queue.get(
-                    timeout=POLLING_TIMEOUT_S)
-                # Blocking until the first result is available.
-                model_output = future.result()
-                self.batch_queue.task_done()
-                engine_core_outputs = self.scheduler.update_from_output(
-                    scheduler_output, model_output)
-            except queue.Empty:
-                # If the queue is empty (timeout at .get), return
-                # an empty EngineCoreOutputs for logging.
-                engine_core_outputs = EngineCoreOutputs(
-                    outputs=[], scheduler_stats=self.scheduler.make_stats())
+        if not scheduled_batch and not self.batch_queue.empty():
+            future, scheduler_output = self.batch_queue.get_nowait()
+            # Blocking until the first result is available.
+            model_output = future.result()
+            self.batch_queue.task_done()
+            engine_core_outputs = self.scheduler.update_from_output(
+                scheduler_output, model_output)

         return engine_core_outputs

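For readers skimming the hunk, the new control flow is: submit a batch to the executor whenever one can be scheduled, and block on the oldest in-flight batch only when nothing new was scheduled and the queue still holds pending work. Below is a minimal, self-contained sketch of that pipelining pattern; `fake_schedule`, `fake_execute`, and the driver loop are hypothetical stand-ins, not vLLM code.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the scheduler and the model executor.
def fake_schedule(step: int):
    # Pretend only the first three steps have new work to schedule.
    return {"step": step, "num_tokens": 8} if step < 3 else None

def fake_execute(batch: dict) -> str:
    return f"output-for-step-{batch['step']}"

executor = ThreadPoolExecutor(max_workers=1)
batch_queue: queue.Queue = queue.Queue(maxsize=2)

for step in range(6):
    # Schedule a new batch only if the job queue has room.
    batch = fake_schedule(step) if not batch_queue.full() else None
    scheduled = batch is not None and batch["num_tokens"] > 0
    if scheduled:
        # Submit asynchronously and remember the in-flight batch.
        batch_queue.put_nowait((executor.submit(fake_execute, batch), batch))

    # Block on the oldest in-flight batch only when nothing new was scheduled.
    if not scheduled and not batch_queue.empty():
        future, pending = batch_queue.get_nowait()
        print(pending["step"], future.result())
        batch_queue.task_done()

executor.shutdown()
```

Because the queue is drained with `get_nowait()` only after checking it is non-empty, the old `.get(timeout=POLLING_TIMEOUT_S)` call, its `queue.Empty` handler, and the empty `EngineCoreOutputs` fallback are no longer needed.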