From 8f0acbeb92a8fe3183ad712cee1759428ab4cadd Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 1 Feb 2024 10:26:46 -0700 Subject: [PATCH 1/4] Delay closing connection on the server after all streams are completed --- server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server.go b/server.go index e89c5ac6136c..8246bb3f162e 100644 --- a/server.go +++ b/server.go @@ -1013,6 +1013,7 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, } defer func() { + time.Sleep(time.Second) st.Close(errors.New("finished serving streams for the server transport")) for _, sh := range s.opts.statsHandlers { sh.HandleConn(ctx, &stats.ConnEnd{}) From ede0afe4b93e3ca34dd3c1a262486bd14abff0c9 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 1 Feb 2024 11:07:00 -0700 Subject: [PATCH 2/4] increase sleep interval --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 8246bb3f162e..973dbfac1fc5 100644 --- a/server.go +++ b/server.go @@ -1013,7 +1013,7 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, } defer func() { - time.Sleep(time.Second) + time.Sleep(2 * time.Second) st.Close(errors.New("finished serving streams for the server transport")) for _, sh := range s.opts.statsHandlers { sh.HandleConn(ctx, &stats.ConnEnd{}) From f61ec455d53d37d720ecabff9b2307f8969e6c4f Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 1 Feb 2024 11:32:19 -0700 Subject: [PATCH 3/4] set timeout to 0 in tests --- server.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 973dbfac1fc5..1478c934837a 100644 --- a/server.go +++ b/server.go @@ -31,6 +31,7 @@ import ( "strings" "sync" "sync/atomic" + "testing" "time" "golang.org/x/net/trace" @@ -66,6 +67,8 @@ const ( listenerAddressForServeHTTP = "listenerAddressForServeHTTP" ) +var goAwayTimeout = 1 * time.Second + func init() { internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials { return srv.opts.creds @@ -83,6 +86,10 @@ func init() { internal.BinaryLogger = binaryLogger internal.JoinServerOptions = newJoinServerOption internal.RecvBufferPool = recvBufferPool + + if testing.Testing() { + goAwayTimeout = 0 + } } var statusOK = status.New(codes.OK, "") @@ -1013,7 +1020,7 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, } defer func() { - time.Sleep(2 * time.Second) + time.Sleep(goAwayTimeout) st.Close(errors.New("finished serving streams for the server transport")) for _, sh := range s.opts.statsHandlers { sh.HandleConn(ctx, &stats.ConnEnd{}) From cac6dc2a0b7af489d4825b8894978610f3ae4c50 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Thu, 1 Feb 2024 12:14:47 -0700 Subject: [PATCH 4/4] Add comment --- server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server.go b/server.go index 1478c934837a..d2840b715d22 100644 --- a/server.go +++ b/server.go @@ -1020,6 +1020,10 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, } defer func() { + // The client might not be done reading all the data, which at this point + // is stored in the kernel TCP buffer, yet. If we close the connection right away + // the client will get RST and the request will fail. Delay closing for 1 sec. + // See more details in https://github.com/grpc/grpc-go/pull/6957 time.Sleep(goAwayTimeout) st.Close(errors.New("finished serving streams for the server transport")) for _, sh := range s.opts.statsHandlers {