19 changes: 9 additions & 10 deletions tests/v1/engine/test_output_processor.py
@@ -69,8 +69,8 @@ def test_incremental_detokenization(request_output_kind: RequestOutputKind,
]

# Add requests to the detokenizer.
for request, prompt in zip(requests, dummy_test_vectors.prompt_strings):
output_processor.add_request(request, prompt)
for request in requests:
output_processor.add_request(request)

gen_strings = {}
gen_tokens = {}
@@ -418,8 +418,8 @@ def test_logprobs_processor(request_output_kind: RequestOutputKind,
]

# Add requests to the detokenizer.
for request, prompt in zip(requests, dummy_test_vectors.prompt_strings):
output_processor.add_request(request, prompt)
for request in requests:
output_processor.add_request(request)

gen_tokens = {}
gen_logprobs = {}
@@ -546,7 +546,6 @@ def test_stop_token(include_stop_str_in_output: bool,
generation_logprobs = (
dummy_test_vectors.generation_logprobs[0] +
2 * [dummy_test_vectors.generation_logprobs[0][-1]])
prompt_string = dummy_test_vectors.prompt_strings[0]
prompt_tokens = dummy_test_vectors.prompt_tokens[0]
engine_core = MockEngineCore(
tokens_list=[generation_tokens],
@@ -581,7 +580,7 @@ ))
))

# Add request to the detokenizer.
output_processor.add_request(request, prompt_string)
output_processor.add_request(request)

# Loop over engine core steps; run output processor
gen_string = ""
@@ -678,8 +677,8 @@ def test_stop_string(include_stop_str_in_output: bool,
]

# Add requests to the detokenizer.
for request, prompt in zip(requests, dummy_test_vectors.prompt_strings):
output_processor.add_request(request, prompt)
for request in requests:
output_processor.add_request(request)

gen_strings = {}
gen_tokens = {}
@@ -786,7 +785,7 @@ def test_iteration_stats(dummy_test_vectors):
# Add all requests except one to the OutputProcessor.
num_active = len(dummy_test_vectors.generation_tokens) - 1
for request in requests[:num_active]:
output_processor.add_request(request, None)
output_processor.add_request(request)
inactive_request = requests[num_active]

# First iteration has 2 prefills.
@@ -812,7 +811,7 @@
assert iteration_stats.num_generation_tokens == num_active

# Add a new request - prefill and 2 decodes in this step.
output_processor.add_request(inactive_request, None)
output_processor.add_request(inactive_request)
num_active += 1
outputs = engine_core.get_outputs()[:num_active]
iteration_stats = IterationStats()
19 changes: 9 additions & 10 deletions vllm/v1/engine/async_llm.py
@@ -225,13 +225,15 @@ async def add_request(
queue = RequestOutputCollector(output_kind=params.output_kind)

# Convert Input --> Request.
prompt_str, request = self.processor.process_inputs(
Contributor Author (review comment): @njhill Yes, I agree with you that the best approach would be to parse the prompt string within this function. However, after examining the function, I couldn't find any logic that converts the prompt token ids into a prompt string. Do you have any better ideas?

request_id, prompt, params, arrival_time, lora_request,
tokenization_kwargs, trace_headers, prompt_adapter_request,
priority)
request = self.processor.process_inputs(request_id, prompt, params,
arrival_time, lora_request,
tokenization_kwargs,
trace_headers,
prompt_adapter_request,
priority)

if params.n == 1:
await self._add_request(request, prompt_str, None, 0, queue)
await self._add_request(request, None, 0, queue)
return queue

# Fan out child requests (for n>1).
@@ -241,18 +243,15 @@
child_request = request if idx == params.n - 1 else copy(request)
child_request.request_id = request_id
child_request.sampling_params = params
await self._add_request(child_request, prompt_str, parent_request,
idx, queue)
await self._add_request(child_request, parent_request, idx, queue)
return queue

async def _add_request(self, request: EngineCoreRequest,
prompt: Optional[str],
parent_req: Optional[ParentRequest], index: int,
queue: RequestOutputCollector):

# Add the request to OutputProcessor (this process).
self.output_processor.add_request(request, prompt, parent_req, index,
queue)
self.output_processor.add_request(request, parent_req, index, queue)

# Add the EngineCoreRequest to EngineCore (separate process).
await self.engine_core.add_request_async(request)
3 changes: 3 additions & 0 deletions vllm/v1/engine/detokenizer.py
@@ -34,6 +34,9 @@ def update(self, new_token_ids: list[int],
def get_next_output_text(self, finished: bool, delta: bool) -> str:
return ""

def decode_next(self, next_token_id: int) -> str:
raise NotImplementedError

@classmethod
def from_new_request(
cls,
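The hunk above only adds the `decode_next` hook to the base detokenizer class; the concrete subclass implementations are outside this diff. As a rough, illustrative sketch (not the PR's actual code), a subclass built on a Hugging Face tokenizer could satisfy the new interface like this, assuming it keeps its own history of the token ids it has already seen:

```python
# Minimal sketch of a decode_next implementation, assuming a subclass
# that keeps its own token history. Illustrative only; this is not the
# IncrementalDetokenizer subclass used by vLLM.
from transformers import AutoTokenizer


class SketchIncrementalDetokenizer:

    def __init__(self, tokenizer) -> None:
        self.tokenizer = tokenizer
        self._token_ids: list[int] = []
        self._text_so_far = ""

    def decode_next(self, next_token_id: int) -> str:
        """Return only the text contributed by `next_token_id`."""
        self._token_ids.append(next_token_id)
        # Re-decode the full prefix and emit the new suffix. A real
        # incremental detokenizer caches state to avoid this re-decode.
        full_text = self.tokenizer.decode(self._token_ids)
        new_text = full_text[len(self._text_so_far):]
        self._text_so_far = full_text
        return new_text


if __name__ == "__main__":
    tok = AutoTokenizer.from_pretrained("gpt2")
    detok = SketchIncrementalDetokenizer(tok)
    token_ids = tok.encode("Hello, incremental world!")
    pieces = [detok.decode_next(t) for t in token_ids]
    assert "".join(pieces) == "Hello, incremental world!"
```

The per-token loop added in `RequestState.from_new_request` (output_processor.py below) relies on exactly this contract: concatenating the returned pieces reconstructs the prompt text.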
15 changes: 8 additions & 7 deletions vllm/v1/engine/llm_engine.py
@@ -192,16 +192,18 @@ def add_request(
priority: int = 0,
) -> None:
# Process raw inputs into the request.
prompt_str, request = self.processor.process_inputs(
request_id, prompt, params, arrival_time, lora_request,
tokenization_kwargs, trace_headers, prompt_adapter_request,
priority)
request = self.processor.process_inputs(request_id, prompt, params,
arrival_time, lora_request,
tokenization_kwargs,
trace_headers,
prompt_adapter_request,
priority)

n = params.n if isinstance(params, SamplingParams) else 1

if n == 1:
# Make a new RequestState and queue.
self.output_processor.add_request(request, prompt_str, None, 0)
self.output_processor.add_request(request, None, 0)
# Add the request to EngineCore.
self.engine_core.add_request(request)
return
@@ -215,8 +217,7 @@ child_request.sampling_params = params
child_request.sampling_params = params

# Make a new RequestState and queue.
self.output_processor.add_request(child_request, prompt_str,
parent_req, idx)
self.output_processor.add_request(child_request, parent_req, idx)
# Add the request to EngineCore.
self.engine_core.add_request(child_request)

24 changes: 16 additions & 8 deletions vllm/v1/engine/output_processor.py
@@ -109,31 +109,41 @@ def from_new_request(
cls,
tokenizer: AnyTokenizer,
request: EngineCoreRequest,
prompt: Optional[str],
parent_req: Optional[ParentRequest],
request_index: int,
queue: Optional[RequestOutputCollector],
log_stats: bool,
) -> "RequestState":
if not request.sampling_params.detokenize:
tokenizer = None

detokenizer = IncrementalDetokenizer.from_new_request(
tokenizer=tokenizer,
request=request,
)

# When creating the RequestState, the prompt text must be
# preserved because it is written back into the RequestOutputs
# that are returned to the caller. Here, we decode the request's
# prompt_token_ids into prompt_text.
prompt_text = ""
for token_id in request.prompt_token_ids:
prompt_text += detokenizer.decode_next(token_id)

return cls(
request_id=request.request_id,
parent_req=parent_req,
request_index=request_index,
lora_name=(request.lora_request.name
if request.lora_request is not None else None),
output_kind=request.sampling_params.output_kind,
prompt=prompt,
prompt=prompt_text,
prompt_token_ids=request.prompt_token_ids,
logprobs_processor=LogprobsProcessor.from_new_request(
tokenizer=tokenizer,
request=request,
),
detokenizer=IncrementalDetokenizer.from_new_request(
tokenizer=tokenizer,
request=request,
),
detokenizer=detokenizer,
max_tokens_param=(request.sampling_params.max_tokens if
request.sampling_params is not None else None),
arrival_time=request.arrival_time,
@@ -275,7 +285,6 @@ def abort_requests(
def add_request(
self,
request: EngineCoreRequest,
prompt: Optional[str],
parent_req: Optional[ParentRequest] = None,
request_index: int = 0,
queue: Optional[RequestOutputCollector] = None,
@@ -287,7 +296,6 @@ def add_request(
req_state = RequestState.from_new_request(
tokenizer=self.tokenizer.get_lora_tokenizer(request.lora_request),
request=request,
prompt=prompt,
parent_req=parent_req,
request_index=request_index,
queue=queue,
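Since `from_new_request` now rebuilds the prompt text from `prompt_token_ids` instead of receiving an already-decoded string from the Processor, the per-token loop above has to reproduce what a one-shot decode of the same ids would give. A small standalone check of that property, using a Hugging Face tokenizer as a stand-in for the detokenizer (illustrative only; the names here are not vLLM APIs):

```python
# Illustrative check that a per-token decode loop, mirroring the one in
# RequestState.from_new_request, matches a one-shot decode of the same
# prompt token ids. The tokenizer is a stand-in for decode_next.
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")
prompt_token_ids = tokenizer.encode("The quick brown fox jumps over the lazy dog")

# Per-token reconstruction of the prompt text.
seen: list[int] = []
prompt_text = ""
for token_id in prompt_token_ids:
    seen.append(token_id)
    full = tokenizer.decode(seen)
    prompt_text += full[len(prompt_text):]  # append only the new suffix

# One-shot decode of the full prompt, for comparison.
assert prompt_text == tokenizer.decode(prompt_token_ids)
```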
4 changes: 2 additions & 2 deletions vllm/v1/engine/processor.py
@@ -211,7 +211,7 @@ def process_inputs(
trace_headers: Optional[Mapping[str, str]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
priority: int = 0,
) -> tuple[Optional[str], EngineCoreRequest]:
) -> EngineCoreRequest:

# TODO(woosuk): Support pooling models.
# TODO(woosuk): Support encoder-decoder models.
@@ -316,7 +316,7 @@ else:
else:
sorted_mm_inputs = orig_sorted_mm_inputs

return decoder_inputs.get("prompt"), EngineCoreRequest(
return EngineCoreRequest(
request_id=request_id,
prompt_token_ids=decoder_inputs["prompt_token_ids"],
mm_inputs=sorted_mm_inputs,