From dc12f820d279e09d0c56b38c504944fd0d8e99e9 Mon Sep 17 00:00:00 2001 From: mgoin Date: Mon, 28 Jul 2025 10:11:16 -0400 Subject: [PATCH 1/3] Add async_llm_streaming.py example for AsyncLLM streaming in python Signed-off-by: mgoin --- .../offline_inference/async_llm_streaming.py | 238 ++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 examples/offline_inference/async_llm_streaming.py diff --git a/examples/offline_inference/async_llm_streaming.py b/examples/offline_inference/async_llm_streaming.py new file mode 100644 index 000000000000..feff5d060a0c --- /dev/null +++ b/examples/offline_inference/async_llm_streaming.py @@ -0,0 +1,238 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Example demonstrating streaming offline inference with AsyncLLM (V1 engine). + +This script shows how to use vLLM's AsyncLLM engine for streaming token-by-token +output in offline inference scenarios. It demonstrates both DELTA mode (new tokens only) +and CUMULATIVE mode (complete output so far). + +Usage: + python examples/offline_inference/async_llm_streaming.py + python examples/offline_inference/async_llm_streaming.py --model meta-llama/Llama-3.2-1B-Instruct + python examples/offline_inference/async_llm_streaming.py --streaming-mode cumulative +""" # noqa: E501 + +import asyncio +import os +import time + +from vllm import SamplingParams +from vllm.engine.arg_utils import AsyncEngineArgs +from vllm.sampling_params import RequestOutputKind +from vllm.utils import FlexibleArgumentParser +from vllm.v1.engine.async_llm import AsyncLLM + + +def create_parser(): + """Create argument parser with AsyncEngineArgs and streaming options.""" + parser = FlexibleArgumentParser(description="AsyncLLM Streaming Inference Example") + + AsyncEngineArgs.add_cli_args(parser) + parser.set_defaults( + model="meta-llama/Llama-3.2-1B-Instruct", + enforce_eager=True, # Faster for examples + ) + + # Add sampling parameters + sampling_group = parser.add_argument_group("Sampling parameters") + sampling_group.add_argument( + "--max-tokens", + type=int, + default=100, + help="Maximum number of tokens to generate", + ) + sampling_group.add_argument( + "--temperature", type=float, default=0.8, help="Sampling temperature" + ) + sampling_group.add_argument( + "--top-p", type=float, default=0.95, help="Top-p (nucleus) sampling" + ) + sampling_group.add_argument("--top-k", type=int, default=-1, help="Top-k sampling") + + # Add streaming options + streaming_group = parser.add_argument_group("Streaming options") + streaming_group.add_argument( + "--streaming-mode", + choices=["delta", "cumulative"], + default="delta", + help="Streaming mode: 'delta' for new tokens only, " + "'cumulative' for complete output so far", + ) + streaming_group.add_argument( + "--show-timing", + action="store_true", + help="Show timing information for each token", + ) + + return parser + + +async def stream_response( + engine: AsyncLLM, + prompt: str, + sampling_params: SamplingParams, + request_id: str, + show_timing: bool = False, +) -> None: + """Stream response from AsyncLLM and display tokens as they arrive.""" + + print(f"\nšŸš€ Prompt: {prompt!r}") + print(f"šŸ“ Streaming mode: {sampling_params.output_kind.name}") + print("šŸ”„ Generating", end="", flush=True) + + if sampling_params.output_kind == RequestOutputKind.DELTA: + print(" (token-by-token):") + print("šŸ’¬ ", end="", flush=True) + else: + print(" (cumulative):") + + start_time = time.time() + token_count = 0 + last_time = start_time + + try: + # Stream tokens from AsyncLLM + async for output in engine.generate( + request_id=request_id, prompt=prompt, sampling_params=sampling_params + ): + current_time = time.time() + + # Process each completion in the output + for completion in output.outputs: + if sampling_params.output_kind == RequestOutputKind.DELTA: + # In DELTA mode, we get only new tokens + new_text = completion.text + if new_text: + print(new_text, end="", flush=True) + token_count += len(completion.token_ids) + + if show_timing: + token_time = current_time - last_time + print(f" [{token_time:.3f}s]", end="", flush=True) + + last_time = current_time + + else: # CUMULATIVE mode + # In CUMULATIVE mode, we get the complete output so far + complete_text = completion.text + token_count = len(completion.token_ids) + + # Clear the line and print the updated text + print(f"\ršŸ’¬ {complete_text}", end="", flush=True) + + if show_timing: + token_time = current_time - last_time + print(f" [{token_time:.3f}s]", end="", flush=True) + last_time = current_time + + # Check if generation is finished + if output.finished: + total_time = current_time - start_time + print( + f"\nāœ… Finished! Generated {token_count}" + f" tokens in {total_time:.2f}s" + ) + + if token_count > 0: + tokens_per_second = token_count / total_time + print(f"āš”ļø Speed: {tokens_per_second:.1f} tokens/second") + break + + except Exception as e: + print(f"\nāŒ Error during streaming: {e}") + raise + + +async def run_streaming_examples(args) -> None: + """Run streaming examples with different prompts and configurations.""" + + # Ensure V1 is enabled + os.environ["VLLM_USE_V1"] = "1" + + # Extract sampling parameters + max_tokens = args.pop("max_tokens", 100) + temperature = args.pop("temperature", 0.8) + top_p = args.pop("top_p", 0.95) + top_k = args.pop("top_k", -1) + streaming_mode = args.pop("streaming_mode", "delta") + show_timing = args.pop("show_timing", False) + + # Determine output kind + output_kind = ( + RequestOutputKind.DELTA + if streaming_mode == "delta" + else RequestOutputKind.CUMULATIVE + ) + + # Create sampling parameters + sampling_params = SamplingParams( + max_tokens=max_tokens, + temperature=temperature, + top_p=top_p, + top_k=top_k, + output_kind=output_kind, + # Use a seed for reproducible results in examples + seed=42, + ) + + print(f"šŸ”§ Initializing AsyncLLM with model: {args.get('model', 'default')}") + + # Create AsyncLLM engine + engine_args = AsyncEngineArgs(**args) + engine = AsyncLLM.from_engine_args(engine_args) + + try: + # Sample prompts for demonstration + prompts = [ + "The future of artificial intelligence is", + "In a galaxy far, far away", + "The key to happiness is", + "Climate change solutions include", + ] + + print(f"šŸŽÆ Running {len(prompts)} streaming examples...") + + # Process each prompt + for i, prompt in enumerate(prompts, 1): + print(f"\n{'=' * 60}") + print(f"Example {i}/{len(prompts)}") + print(f"{'=' * 60}") + + request_id = f"stream-example-{i}" + + await stream_response( + engine=engine, + prompt=prompt, + sampling_params=sampling_params, + request_id=request_id, + show_timing=show_timing, + ) + + # Small delay between examples for better readability + if i < len(prompts): + await asyncio.sleep(1) + + print("\nšŸŽ‰ All examples completed successfully!") + print("šŸ’” Try different streaming modes with --streaming-mode delta|cumulative") + print("šŸ’” Add --show-timing to see per-token timing information") + + finally: + # Clean up the engine + engine.shutdown() + + +def main(): + """Main function.""" + parser = create_parser() + args = vars(parser.parse_args()) + + # Run the async examples + try: + asyncio.run(run_streaming_examples(args)) + except KeyboardInterrupt: + print("\nšŸ›‘ Interrupted by user") + + +if __name__ == "__main__": + main() From a47b3138cbdc7ccb1c7da168f4b8e1df89f90bf8 Mon Sep 17 00:00:00 2001 From: mgoin Date: Mon, 28 Jul 2025 10:33:12 -0400 Subject: [PATCH 2/3] Simplify Signed-off-by: mgoin --- .../offline_inference/async_llm_streaming.py | 211 ++++-------------- 1 file changed, 45 insertions(+), 166 deletions(-) diff --git a/examples/offline_inference/async_llm_streaming.py b/examples/offline_inference/async_llm_streaming.py index feff5d060a0c..87003e18be94 100644 --- a/examples/offline_inference/async_llm_streaming.py +++ b/examples/offline_inference/async_llm_streaming.py @@ -1,142 +1,62 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project """ -Example demonstrating streaming offline inference with AsyncLLM (V1 engine). +Simple example demonstrating streaming offline inference with AsyncLLM (V1 engine). -This script shows how to use vLLM's AsyncLLM engine for streaming token-by-token -output in offline inference scenarios. It demonstrates both DELTA mode (new tokens only) -and CUMULATIVE mode (complete output so far). +This script shows the core functionality of vLLM's AsyncLLM engine for streaming +token-by-token output in offline inference scenarios. It demonstrates DELTA mode +streaming where you receive new tokens as they are generated. Usage: python examples/offline_inference/async_llm_streaming.py - python examples/offline_inference/async_llm_streaming.py --model meta-llama/Llama-3.2-1B-Instruct - python examples/offline_inference/async_llm_streaming.py --streaming-mode cumulative -""" # noqa: E501 +""" import asyncio import os -import time from vllm import SamplingParams from vllm.engine.arg_utils import AsyncEngineArgs from vllm.sampling_params import RequestOutputKind -from vllm.utils import FlexibleArgumentParser from vllm.v1.engine.async_llm import AsyncLLM -def create_parser(): - """Create argument parser with AsyncEngineArgs and streaming options.""" - parser = FlexibleArgumentParser(description="AsyncLLM Streaming Inference Example") - - AsyncEngineArgs.add_cli_args(parser) - parser.set_defaults( - model="meta-llama/Llama-3.2-1B-Instruct", - enforce_eager=True, # Faster for examples - ) - - # Add sampling parameters - sampling_group = parser.add_argument_group("Sampling parameters") - sampling_group.add_argument( - "--max-tokens", - type=int, - default=100, - help="Maximum number of tokens to generate", - ) - sampling_group.add_argument( - "--temperature", type=float, default=0.8, help="Sampling temperature" - ) - sampling_group.add_argument( - "--top-p", type=float, default=0.95, help="Top-p (nucleus) sampling" - ) - sampling_group.add_argument("--top-k", type=int, default=-1, help="Top-k sampling") - - # Add streaming options - streaming_group = parser.add_argument_group("Streaming options") - streaming_group.add_argument( - "--streaming-mode", - choices=["delta", "cumulative"], - default="delta", - help="Streaming mode: 'delta' for new tokens only, " - "'cumulative' for complete output so far", - ) - streaming_group.add_argument( - "--show-timing", - action="store_true", - help="Show timing information for each token", - ) - - return parser - - -async def stream_response( - engine: AsyncLLM, - prompt: str, - sampling_params: SamplingParams, - request_id: str, - show_timing: bool = False, -) -> None: - """Stream response from AsyncLLM and display tokens as they arrive.""" +async def stream_response(engine: AsyncLLM, prompt: str, request_id: str) -> None: + """ + Stream response from AsyncLLM and display tokens as they arrive. + This function demonstrates the core streaming pattern: + 1. Create SamplingParams with DELTA output kind + 2. Call engine.generate() and iterate over the async generator + 3. Print new tokens as they arrive + 4. Handle the finished flag to know when generation is complete + """ print(f"\nšŸš€ Prompt: {prompt!r}") - print(f"šŸ“ Streaming mode: {sampling_params.output_kind.name}") - print("šŸ”„ Generating", end="", flush=True) + print("šŸ’¬ Response: ", end="", flush=True) - if sampling_params.output_kind == RequestOutputKind.DELTA: - print(" (token-by-token):") - print("šŸ’¬ ", end="", flush=True) - else: - print(" (cumulative):") - - start_time = time.time() - token_count = 0 - last_time = start_time + # Configure sampling parameters for streaming + sampling_params = SamplingParams( + max_tokens=100, + temperature=0.8, + top_p=0.95, + seed=42, # For reproducible results + output_kind=RequestOutputKind.DELTA, # Get only new tokens each iteration + ) try: # Stream tokens from AsyncLLM async for output in engine.generate( request_id=request_id, prompt=prompt, sampling_params=sampling_params ): - current_time = time.time() - # Process each completion in the output for completion in output.outputs: - if sampling_params.output_kind == RequestOutputKind.DELTA: - # In DELTA mode, we get only new tokens - new_text = completion.text - if new_text: - print(new_text, end="", flush=True) - token_count += len(completion.token_ids) - - if show_timing: - token_time = current_time - last_time - print(f" [{token_time:.3f}s]", end="", flush=True) - - last_time = current_time - - else: # CUMULATIVE mode - # In CUMULATIVE mode, we get the complete output so far - complete_text = completion.text - token_count = len(completion.token_ids) - - # Clear the line and print the updated text - print(f"\ršŸ’¬ {complete_text}", end="", flush=True) - - if show_timing: - token_time = current_time - last_time - print(f" [{token_time:.3f}s]", end="", flush=True) - last_time = current_time + # In DELTA mode, we get only new tokens generated since last iteration + new_text = completion.text + if new_text: + print(new_text, end="", flush=True) # Check if generation is finished if output.finished: - total_time = current_time - start_time - print( - f"\nāœ… Finished! Generated {token_count}" - f" tokens in {total_time:.2f}s" - ) - - if token_count > 0: - tokens_per_second = token_count / total_time - print(f"āš”ļø Speed: {tokens_per_second:.1f} tokens/second") + print("\nāœ… Generation complete!") break except Exception as e: @@ -144,51 +64,27 @@ async def stream_response( raise -async def run_streaming_examples(args) -> None: - """Run streaming examples with different prompts and configurations.""" +async def main(): + """Main function demonstrating AsyncLLM streaming.""" - # Ensure V1 is enabled + # Ensure V1 engine is enabled os.environ["VLLM_USE_V1"] = "1" - # Extract sampling parameters - max_tokens = args.pop("max_tokens", 100) - temperature = args.pop("temperature", 0.8) - top_p = args.pop("top_p", 0.95) - top_k = args.pop("top_k", -1) - streaming_mode = args.pop("streaming_mode", "delta") - show_timing = args.pop("show_timing", False) - - # Determine output kind - output_kind = ( - RequestOutputKind.DELTA - if streaming_mode == "delta" - else RequestOutputKind.CUMULATIVE - ) + print("šŸ”§ Initializing AsyncLLM...") - # Create sampling parameters - sampling_params = SamplingParams( - max_tokens=max_tokens, - temperature=temperature, - top_p=top_p, - top_k=top_k, - output_kind=output_kind, - # Use a seed for reproducible results in examples - seed=42, + # Create AsyncLLM engine with simple configuration + engine_args = AsyncEngineArgs( + model="meta-llama/Llama-3.2-1B-Instruct", + enforce_eager=True, # Faster startup for examples ) - - print(f"šŸ”§ Initializing AsyncLLM with model: {args.get('model', 'default')}") - - # Create AsyncLLM engine - engine_args = AsyncEngineArgs(**args) engine = AsyncLLM.from_engine_args(engine_args) try: - # Sample prompts for demonstration + # Example prompts to demonstrate streaming prompts = [ "The future of artificial intelligence is", "In a galaxy far, far away", "The key to happiness is", - "Climate change solutions include", ] print(f"šŸŽÆ Running {len(prompts)} streaming examples...") @@ -200,39 +96,22 @@ async def run_streaming_examples(args) -> None: print(f"{'=' * 60}") request_id = f"stream-example-{i}" + await stream_response(engine, prompt, request_id) - await stream_response( - engine=engine, - prompt=prompt, - sampling_params=sampling_params, - request_id=request_id, - show_timing=show_timing, - ) - - # Small delay between examples for better readability + # Brief pause between examples if i < len(prompts): - await asyncio.sleep(1) + await asyncio.sleep(0.5) - print("\nšŸŽ‰ All examples completed successfully!") - print("šŸ’” Try different streaming modes with --streaming-mode delta|cumulative") - print("šŸ’” Add --show-timing to see per-token timing information") + print("\nšŸŽ‰ All streaming examples completed!") finally: - # Clean up the engine + # Always clean up the engine + print("šŸ”§ Shutting down engine...") engine.shutdown() -def main(): - """Main function.""" - parser = create_parser() - args = vars(parser.parse_args()) - - # Run the async examples +if __name__ == "__main__": try: - asyncio.run(run_streaming_examples(args)) + asyncio.run(main()) except KeyboardInterrupt: print("\nšŸ›‘ Interrupted by user") - - -if __name__ == "__main__": - main() From 69a3e79f3603efc0b7cd39fdc8016f95974a8fd5 Mon Sep 17 00:00:00 2001 From: mgoin Date: Mon, 28 Jul 2025 13:47:29 -0400 Subject: [PATCH 3/3] Remove V1 Signed-off-by: mgoin --- examples/offline_inference/async_llm_streaming.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/examples/offline_inference/async_llm_streaming.py b/examples/offline_inference/async_llm_streaming.py index 87003e18be94..b876d536e3a1 100644 --- a/examples/offline_inference/async_llm_streaming.py +++ b/examples/offline_inference/async_llm_streaming.py @@ -12,7 +12,6 @@ """ import asyncio -import os from vllm import SamplingParams from vllm.engine.arg_utils import AsyncEngineArgs @@ -65,11 +64,6 @@ async def stream_response(engine: AsyncLLM, prompt: str, request_id: str) -> Non async def main(): - """Main function demonstrating AsyncLLM streaming.""" - - # Ensure V1 engine is enabled - os.environ["VLLM_USE_V1"] = "1" - print("šŸ”§ Initializing AsyncLLM...") # Create AsyncLLM engine with simple configuration