Skip to content

Commit

Permalink
server: wait to close connection until incoming socket is drained (wi…
Browse files Browse the repository at this point in the history
…th timeout)
  • Loading branch information
dfawley committed Feb 9, 2024
1 parent f135e98 commit f416f95
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
5 changes: 2 additions & 3 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,15 @@ const minBatchSize = 1000
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes and closes the underlying connection. Otherwise, the connection is
// left open to allow the I/O error to be encountered by the reader instead.
// flushes the underlying connection. The connection is always left open to
// allow different closing behavior on the client and server.
func (l *loopyWriter) run() (err error) {
defer func() {
if l.logger.V(logLevel) {
l.logger.Infof("loopyWriter exiting with error: %v", err)
}
if !isIOError(err) {
l.framer.writer.Flush()
l.conn.Close()
}
l.cbuf.finish()
}()
Expand Down
4 changes: 4 additions & 0 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.run()
// Immediately close the connection, as the loopy writer returns when
// there are no more active streams and we were draining (the server
// sent a GOAWAY).
t.conn.Close()
close(t.writerDone)
}()
return t, nil
Expand Down
16 changes: 14 additions & 2 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy.run()
close(t.loopyWriterDone)
// Wait 1 second before closing the connection, or when the reader is
// done (i.e. the client already closed the connection or a connection
// error occurred). This avoids the potential problem where there is
// unread data on the receive side of the connection, which, if closed,
// would lead to a TCP RST instead of FIN, and the client encountering
// errors. For more info: https://github.com/grpc/grpc-go/issues/5358
select {
case <-t.readerDone:
case <-time.After(time.Second):
}
t.conn.Close()
}()
go t.keepalive()
return t, nil
Expand Down Expand Up @@ -609,8 +620,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
defer func() {
<-t.loopyWriterDone
close(t.readerDone)
<-t.loopyWriterDone
}()
for {
t.controlBuf.throttle()
Expand Down Expand Up @@ -1325,6 +1336,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
t.framer.writer.Flush()
if retErr != nil {
return false, retErr
}
Expand All @@ -1345,7 +1357,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
go func() {
timer := time.NewTimer(time.Minute)
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case <-t.drainEvent.Done():
Expand Down

0 comments on commit f416f95

Please sign in to comment.