
Conversation

@alanprot commented Aug 11, 2025

Description

Hi,

After upgrading grpc-go, we started seeing the following error:

stream terminated by RST_STREAM with error code: REFUSED_STREAM

Upon investigation, it appears that in some scenarios, we fail to properly delete streams from the activeStreams map, resulting in this error being wrongly returned.

I added a test case that seems to trigger the problem:

PS: To test the behavior I added an "ActiveStreamTracker" hook; I'm not sure this is the best way, but I just wanted to demonstrate the problem (rough sketch at the end of this comment).

end2end_test.go:4015: leak streams:  5

The issue seems to have been introduced in the following PR: #8071

Specifically, the check added here:

if oldState == streamDone {
	return
}

If this conditional is removed, all streams are correctly cleaned up from the activeStreams map, and the test passes without leaks.
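Roughly the shape of the check I'm talking about (sketch only; ActiveStreamTracker and its Active() method are the ad-hoc hook from my branch, not an existing grpc-go API):

// Hypothetical leak assertion used by the repro test. The tracker is assumed
// to observe additions to and deletions from the server transport's
// activeStreams map, so Active() returning non-zero after all RPCs have
// finished means some streams were never deleted.
func assertNoLeakedStreams(t *testing.T, tracker *ActiveStreamTracker) {
	t.Helper()
	if n := tracker.Active(); n != 0 {
		t.Fatalf("leak streams: %d", n)
	}
}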

@alanprot (Author)

cc @arjan-bal @dfawley

@alanprot changed the title to "Failing to delete the stream from the activeStreams map leading to REFUSED_STREAM errors." Aug 11, 2025
@arjan-bal (Contributor) commented Aug 12, 2025

Hi @alanprot, thanks for sharing the test to repro. I haven't debugged the test yet, but wanted to ask if you know why the added check causes stream deletion to be skipped? From a quick look at the places where streamDone is being set, I found only two:

oldState := s.swapState(streamDone)
if oldState == streamDone {
	// If the stream was already done, return.
	return
}
hdr.cleanup = &cleanupStream{
	streamID: s.id,
	rst:      rst,
	rstCode:  rstCode,
	onWrite: func() {
		t.deleteStream(s, eosReceived)

func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()
	oldState := s.swapState(streamDone)
	if oldState == streamDone {
		return
	}
	t.deleteStream(s, eosReceived)

In both places, deleteStream is called eventually. Do you know how the code ends up in a situation where the stream is marked as done, but it's not deleted from the map?

Edit: It seems like the onWrite callback may not be getting executed.

@arjan-bal self-assigned this Aug 12, 2025
@alanprot (Author) commented Aug 12, 2025

Hi @arjan-bal

As far as I could see, the problem is that we set the cleanup on the hdr and push it onto the controlbuf at:

hdr.cleanup = &cleanupStream{

And we get to:

if str.state != empty { // either active or waiting on stream quota.
	// add it str's list of items.
	str.itl.enqueue(h)
	return nil
}

And at this point, the stream is not on the activeStreams list, so processData returns here:

str := l.activeStreams.dequeue() // Remove the first stream.
if str == nil {
	return true, nil
}

And I think this stream never gets added back to the activeStreams list, so we never clean it up.

So it seems this only happens when the server side is sending lots of data and waiting for flow-control quota?

@arjan-bal (Contributor)

Thank you @alanprot, I think I understand the sequence of events leading to this.

  1. The server stream exhausts the stream quota and starts buffering data frames.
  2. The server attempts to finish the RPC gracefully by sending trailers via finishStream.
  3. The write of the header frame gets blocked behind the pending data frames that are awaiting flow-control quota.
  4. Once the deadline expires, the server's deadline-monitoring timer tries to call closeStream, which is a no-op since the stream state was already marked streamDone by finishStream in step 2.
  5. The client sends an RST_STREAM; the server calls closeStream, which is again a no-op.

The simplest fix is to remove the guard block added in closeStream in #8071. We may end up pushing redundant cleanupStream events on to the controlbuf, but it doesn't cause correctness issues since cleanupStreamHandler returns early in such cases.
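Roughly what that would look like (untested sketch based on the closeStream snippet quoted above; the trailing controlbuf push is elided):

func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()
	// Mark the stream done but do NOT return early: even if finishStream
	// already set streamDone, its buffered cleanup may never be written,
	// so deleteStream must still run here. A redundant cleanupStream event
	// is harmless because cleanupStreamHandler returns early for streams
	// that were already cleaned up.
	s.swapState(streamDone)
	t.deleteStream(s, eosReceived)
	// ... then push a cleanupStream onto the controlbuf as before (elided).
}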

@dfawley what do you think?

@dfawley (Member) commented Aug 13, 2025

The simplest fix is to remove the guard block added in closeStream in #8071. We may end up pushing redundant cleanupStream events on to the controlbuf, but it doesn't cause correctness issues since cleanupStreamHandler returns early in such cases.

That makes sense to me. I can't think of any other way to do this. Does the transport properly handle the rest of this situation, especially: dropping the queued data frames when the RST_STREAM is sent (or shortly thereafter)?

@dfawley removed their assignment Aug 13, 2025
@alanprot (Author)

Also, maybe we want to consider updating this metric only if the stream was actually present in the activeStreams map?

t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
	delete(t.activeStreams, s.id)
	if len(t.activeStreams) == 0 {
		t.idle = time.Now()
	}
}
t.mu.Unlock()
if channelz.IsOn() {
	if eosReceived {
		t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
	} else {
		t.channelz.SocketMetrics.StreamsFailed.Add(1)
	}
}
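A rough sketch of what I mean (untested; it just threads the result of the existing map lookup into the channelz update, assuming deleteStream's receiver and parameters match the snippets above):

func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
	t.mu.Lock()
	_, wasActive := t.activeStreams[s.id]
	if wasActive {
		delete(t.activeStreams, s.id)
		if len(t.activeStreams) == 0 {
			t.idle = time.Now()
		}
	}
	t.mu.Unlock()
	// Only record the stream's outcome if it was actually tracked, so that
	// a redundant deleteStream call doesn't double-count it in channelz.
	if wasActive && channelz.IsOn() {
		if eosReceived {
			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
		} else {
			t.channelz.SocketMetrics.StreamsFailed.Add(1)
		}
	}
}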

@arjan-bal (Contributor)

That makes sense to me. I can't think of any other way to do this. Does the transport properly handle the rest of this situation, especially: dropping the queued data frames when the RST_STREAM is sent (or shortly thereafter)?

All the pending Data and Header frames are dropped in cleanupStreamHandler.

When does our max incoming streams quota go back up?

The check for max concurrent streams depends on the size of the activeStreams map.
We delete from the activeStreams map only in deleteStream().
So the question is: When is deleteStream() called?

Without timeout:
After the trailers are written, at the beginning of cleanupStreamHandler() due to cleanupStream.onWrite().

With timeout:
Just before the cleanupStream event is pushed into the controlBuf. So there is a small interval, before cleanupStreamHandler is called, during which an extra stream could be started. We can minimize this interval by using cleanupStream.onWrite(), similar to the no-timeout case above, but this is not super urgent since it's been this way for a while.

@arjan-bal (Contributor)

@alanprot are you interested in raising a PR with the fix and a unit test for catching regressions?

@alanprot (Author)

@arjan-bal I can definitely do that, but I’m out until Tuesday, so I’d only be able to pick it up then. :)

@arjan-bal (Contributor)

I'll be closing this PR and using #8517 to track the fix, as per our normal bug-fixing process. When the PR with the fix is merged, #8517 can be marked as completed.
