transport: drain client transport when streamID approaches maxStreamID #5889
Conversation
internal/transport/defaults.go
Outdated
// MaxStreamIDForTesting is the upper bound for the stream ID before the current
// transport gracefully closes and a new transport is created for subsequent RPCs.
// This is set to 75% of math.MaxUint32. It's exported so that tests can override it.
var MaxStreamIDForTesting = uint32(float32(math.MaxUint32) * 0.75)
Nit: functions that do stuff for testing are ForTesting. This can be just MaxStreamID, and some tests happen to override it, which seems fine to me.
sg! i'm updating this
internal/transport/http2_client.go
Outdated
// 75% of math.MaxUint32, which then signals gRPC to restart transport
// for subsequent RPCs.
maybe "which signals gRPC that the connection is closed and a new one must be created for subsequent RPCs"? "Restart transport" doesn't really imply the right things, and also this doesn't happen automatically (the LB policy would need to request it).
done
internal/transport/http2_client.go
Outdated
if t.nextID > MaxStreamIDForTesting {
	transportDrainRequired = true
}
Shorten: transportDrainRequired = t.nextID > MaxStreamIDForTesting
done
internal/transport/http2_client.go
Outdated
@@ -862,6 +869,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
		sh.HandleRPC(s.ctx, outHeader)
	}
}
if transportDrainRequired {
	if logger.V(logLevel) {
		logger.Infof("t.nextID > MaxStreamID. transport: draining")
"transport" is supposed to be the prefix of the whole log message. How about transport: t.nextID > MaxStreamID. Draining
?
yes! that makes sense
internal/transport/transport_test.go
Outdated
@@ -536,6 +536,40 @@ func (s) TestInflightStreamClosing(t *testing.T) {
	}
}

// TestClientTransportDrainsAfterStreamIdExhausted tests that when
// streamID > MaxStreamId, the current client transport drains.
func (s) TestClientTransportDrainsAfterStreamIdExhausted(t *testing.T) {
Nit: in Go style we write "ID" not "Id".
done!
test/end2end_test.go
Outdated
	transport.MaxStreamIDForTesting = originalMaxStreamID
}()

// setting up StubServer.
Similar nits.
removing...
test/end2end_test.go
Outdated
		return nil
	},
}
testpb.RegisterTestServiceServer(s, ss)
It's unfortunate you have to have a ton of the boilerplate that StubServer is intended to help with. Two options:
- Use WithContextDialer on the client to detect when we're (re)connecting.
- Add some stuff to StubServer to facilitate wrapping the listener.
This will save you manually creating the server, registering the service, dialing the client, wrapping the client with the grpc stub, and dealing with extra shutdown tasks. The resulting test code will be the more interesting code, which will make it easier to read and understand what it's doing that's unique.
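A minimal sketch of the first option, assuming a hypothetical helper that counts dials via grpc.WithContextDialer (countingDialer and dialWithCounter are illustrative names, not code from this PR):

package example

import (
	"context"
	"net"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// countingDialer records how many times the client dials, so a test can
// assert that a second connection was created after the first one drained.
type countingDialer struct {
	mu    sync.Mutex
	dials int
}

func (d *countingDialer) dial(ctx context.Context, addr string) (net.Conn, error) {
	d.mu.Lock()
	d.dials++
	d.mu.Unlock()
	return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}

// dialWithCounter dials target with the counting dialer installed.
func dialWithCounter(target string) (*grpc.ClientConn, *countingDialer, error) {
	d := &countingDialer{}
	cc, err := grpc.Dial(target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(d.dial),
	)
	return cc, d, err
}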
I was also bouncing around different things to get ListenerWrapper to work. I'm going ahead with option 2 to facilitate StubServer with ListenerWrapper.. let me know what you think of the approach @dfawley
test/end2end_test.go
Outdated
if _, err = stream.Recv(); err != nil && err != io.EOF {
	t.Fatalf("stream.Recv() = _, %v want: nil or EOF", err)
}
Don't we know what we should receive? Shouldn't it always be nil since the server sends 10 messages?
yup, should never error
test/end2end_test.go
Outdated
ss := &stubserver.StubServer{
	FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
		for i := 0; i < 10; i++ {
			m := &grpc.PreparedMsg{}
Why a PreparedMsg? That seems unnecessarily complex. Just do stream.SendMsg(testpb.WhateverThisIs{}).
sure. i've updated this in the rev
test/end2end_test.go
Outdated
t.Fatal("timeout expired when waiting to create new conn channel") | ||
} | ||
|
||
// verifying the connection to the old one is drained and closed. |
Interestingly you haven't fully received off the streams that were active. So this only works because the flow control is enough that the server finished the stream.
I'd recommend avoiding "coincidences" like this (even if they're literally never going to not be true). Instead you should make the client & server behavior align.
It also could be interesting to test that the stream is still active even after the new connection is created, which is how "draining" behavior should work. E.g.:
1. Create a stream that ping-pongs back and forth.
2. Use it and make sure it works.
3. Create a stream on the next connection that does the same. (Verify that the new connection was used for it somehow.)
4. Use it and make sure it works.
5. Verify that the stream from (1) still works.
6. Close streams from (1) and (3). Client-side context cancelation should be easy and fine here.
7. Expect the original connection to close.
One way to validate which connection the stream is using: you could use a custom credentials (its AuthInfo ends up in the Peer) that counts connections and increments some number for each one. Creds could also be used to determine when connections are closed, since they return a wrapped net.Conn back to us that is what we really close.
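A minimal sketch of that credentials idea, assuming hypothetical test-only types (connCountingCreds, closeNotifyConn, countingAuthInfo) layered over insecure credentials; this is illustrative, not the PR's actual code:

package example

import (
	"context"
	"net"
	"sync"

	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
)

// countingAuthInfo carries the connection number so a test can read it from the RPC's Peer.
type countingAuthInfo struct {
	credentials.AuthInfo
	connNum int
}

// closeNotifyConn signals on closed when the transport really closes the underlying conn.
type closeNotifyConn struct {
	net.Conn
	once   sync.Once
	closed chan struct{}
}

func (c *closeNotifyConn) Close() error {
	c.once.Do(func() { close(c.closed) })
	return c.Conn.Close()
}

// connCountingCreds wraps insecure credentials and numbers each new client connection.
type connCountingCreds struct {
	credentials.TransportCredentials
	mu    sync.Mutex
	conns []*closeNotifyConn
}

func newConnCountingCreds() *connCountingCreds {
	return &connCountingCreds{TransportCredentials: insecure.NewCredentials()}
}

func (c *connCountingCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	conn := &closeNotifyConn{Conn: rawConn, closed: make(chan struct{})}
	c.conns = append(c.conns, conn)
	_, ai, err := c.TransportCredentials.ClientHandshake(ctx, authority, conn)
	return conn, &countingAuthInfo{AuthInfo: ai, connNum: len(c.conns)}, err
}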
thanks for the detailed comment. I've updated the test to align to your comment. lemme know what you think.
internal/transport/http2_client.go
Outdated
if t.nextID > MaxStreamIDForTesting {
	transportDrainRequired = true
}
Oh whoops. You have a race. This bool needs to be set inside checkForStreamQuota then read here (or below as is and ignore here).
i've made the change now
internal/transport/defaults.go
Outdated
// MaxStreamIDForTesting is the upper bound for the stream ID before the current
// transport gracefully closes and a new transport is created for subsequent RPCs.
// This is set to 75% of math.MaxUint32. It's exported so that tests can override it.
var MaxStreamIDForTesting = uint32(float32(math.MaxUint32) * 0.75)
Also a nit: I'd simplify this to something like uint32(3*1024*1024*1024) or 3_221_225_472 or 0xc0000000. (Basically, extra casts and floats (in general) should be avoided.)
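For reference, the three spellings above denote the same value; a quick standalone sketch (illustrative only):

package example

// All three literals are the same number, and none needs a float round-trip.
const (
	a uint32 = 3 * 1024 * 1024 * 1024 // 3_221_225_472
	b uint32 = 3_221_225_472
	c uint32 = 0xc0000000
)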
i missed this one.. updating this now
force-pushed from 64494e1 to 4b1c7cd
I just merged 3 of my commits, but now it shows up as a forced push on this PR 😢 .. i'm going to refrain from any commit amends in the future. Luckily I see all the past comments here.
force-pushed from cb377f5 to 97fe966
force-pushed from 5dbeec7 to fa9b35f
Mostly looks good. Just some minor comments this time around.
test/end2end_test.go
Outdated
// connWrapperWithCloseCh wraps a net.Conn and pushes on a channel when closed.
type connWrapperWithCloseCh struct {
	net.Conn
	CloseCh *testutils.Channel
This one needs to be unexported as well.
mmm.. isn't this unexported right now?
CloseCh field is still exported, which is immaterial given that the struct is unexported now. But it is better to not have this anomaly.
got it. thanks. updating this
test/end2end_test.go
Outdated
// Setting up 3 streams and calling Send() and Recv() once on each.
for i := 0; i < 3; i++ {
	var p peer.Peer
Optional: Since p is always used as a pointer, you could instead do p := new(peer.Peer) here and replace &p with p down below.
sure. updating this
test/transport_end2end_test.go
Outdated
@@ -0,0 +1,198 @@
/*
 *
 * Copyright 2022 gRPC authors.
2023 now!
lol. In my head, i'm still stuck in 2020
test/transport_end2end_test.go
Outdated
@@ -0,0 +1,198 @@
/*
Nit: Just transport_test.go; it's cleaner.
done
test/transport_end2end_test.go
Outdated
for {
	select {
	case cw.closeCh <- nil:
		return err
	case <-cw.closeCh:
	}
}
Why can't you just do close(cw.closeCh)? Close shouldn't be called multiple times -- is it? If so, use a grpcsync.Event for simplicity?
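A minimal sketch of the grpcsync.Event approach, assuming a hypothetical wrapper (eventConn) and that the test lives inside the grpc module (grpcsync is an internal package):

package example

import (
	"net"

	"google.golang.org/grpc/internal/grpcsync"
)

// eventConn wraps a net.Conn and fires an event when it is closed. Event.Fire
// is idempotent, so calling Close more than once is harmless, and a test can
// simply wait on closed.Done().
type eventConn struct {
	net.Conn
	closed *grpcsync.Event
}

func newEventConn(c net.Conn) *eventConn {
	return &eventConn{Conn: c, closed: grpcsync.NewEvent()}
}

func (c *eventConn) Close() error {
	c.closed.Fire()
	return c.Conn.Close()
}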
done. updated
test/transport_end2end_test.go
Outdated
func (c *transportRestartCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	conn := &connWrapperWithCloseCh{Conn: rawConn, closeCh: make(chan interface{}, 1)}
Closing the channel means you don't need a buffer on it here.
makes sense
test/transport_end2end_test.go
Outdated
c.mu.Lock()
defer c.mu.Unlock()
conn := &connWrapperWithCloseCh{Conn: rawConn, closeCh: make(chan interface{}, 1)}
c.connections = append(c.connections, *conn)
Why not make connections a []*connWrapperWithCloseCh to avoid copies?
I didn't know that before. Updating.
test/transport_end2end_test.go
Outdated
t.Fatalf("Creating FullDuplex stream: %v", err) | ||
} | ||
|
||
streams = append(streams, s) |
Is the connection guaranteed to be established even without this? I think so, as gRPC won't return from NewStream unless there's at least a connection for the RPC. But maybe not, because it could be that we see the first connection as okay and return a stream that uses it.
So maybe we don't need this? Or if we do need it, please add a comment explaining why.
Heavily offline-d this with Doug. Turns out there was a bug in the way this implementation restarted the stream. The test was also checking the true end2end behavior incorrectly.
Like in this example, 3 streams are created (s1, s2, s3) while MaxStreamId is set to 5, which means there should not be any transport restarts.
However, it was perceived that s3 was created on a new connection. This is because, since we are creating 3 streams on the same connection, the 3rd stream makes the transport call graceClose. Now when a worker picks up an item from the controlbuf (which is a headerFrame for the stream), it would see that the stream is already in draining (checked in headerFrame.initStream). This is a race, and the test would have been flaky.
This bug would not have an immediate regression due to the 3B buffer in streamIDs, but thanks @dfawley for calling this out!
test/transport_end2end_test.go
Outdated
// expectedNumConns when each stream is created.
expectedNumConns := []int{1, 1, 2}

// Set up 3 streams and call sendAndReceive() once on each.
There's no more sendAndReceive()
lol yea.. sad old function. This PR has had this function created and deleted multiple times. Sucks to be sendAndReceive()
test/transport_end2end_test.go
Outdated
var connPerStream []net.Conn

// The peer passed via the call option is set up only after the RPC is complete.
This is no longer true.
true. nixing it
test/transport_end2end_test.go
Outdated
	connPerStream = append(connPerStream, p.AuthInfo.(*authInfoWithConn).conn)
}

// Verifying the first and second RPCs were made on the same connection.
I'm not even sure we need any of this anymore, since you're verifying the number of connections after each stream, above. Please consider and simplify as much as possible (without compromising the validity/coverage of the test).
you are correct. This doesn't seem to be important anymore. I can also get rid of the AuthInfo stuff along with this.
test/transport_end2end_test.go
Outdated
	connPerStream = append(connPerStream, p.AuthInfo.(*authInfoWithConn).conn)
}

// Verifying the first and second RPCs were made on the same connection.
Nit: Verify, and below 2x.
done. and noted.
LGTM modulo nits/cleanups!
internal/transport/controlbuf.go
Outdated
func (l *loopyWriter) originateStream(str *outStream) error {
	hdr := str.itl.dequeue().(*headerFrame)
// originateStreamWithHeaderFrame calls the initStream function on the headerFrame and
// called writeHeader. If write succeeds the streamID is added to l.estdStreams
called->calls?
And end the last sentence with a period, please.
I nixed the comment itself. Seems like i really just wrote exactly what the below 10 lines do
internal/transport/controlbuf.go
Outdated
// called writeHeader. If write succeeds the streamID is added to l.estdStreams
func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
	// l.draining is set for an incomingGoAway. In which case, we want to avoid further
	// writes
End with a period. Also... "we want to avoid creating new streams" instead?
done
// l.draining is set for an incomingGoAway. In which case, we want to avoid further
// writes
if l.draining {
	hdr.onOrphaned(errStreamDrain)
E.g.: // TODO: provide a better error with the reason we are in draining.
done
internal/transport/http2_client.go
Outdated
@@ -742,15 +742,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
	endStream: false,
	initStream: func(id uint32) error {
		t.mu.Lock()
		if state := t.state; state != reachable {
			// we want initStream to cleanup and return an error when transport is closing.
Nit: capitalize "We".
nixing this comment as well. Seems like it doesn't add value when I read it now
if state := t.state; state != reachable {
	// we want initStream to cleanup and return an error when transport is closing.
	// initStream is never called when transport is draining.
	if t.state == closing {
E.g.: // TODO: handle transport closure in loopy instead and remove this.
done
// connWrapperWithCloseCh wraps a net.Conn and pushes on a channel when closed.
type connWrapperWithCloseCh struct {
	net.Conn
	closeCh chan interface{}
	close   *grpcsync.Event
}

// Close closes the connection and sends a value on the close channel.
This should now say "...and fires the close event."
done
test/transport_test.go
Outdated
err := cw.Conn.Close()
for {
	select {
	case cw.closeCh <- nil:
		return err
	case <-cw.closeCh:
	}
}
cw.close.Fire()
return err
Nit/optional:
cw.close.Fire()
return cw.Conn.Close()
done
thanks @dfawley @arvindbr8
// MaxStreamID is the upper bound for the stream ID before the current
// transport gracefully closes and new transport is created for subsequent RPCs.
// This is set to 75% of math.MaxUint32. It's exported so that tests can override it.
var MaxStreamID = uint32(3_221_225_472)
StreamID is a 31-bit unsigned integer. The max value a streamID could be before overflow is 2^31-1 = 2,147,483,647.
I'm not sure this will actually prevent this issue from happening, as the overflow would occur before 3_221_225_472. You can read more about this in RFC-7540: https://www.rfc-editor.org/rfc/rfc7540#section-5.1.1
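For illustration (not necessarily the value the PR finally adopts), a 31-bit-safe bound could keep the same 75% idea but base it on math.MaxInt32; a standalone sketch:

package transport

import "math"

// MaxStreamID sketch: 75% of the largest legal HTTP/2 stream ID (2^31-1),
// so the client transport drains well before the stream ID space overflows.
var MaxStreamID = uint32(math.MaxInt32 * 3 / 4) // 1_610_612_735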
cc @dfawley
The spec:
Streams are identified with an unsigned 31-bit integer. Streams
initiated by a client MUST use odd-numbered stream identifiers; those
initiated by the server MUST use even-numbered stream identifiers. A
stream identifier of zero (0x0) is used for connection control
messages; the stream identifier of zero cannot be used to establish a
new stream.
Thanks @patrick-ogrady! Great catch. I'm going to fix that.
From #5358 (comment), we noticed that users may be working around the max stream ID issue by using max connection age, which is not really an intended use case of that feature. This change sets MaxStreamID to 75% of math.MaxUint32 and drains the client transport when the next stream ID > MaxStreamID.
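A self-contained toy model of the drain-on-threshold idea (simplified; fakeTransport and maxStreamID are illustrative, not the actual http2Client code):

package example

// maxStreamID is the illustrative drain threshold.
const maxStreamID uint32 = 3_221_225_472

// fakeTransport models only the stream-ID bookkeeping relevant here.
type fakeTransport struct {
	nextID   uint32
	draining bool
}

func newFakeTransport() *fakeTransport { return &fakeTransport{nextID: 1} }

// newStream hands out the next odd client stream ID and reports whether the
// transport should be gracefully drained so subsequent RPCs get a fresh
// connection instead of ever overflowing the ID space.
func (t *fakeTransport) newStream() (id uint32, drainRequired bool) {
	id = t.nextID
	t.nextID += 2 // client-initiated stream IDs are odd: 1, 3, 5, ...
	if t.nextID > maxStreamID {
		t.draining = true
	}
	return id, t.draining
}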
Fixes #5600
RELEASE NOTES: