# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Simple example demonstrating streaming offline inference with AsyncLLM (V1 engine).

This script shows the core functionality of vLLM's AsyncLLM engine for streaming
token-by-token output in offline inference scenarios. It demonstrates DELTA mode
streaming, where you receive only the new tokens generated since the last iteration.

Usage:
    python examples/offline_inference/async_llm_streaming.py
"""

import asyncio

from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.sampling_params import RequestOutputKind
from vllm.v1.engine.async_llm import AsyncLLM


async def stream_response(engine: AsyncLLM, prompt: str, request_id: str) -> None:
    """
    Stream a response from AsyncLLM and display tokens as they arrive.

    This function demonstrates the core streaming pattern:
    1. Create SamplingParams with the DELTA output kind
    2. Call engine.generate() and iterate over the async generator
    3. Print new tokens as they arrive
    4. Use the finished flag to know when generation is complete
    """
    print(f"\n🚀 Prompt: {prompt!r}")
    print("💬 Response: ", end="", flush=True)

    # Configure sampling parameters for streaming
    sampling_params = SamplingParams(
        max_tokens=100,
        temperature=0.8,
        top_p=0.95,
        seed=42,  # For reproducible results
        output_kind=RequestOutputKind.DELTA,  # Get only new tokens each iteration
    )
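    # Other output kinds are available too: RequestOutputKind.CUMULATIVE yields
    # the full text generated so far on each iteration, while FINAL_ONLY yields
    # a single output only once the request finishes. DELTA is the natural fit
    # for incremental display like this example.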

    try:
        # Stream tokens from AsyncLLM
        async for output in engine.generate(
            request_id=request_id, prompt=prompt, sampling_params=sampling_params
        ):
            # Process each completion in the output
            for completion in output.outputs:
                # In DELTA mode, we get only the new tokens generated
                # since the last iteration
                new_text = completion.text
                if new_text:
                    print(new_text, end="", flush=True)

            # Check if generation is finished
            if output.finished:
                print("\n✅ Generation complete!")
                break

    except Exception as e:
        print(f"\n❌ Error during streaming: {e}")
        raise


async def main():
    print("🔧 Initializing AsyncLLM...")

    # Create AsyncLLM engine with a simple configuration
    engine_args = AsyncEngineArgs(
        model="meta-llama/Llama-3.2-1B-Instruct",
        enforce_eager=True,  # Faster startup for examples
    )
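    # Other AsyncEngineArgs options (e.g. tensor_parallel_size or max_model_len)
    # could be set here as well; this minimal configuration assumes a single GPU
    # with enough memory for the 1B model.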
    engine = AsyncLLM.from_engine_args(engine_args)

    try:
        # Example prompts to demonstrate streaming
        prompts = [
            "The future of artificial intelligence is",
            "In a galaxy far, far away",
            "The key to happiness is",
        ]

        print(f"🎯 Running {len(prompts)} streaming examples...")

        # Process each prompt
        for i, prompt in enumerate(prompts, 1):
            print(f"\n{'=' * 60}")
            print(f"Example {i}/{len(prompts)}")
            print(f"{'=' * 60}")

            request_id = f"stream-example-{i}"
            await stream_response(engine, prompt, request_id)

            # Brief pause between examples
            if i < len(prompts):
                await asyncio.sleep(0.5)

        print("\n🎉 All streaming examples completed!")

    finally:
        # Always clean up the engine
        print("🔧 Shutting down engine...")
        engine.shutdown()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🛑 Interrupted by user")