From 0edfecdc2e862a6f6803125276e557d6209097c0 Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Fri, 5 Jul 2024 12:13:47 -0700 Subject: [PATCH 01/11] [http_util/bufWriter] fast-fail on error returned from flushKeepBuffer() --- internal/transport/http_util.go | 7 ++++-- internal/transport/http_util_test.go | 34 ++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 39cef3bd442e..e483f67afdea 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -36,6 +36,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/codes" ) @@ -335,10 +336,12 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { w.offset += nn n += nn if w.offset >= w.batchSize { - err = w.flushKeepBuffer() + if err = w.flushKeepBuffer(); err != nil { + return n, err + } } } - return n, err + return n, nil } func (w *bufWriter) Flush() error { diff --git a/internal/transport/http_util_test.go b/internal/transport/http_util_test.go index cc7807670b62..ac3e364dd06e 100644 --- a/internal/transport/http_util_test.go +++ b/internal/transport/http_util_test.go @@ -19,7 +19,10 @@ package transport import ( + "errors" "fmt" + "io" + "net" "reflect" "testing" "time" @@ -215,6 +218,37 @@ func (s) TestParseDialTarget(t *testing.T) { } } +type badNetworkConn struct { + net.Conn +} + +func (c *badNetworkConn) Write([]byte) (int, error) { + return 0, io.EOF +} + +func TestWriteBadConnection(t *testing.T) { + data := []byte("test_data") + writeBufferSize := len(data) - 1 + writer := newBufWriter(&badNetworkConn{}, writeBufferSize, getWriteBufferPool(writeBufferSize)) + + syncCh := make(chan struct{}) + var err error + go func() { + _, err = writer.Write(data) + close(syncCh) + }() + + select { + case <-time.After(time.Second): + t.Fatalf("Write() did not return in time") + case <-syncCh: + } + + if !errors.Is(err, io.EOF) { + t.Fatalf("Write() did not return an error or returned unexpeced one: err=%v", err) + } +} + func BenchmarkDecodeGrpcMessage(b *testing.B) { input := "Hello, %E4%B8%96%E7%95%8C" want := "Hello, 世界" From ac5160942cb18eabc379bfd67ad6f20f16ae0833 Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Mon, 8 Jul 2024 18:21:06 -0700 Subject: [PATCH 02/11] address comments --- internal/transport/http_util.go | 3 +-- internal/transport/http_util_test.go | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index e483f67afdea..5258d3864a07 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -36,7 +36,6 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" - "google.golang.org/grpc/codes" ) @@ -341,7 +340,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { } } } - return n, nil + return n, err } func (w *bufWriter) Flush() error { diff --git a/internal/transport/http_util_test.go b/internal/transport/http_util_test.go index ac3e364dd06e..24f082c0d40e 100644 --- a/internal/transport/http_util_test.go +++ b/internal/transport/http_util_test.go @@ -226,26 +226,27 @@ func (c *badNetworkConn) Write([]byte) (int, error) { return 0, io.EOF } -func TestWriteBadConnection(t *testing.T) { +// This test ensures Write() on a broken network connection does not lead to +// an infinite loop. More details in https://github.com/grpc/grpc-go/issues/7389. +func (s) TestWriteBadConnection(t *testing.T) { data := []byte("test_data") - writeBufferSize := len(data) - 1 + writeBufferSize := (len(data) - 1) / 2 writer := newBufWriter(&badNetworkConn{}, writeBufferSize, getWriteBufferPool(writeBufferSize)) - syncCh := make(chan struct{}) - var err error + syncCh := make(chan error) + defer close(syncCh) go func() { - _, err = writer.Write(data) - close(syncCh) + _, err := writer.Write(data) + syncCh <- err }() select { case <-time.After(time.Second): t.Fatalf("Write() did not return in time") - case <-syncCh: - } - - if !errors.Is(err, io.EOF) { - t.Fatalf("Write() did not return an error or returned unexpeced one: err=%v", err) + case err := <-syncCh: + if !errors.Is(err, io.EOF) { + t.Fatalf("Write() = %v, want error presence = %v", err, io.EOF) + } } } From f43d1242a0d428c7921855c068f1211f66ef8f5f Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Mon, 8 Jul 2024 18:23:39 -0700 Subject: [PATCH 03/11] return nil if success --- internal/transport/http_util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 5258d3864a07..f6e7c967f226 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -340,7 +340,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { } } } - return n, err + return n, nil } func (w *bufWriter) Flush() error { From 17d23426356b67d27b4aaf0ad799c7a1f2fa4fbe Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Tue, 9 Jul 2024 17:03:44 -0700 Subject: [PATCH 04/11] address PR comments --- internal/transport/http_util_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/transport/http_util_test.go b/internal/transport/http_util_test.go index 24f082c0d40e..5a259d43cdc2 100644 --- a/internal/transport/http_util_test.go +++ b/internal/transport/http_util_test.go @@ -227,23 +227,24 @@ func (c *badNetworkConn) Write([]byte) (int, error) { } // This test ensures Write() on a broken network connection does not lead to -// an infinite loop. More details in https://github.com/grpc/grpc-go/issues/7389. +// an infinite loop. See https://github.com/grpc/grpc-go/issues/7389 for more details. func (s) TestWriteBadConnection(t *testing.T) { data := []byte("test_data") + // Configure the bufWriter with a batchsize that results in data being flushed + // to the underlying conn, midway through Write(). writeBufferSize := (len(data) - 1) / 2 writer := newBufWriter(&badNetworkConn{}, writeBufferSize, getWriteBufferPool(writeBufferSize)) - syncCh := make(chan error) - defer close(syncCh) + errCh := make(chan error, 1) go func() { _, err := writer.Write(data) - syncCh <- err + errCh <- err }() select { case <-time.After(time.Second): t.Fatalf("Write() did not return in time") - case err := <-syncCh: + case err := <-errCh: if !errors.Is(err, io.EOF) { t.Fatalf("Write() = %v, want error presence = %v", err, io.EOF) } From 446bee150046914dcb37aa4b79743ddfd53bb738 Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Fri, 19 Jul 2024 13:03:06 -0700 Subject: [PATCH 05/11] correctly return total bytes written from Write() call --- internal/transport/http_util.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index f6e7c967f226..206bfa223804 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -36,6 +36,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/codes" ) @@ -329,13 +330,15 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { b := w.pool.Get().(*[]byte) w.buf = *b } + var bytesWritten int for len(b) > 0 { nn := copy(w.buf[w.offset:], b) b = b[nn:] w.offset += nn - n += nn if w.offset >= w.batchSize { - if err = w.flushKeepBuffer(); err != nil { + bytesWritten, err = w.flushKeepBuffer() + n += bytesWritten + if err != nil { return n, err } } @@ -344,7 +347,7 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { } func (w *bufWriter) Flush() error { - err := w.flushKeepBuffer() + _, err := w.flushKeepBuffer() // Only release the buffer if we are in a "shared" mode if w.buf != nil && w.pool != nil { b := w.buf @@ -354,17 +357,18 @@ func (w *bufWriter) Flush() error { return err } -func (w *bufWriter) flushKeepBuffer() error { +func (w *bufWriter) flushKeepBuffer() (int, error) { if w.err != nil { - return w.err + return 0, w.err } if w.offset == 0 { - return nil + return 0, nil } - _, w.err = w.conn.Write(w.buf[:w.offset]) + var n int + n, w.err = w.conn.Write(w.buf[:w.offset]) w.err = toIOError(w.err) w.offset = 0 - return w.err + return n, w.err } type ioError struct { From 93993a032d094a49515b44dac09c4b5c63195772 Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Fri, 19 Jul 2024 13:06:33 -0700 Subject: [PATCH 06/11] remove blankline --- internal/transport/http_util.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 206bfa223804..6a0620c2b6a5 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -36,7 +36,6 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" - "google.golang.org/grpc/codes" ) From 37ee46a8e7203e8a2025f8861022fdb44c9adfd2 Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Tue, 23 Jul 2024 20:29:56 -0700 Subject: [PATCH 07/11] update Write() impl per pr review comment --- internal/transport/http_util.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 6a0620c2b6a5..57d6759ec41f 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -36,6 +36,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + "google.golang.org/grpc/codes" ) @@ -317,32 +318,34 @@ func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter { return w } -func (w *bufWriter) Write(b []byte) (n int, err error) { +func (w *bufWriter) Write(b []byte) (int, error) { if w.err != nil { return 0, w.err } if w.batchSize == 0 { // Buffer has been disabled. - n, err = w.conn.Write(b) + n, err := w.conn.Write(b) return n, toIOError(err) } if w.buf == nil { b := w.pool.Get().(*[]byte) w.buf = *b } - var bytesWritten int + written := 0 for len(b) > 0 { - nn := copy(w.buf[w.offset:], b) - b = b[nn:] - w.offset += nn - if w.offset >= w.batchSize { - bytesWritten, err = w.flushKeepBuffer() - n += bytesWritten - if err != nil { - return n, err - } + copied := copy(w.buf[w.offset:], b) + b = b[copied:] + w.offset += copied + if w.offset < w.batchSize { + written += copied + continue + } + flushed, err := w.flushKeepBuffer() + if err != nil { + return written + flushed, err } + written += flushed } - return n, nil + return written, nil } func (w *bufWriter) Flush() error { From f01ef256eb2ad42a4e861bf1e23d66ca95b077bc Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Tue, 23 Jul 2024 20:31:34 -0700 Subject: [PATCH 08/11] simplify flushKeepBuffer impl --- internal/transport/http_util.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 57d6759ec41f..9db22f1293d1 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -366,9 +366,8 @@ func (w *bufWriter) flushKeepBuffer() (int, error) { if w.offset == 0 { return 0, nil } - var n int - n, w.err = w.conn.Write(w.buf[:w.offset]) - w.err = toIOError(w.err) + n, err := w.conn.Write(w.buf[:w.offset]) + w.err = toIOError(err) w.offset = 0 return n, w.err } From 89aed2b0d538bd6049304d13b0fca2b35542200b Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Tue, 23 Jul 2024 20:41:32 -0700 Subject: [PATCH 09/11] revert import sort --- internal/transport/http_util.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 9db22f1293d1..4c6432915fd9 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -36,7 +36,6 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" - "google.golang.org/grpc/codes" ) From 5faf4be26829cc609c91e8223676e03a0d9aa3b2 Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Wed, 31 Jul 2024 19:37:39 -0700 Subject: [PATCH 10/11] revert flushKeepBuffer change --- internal/transport/http_util.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 4c6432915fd9..41b676d93728 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -333,22 +333,20 @@ func (w *bufWriter) Write(b []byte) (int, error) { for len(b) > 0 { copied := copy(w.buf[w.offset:], b) b = b[copied:] + written += copied w.offset += copied if w.offset < w.batchSize { - written += copied continue } - flushed, err := w.flushKeepBuffer() - if err != nil { - return written + flushed, err + if err := w.flushKeepBuffer(); err != nil { + return written, err } - written += flushed } return written, nil } func (w *bufWriter) Flush() error { - _, err := w.flushKeepBuffer() + err := w.flushKeepBuffer() // Only release the buffer if we are in a "shared" mode if w.buf != nil && w.pool != nil { b := w.buf @@ -358,17 +356,17 @@ func (w *bufWriter) Flush() error { return err } -func (w *bufWriter) flushKeepBuffer() (int, error) { +func (w *bufWriter) flushKeepBuffer() error { if w.err != nil { - return 0, w.err + return w.err } if w.offset == 0 { - return 0, nil + return nil } - n, err := w.conn.Write(w.buf[:w.offset]) + _, err := w.conn.Write(w.buf[:w.offset]) w.err = toIOError(err) w.offset = 0 - return n, w.err + return w.err } type ioError struct { From 877dd6fd2f56f40c64b78af7a3cc6c1678bed1c1 Mon Sep 17 00:00:00 2001 From: Oleg Guba Date: Wed, 31 Jul 2024 19:41:08 -0700 Subject: [PATCH 11/11] revert changes in flushKeepBuffer --- internal/transport/http_util.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 41b676d93728..f609c6c66595 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -363,8 +363,8 @@ func (w *bufWriter) flushKeepBuffer() error { if w.offset == 0 { return nil } - _, err := w.conn.Write(w.buf[:w.offset]) - w.err = toIOError(err) + _, w.err = w.conn.Write(w.buf[:w.offset]) + w.err = toIOError(w.err) w.offset = 0 return w.err }