Tracking interruption point during large language model output streaming using FastAPI StreamingResponse #13707

Answered by YuriiMotov
kingmming asked this question in Questions
Not sure it's the easiest way, but the following code works:

import asyncio

from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")
async def stream(request: Request):
    async def event_generator(state: dict[str, int | bool]):  # state holds "step" (int) and "finished" (bool)
        try:
            for i in range(20):  # Simulating a long output from a large model
                state["step"] = i
                await asyncio.sleep(0.5)
                yield f"data: Line {i}\n\n"
        except Exception as e:
            print(f"Error occurred: {e}")
        state["finished"] = True

    state = {"step": 0}  # Shared state object

    async def watch_disconnect(re…
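
The snippet above is cut off at `watch_disconnect`. A plausible completion of the pattern, sketched here so it runs standalone: the watcher polls `Request.is_disconnected()`, and when the client drops it records the last streamed step in the shared `state` dict before cancelling the producer. `FakeRequest`, `produce`, and the timing values are illustrative stand-ins, not from the original answer; in the real endpoint you would pass the Starlette `Request` itself, which exposes the same async `is_disconnected()` method.

```python
import asyncio
import contextlib

# Hypothetical stand-in for starlette's Request so the sketch runs without
# a server; the real object exposes the same async is_disconnected() method.
class FakeRequest:
    def __init__(self, disconnect_after: float):
        self._disconnect_after = disconnect_after
        self._start = asyncio.get_running_loop().time()

    async def is_disconnected(self) -> bool:
        # Pretend the client drops the connection after a fixed delay.
        elapsed = asyncio.get_running_loop().time() - self._start
        return elapsed >= self._disconnect_after

async def watch_disconnect(request, state: dict, producer: asyncio.Task) -> None:
    # Poll the connection; once the client disconnects, remember the last
    # step that was streamed and stop the producer task.
    while not state.get("finished"):
        if await request.is_disconnected():
            state["interrupted_at"] = state["step"]
            producer.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await producer
            return
        await asyncio.sleep(0.1)

async def main() -> dict:
    state = {"step": 0}

    async def produce():
        for i in range(20):  # stands in for the model's token stream
            state["step"] = i
            await asyncio.sleep(0.1)
        state["finished"] = True

    request = FakeRequest(disconnect_after=0.25)  # "client" drops mid-stream
    producer = asyncio.create_task(produce())
    await watch_disconnect(request, state, producer)
    return state

final_state = asyncio.run(main())
print(final_state)  # shows the step at which the stream was interrupted
```

In the actual endpoint, the same idea applies: run the watcher alongside the generator (e.g. as a background task started before returning the `StreamingResponse`), and read `state["interrupted_at"]` afterwards to know where the large-model output was cut off.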

Answer selected by kingmming