
There is no way to get Bedrock streaming API responses using async I/O. #3360

Closed
lucasmeijer opened this issue Jun 27, 2024 · 8 comments
Labels
bug This issue is a bug. module/sdk-custom response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days.

Comments

@lucasmeijer

lucasmeijer commented Jun 27, 2024

Describe the bug

    var response = await _client.InvokeModelWithResponseStreamAsync(request);

    // There are two ways to get the streaming events. One is enumeration:
    foreach (var e in response.Body)
    {
        // handle event
    }

    // The other is setting up events + StartProcessing():
    response.Body.ChunkReceived += (sender, args) => { /* handle event */ };
    response.Body.StartProcessing();
    // (In this second way there does not seem to be a natural way to know when it's finished.)

    // Both techniques end up in this blocking NetworkStream.Read() call:
    // https://github.com/aws/aws-sdk-net/blob/976e5b161fc1d41b1b8e57e47597c834ba3c40a4/sdk/src/Core/Amazon.Runtime/EventStreams/Internal/EventStream.cs#L310

This is unfortunate. The threads that are supposed to handle new incoming requests for my server are now blocked on very long-running network calls (AI model responses easily take 30 seconds, spread out over many events).

Expected Behavior

I would expect a way that allows me to use async enumeration:

    await foreach (var evt in response.EnumerateEventsAsync())
    {
        // handle evt
    }

EnumerateEventsAsync would, under the hood, not use blocking I/O, leaving my server's threads available to do server work and coming back to this network request only when data is there.

Current Behavior

Server threads are being blocked.

Reproduction Steps

See the code snippet in the description.

Possible Solution

No response

Additional Information/Context

No response

AWS .NET SDK and/or Package version used

Targeted .NET Platform

net8

Operating System and version

all

@lucasmeijer lucasmeijer added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Jun 27, 2024
@bhoradc bhoradc added module/sdk-custom and removed needs-triage This issue or PR still needs to be triaged. labels Jun 27, 2024
@ashishdhingra
Contributor

Needs review with the team.

@normj
Member

normj commented Jun 29, 2024

The normal request/response APIs use async down to the HTTP client, but I agree, after revisiting the response streaming, that the StartProcessing method does eventually go down to a loop calling Stream.Read on the stream returned back for the response, and that will block until there is data coming back from Bedrock.

At a minimum we should rework the ProcessLoop to use async / await.
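To make the distinction concrete, here is a rough, hypothetical sketch of the kind of change being described. This is not the SDK's actual EventStream code; ProcessReadBuffer is an invented placeholder for the existing decode-and-raise-events logic, and the System.IO / System.Threading namespaces are assumed.

    // Hypothetical shape of an async process loop; not the SDK's implementation.
    private async Task ProcessLoopAsync(Stream stream, CancellationToken cancellationToken)
    {
        var buffer = new byte[8192];
        while (!cancellationToken.IsCancellationRequested)
        {
            // ReadAsync returns the thread to the pool while waiting for data,
            // instead of blocking it the way Stream.Read does.
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken);
            if (bytesRead == 0)
                break; // end of the response stream

            // Placeholder for decoding the bytes into events and raising ChunkReceived.
            ProcessReadBuffer(buffer, bytesRead);
        }
    }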

@normj
Member

normj commented Jun 29, 2024

I created a PR to add a new StartProcessingAsync method that will use async methods down to the network stream read. #3364
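For anyone following along, usage of the new method would presumably look something like this. This is a sketch only; the handler body is illustrative, and the type names are taken from the snippets elsewhere in this issue.

    var response = await _client.InvokeModelWithResponseStreamAsync(request);

    response.Body.ChunkReceived += (sender, args) =>
    {
        // args.EventStreamEvent is the PayloadPart for this chunk.
    };

    // Awaiting the returned Task avoids blocking a thread; it completes
    // when the response stream ends.
    await response.Body.StartProcessingAsync();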

@dscpinheiro
Contributor

dscpinheiro commented Jul 15, 2024

@lucasmeijer The PR Norm mentioned was included in the latest version of the AWSSDK.BedrockRuntime package. Would you be able to verify the threads are no longer blocking on network calls?

@dscpinheiro dscpinheiro added the response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. label Jul 15, 2024
@lucasmeijer
Author

lucasmeijer commented Jul 15, 2024

I'm on <PackageReference Include="AWSSDK.BedrockRuntime" Version="3.7.306" />, which still seems to be the most recent version available?

@dscpinheiro
Contributor

No, the latest version is https://www.nuget.org/packages/AWSSDK.BedrockRuntime/3.7.307.1


Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
If you wish to keep having a conversation with other community members under this issue, feel free to do so.

@lucasmeijer
Author

@normj @dscpinheiro (tagging you as requested by the bot above).

Good news: the new package works and allows me to solve my problem.
Bad news: the user experience is not great. The code below is from the Bedrock adapter in my app. My Anthropic, OpenAI, and Groq adapters do not require me to write this kind of tricky-to-get-right code (I'm not even 100% sure I got it right). It would be nice not to push this onto users.

    static async IAsyncEnumerable<PayloadPart> PayloadPartsFrom(InvokeModelWithResponseStreamResponse r)
    {
        ConcurrentQueue<PayloadPart> queue = new();
        using SemaphoreSlim semaphore = new(0);

        void OnBodyOnChunkReceived(object? sender, EventStreamEventReceivedArgs<PayloadPart> args)
        {
            queue.Enqueue(args.EventStreamEvent);
            semaphore.Release();
        }

        r.Body.ChunkReceived += OnBodyOnChunkReceived;
        var finishTask = r.Body.StartProcessingAsync();

        while (true)
        {
            // Wait until either a chunk arrives or processing finishes.
            var t = await Task.WhenAny(semaphore.WaitAsync(), finishTask);

            // Drain whatever has been queued so far before deciding to stop,
            // so chunks that arrived just before completion are not dropped.
            while (queue.TryDequeue(out var p))
                yield return p;

            if (t == finishTask)
            {
                // Await the finished task so any processing exception is surfaced.
                await finishTask;
                yield break;
            }
        }
    }
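For comparison, here is a rough, untested sketch of the same bridge using System.Threading.Channels, which takes care of the queue-and-signal bookkeeping. The method name is illustrative; the SDK type names are taken from the snippet above.

    // Rough sketch only; requires the System.Threading.Channels namespace.
    static async IAsyncEnumerable<PayloadPart> PayloadPartsViaChannel(InvokeModelWithResponseStreamResponse r)
    {
        // An unbounded channel buffers chunks between the event callback and the enumerator.
        var channel = Channel.CreateUnbounded<PayloadPart>();

        r.Body.ChunkReceived += (sender, args) => channel.Writer.TryWrite(args.EventStreamEvent);

        // Complete the channel (propagating any failure) when processing finishes.
        _ = r.Body.StartProcessingAsync().ContinueWith(
            t => channel.Writer.TryComplete(t.Exception),
            TaskScheduler.Default);

        await foreach (var part in channel.Reader.ReadAllAsync())
            yield return part;
    }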
