Merged
18 changes: 9 additions & 9 deletions tests/v1/engine/test_engine_core.py
@@ -88,7 +88,7 @@ def test_engine_core(monkeypatch: pytest.MonkeyPatch):
        assert len(engine_core.scheduler.running) == 4

        # Loop through until they are all done.
-        while len(engine_core.step().outputs) > 0:
+        while len(engine_core.step()[0].outputs) > 0:
            pass

        assert len(engine_core.scheduler.waiting) == 0
@@ -163,11 +163,11 @@ def test_engine_core(monkeypatch: pytest.MonkeyPatch):
        req0.request_id = req1.request_id = "test"
        engine_core.add_request(req0)

-        while len(engine_core.step().outputs) > 0:
+        while len(engine_core.step()[0].outputs) > 0:
            pass

        engine_core.add_request(req1)
-        while len(engine_core.step().outputs) > 0:
+        while len(engine_core.step()[0].outputs) > 0:
            pass

        assert len(engine_core.scheduler.waiting) == 0
@@ -207,7 +207,7 @@ def _check_engine_state():
        assert len(engine_core.scheduler.waiting) == 1
        assert len(engine_core.scheduler.running) == 0
        # Loop through until they are all done.
-        while len(engine_core.step().outputs) > 0:
+        while len(engine_core.step()[0].outputs) > 0:
            pass
        assert len(engine_core.scheduler.waiting) == 0
        assert len(engine_core.scheduler.running) == 0
@@ -296,7 +296,7 @@ def shutdown(self):
    engine_core.add_request(req1)

    # Schedule Batch 1: (10, req0)
-    assert engine_core.step_with_batch_queue() is None
+    assert engine_core.step_with_batch_queue()[0] is None
    assert engine_core.batch_queue.qsize() == 1
    scheduler_output = engine_core.batch_queue.queue[-1][1]
    assert scheduler_output.num_scheduled_tokens[0] == 10
@@ -305,7 +305,7 @@ def shutdown(self):
        req0.request_id].num_computed_tokens == 10

    # Schedule Batch 2: (2, req0), (8, req1)
-    assert engine_core.step_with_batch_queue() is None
+    assert engine_core.step_with_batch_queue()[0] is None
    assert engine_core.batch_queue.qsize() == 2
    scheduler_output = engine_core.batch_queue.queue[-1][1]
    assert scheduler_output.num_scheduled_tokens[0] == 2
@@ -327,7 +327,7 @@ def shutdown(self):
    assert scheduler_output.num_scheduled_tokens[1] == 4

    # Batch queue is full. Finish Batch 2. Get first token of req0.
-    output = engine_core.step_with_batch_queue()
+    output = engine_core.step_with_batch_queue()[0]
    assert output is not None
    assert len(output.outputs) == 1
    assert engine_core.scheduler.requests[req0.request_id].num_tokens == 13
@@ -339,7 +339,7 @@ def shutdown(self):
    assert scheduler_output.num_scheduled_tokens[0] == 1

    # Batch queue is full. Finish Batch 3. Get first token of req1.
-    output = engine_core.step_with_batch_queue()
+    output = engine_core.step_with_batch_queue()[0]
    assert output is not None
    assert len(output.outputs) == 1
    assert engine_core.scheduler.requests[req1.request_id].num_tokens == 13
@@ -358,7 +358,7 @@ def shutdown(self):
        engine_core.scheduler.requests[1].num_tokens + 1,
    ]
    while engine_core.scheduler.get_num_unfinished_requests() == 2:
-        output = engine_core.step_with_batch_queue()
+        output = engine_core.step_with_batch_queue()[0]
        if step % 2 == 0:
            # Even steps consume an output.
            assert output is not None
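Note: every assertion above gained a `[0]` because `step()` and `step_with_batch_queue()` now return a tuple rather than bare outputs. A minimal sketch of the convention these tests rely on, assuming an already-constructed `engine_core`:

# Sketch only; `engine_core` is assumed to be a constructed EngineCore.
outputs, model_executed = engine_core.step()   # new two-element return
assert isinstance(model_executed, bool)        # True iff a forward pass ran
while len(engine_core.step()[0].outputs) > 0:  # drain remaining outputs
    pass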
7 changes: 4 additions & 3 deletions vllm/forward_context.py
@@ -101,7 +101,7 @@ def get_forward_context() -> ForwardContext:
 def set_forward_context(attn_metadata: Any,
                         vllm_config: VllmConfig,
                         virtual_engine: int = 0,
-                        num_tokens: int = 0):
+                        num_tokens: Optional[int] = None):
    """A context manager that stores the current forward context,
    which can be attention metadata, etc.
    Here we can inject common logic for every model forward pass.
@@ -111,9 +111,10 @@ def set_forward_context(attn_metadata: Any,
    if need_to_track_batchsize:
        forward_start_time = time.perf_counter()
    dp_metadata: Optional[DPMetadata] = None
-    if vllm_config.parallel_config.data_parallel_size > 1:
+    if vllm_config.parallel_config.data_parallel_size > 1 and (
+            attn_metadata is not None or num_tokens is not None):
        dp_metadata = DPMetadata.make(vllm_config.parallel_config,
-                                      attn_metadata, num_tokens)
+                                      attn_metadata, num_tokens or 0)

    global _forward_context
    prev_context = _forward_context
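Net effect of the two hunks above: with the new `None` default, a call that passes neither attention metadata nor a token count no longer builds DP metadata, while a dummy pass can still force it with an explicit count. A standalone sketch of just the gating condition (hypothetical helper, not part of vLLM's API):

from typing import Any, Optional

def should_make_dp_metadata(data_parallel_size: int,
                            attn_metadata: Any,
                            num_tokens: Optional[int]) -> bool:
    # Mirrors the new guard: build DP metadata only when running
    # data-parallel AND the caller supplied a real batch or a token count.
    return data_parallel_size > 1 and (
        attn_metadata is not None or num_tokens is not None)

assert not should_make_dp_metadata(2, None, None)  # new default: skip
assert should_make_dp_metadata(2, None, 0)         # dummy pass: build
assert not should_make_dp_metadata(1, None, 0)     # no DP: skip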
50 changes: 22 additions & 28 deletions vllm/v1/engine/core.py
@@ -212,24 +212,30 @@ def execute_model(self, scheduler_output: SchedulerOutput):
            # Re-raise exception
            raise err

-    def step(self) -> EngineCoreOutputs:
-        """Schedule, execute, and make output."""
+    def step(self) -> tuple[EngineCoreOutputs, bool]:
+        """Schedule, execute, and make output.
+
+        Returns a tuple of the outputs and a flag indicating whether
+        the model was executed.
+        """

        # Check for any requests remaining in the scheduler - unfinished,
        # or finished and not yet removed from the batch.
        if not self.scheduler.has_requests():
            return EngineCoreOutputs(
                outputs=[],
                scheduler_stats=self.scheduler.make_stats(),
-            )
+            ), False
        scheduler_output = self.scheduler.schedule()
        model_output = self.execute_model(scheduler_output)
        engine_core_outputs = self.scheduler.update_from_output(
            scheduler_output, model_output)  # type: ignore

-        return engine_core_outputs
+        return (engine_core_outputs,
+                scheduler_output.total_num_scheduled_tokens > 0)

-    def step_with_batch_queue(self) -> Optional[EngineCoreOutputs]:
+    def step_with_batch_queue(
+            self) -> tuple[Optional[EngineCoreOutputs], bool]:
        """Schedule and execute batches with the batch queue.
        Note that if there is nothing to output in this step, None is returned.

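The new flag is simply `scheduler_output.total_num_scheduled_tokens > 0`, i.e. whether this step performed a real forward pass. A minimal caller sketch, assuming a constructed `EngineCore` named `engine_core`:

def drain(engine_core) -> int:
    # Step until no per-request outputs remain; count real forward passes.
    forward_passes = 0
    while True:
        outputs, model_executed = engine_core.step()
        forward_passes += int(model_executed)
        if not outputs.outputs:
            return forward_passes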
@@ -274,7 +280,7 @@ def step_with_batch_queue(self) -> Optional[EngineCoreOutputs]:
            engine_core_outputs = self.scheduler.update_from_output(
                scheduler_output, model_output)

-        return engine_core_outputs
+        return engine_core_outputs, scheduled_batch

    def shutdown(self):
        self.structured_output_manager.clear_backend()
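For `step_with_batch_queue()` the second element is `scheduled_batch`: whether this call scheduled a new batch. With a full batch queue, calls alternate between scheduling and finishing batches, which is why the test above sees outputs only on even steps. A consumption-loop sketch (engine setup elided; `handle` is a hypothetical consumer):

while engine_core.scheduler.get_num_unfinished_requests() > 0:
    outputs, scheduled_batch = engine_core.step_with_batch_queue()
    if outputs is not None:
        # An in-flight batch just finished; per-request outputs are ready.
        handle(outputs)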
@@ -538,15 +544,17 @@ def _process_input_queue(self):
            req = self.input_queue.get_nowait()
            self._handle_client_request(*req)

-    def _process_engine_step(self):
+    def _process_engine_step(self) -> bool:
        """Called only when there are unfinished local requests."""

        # Step the engine core.
-        outputs = self.step_fn()
+        outputs, model_executed = self.step_fn()
        # Put EngineCoreOutputs into the output queue.
        if outputs is not None:
            self.output_queue.put_nowait(outputs)

+        return model_executed
+
    def _handle_client_request(self, request_type: EngineCoreRequestType,
                               request: Any) -> None:
        """Dispatch request from client."""
@@ -750,30 +758,16 @@ def run_busy_loop(self):
            # 1) Poll the input queue until there is work to do.
            self._process_input_queue()

+            # 2) Step the engine core.
+            executed = self._process_engine_step()
            local_unfinished_reqs = self.scheduler.has_unfinished_requests()

-            if local_unfinished_reqs:
-                # 2) Step the engine core.
-                self._process_engine_step()
-
-                # Check if we have now finished all requests.
-                local_unfinished_reqs = (
-                    self.scheduler.has_unfinished_requests())
-            else:
-                if self.scheduler.has_finished_requests():
-                    # There are no unfinished requests, but there are some
-                    # finished requests remaining to be removed from the
-                    # batch state. This engine step won't perform a forward
-                    # pass but will flush the finished requests to ensure
-                    # up-to-date state is returned in the engine outputs.
-                    self._process_engine_step()
-
-                if not self.engines_running:
+            if not executed:
+                if not local_unfinished_reqs and not self.engines_running:
                    # All engines are idle.
                    continue

-                # There must be unfinished requests in DP peers, run a
-                # dummy forward pass.
+                # We are in a running state and so must execute a dummy pass
+                # if the model didn't execute any ready requests.
                self.execute_dummy_batch()

            # 3) All-reduce operation to determine global unfinished reqs.
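The dummy pass exists because of step 3: under data parallelism every rank must join the same collective each iteration, so a rank with nothing to run still has to step. A standalone toy of the blocking all-reduce involved (torch.distributed sketch; not vLLM's actual coordination code):

import torch
import torch.distributed as dist

def any_rank_unfinished(local_unfinished: bool) -> bool:
    # Every DP rank must call this each iteration or the group deadlocks,
    # which is why an idle rank runs a dummy forward pass first.
    t = torch.tensor([int(local_unfinished)])
    dist.all_reduce(t, op=dist.ReduceOp.MAX)
    return bool(t.item())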
3 changes: 2 additions & 1 deletion vllm/v1/engine/core_client.py
@@ -206,7 +206,8 @@ def __init__(self, *args, **kwargs):
        self.engine_core = EngineCore(*args, **kwargs)

    def get_output(self) -> EngineCoreOutputs:
-        return self.engine_core.step()
+        outputs, _ = self.engine_core.step()
+        return outputs

    def add_request(self, request: EngineCoreRequest) -> None:
        self.engine_core.add_request(request)