From 6734c98ab978667a3060ace91e456fabd47967ed Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Fri, 2 Feb 2024 16:00:59 -0500 Subject: [PATCH 1/2] release data to pool and clear reference --- envelope.go | 22 +++++++++++++++------- protocol_connect.go | 7 +++++-- protocol_grpc.go | 15 +++++++++------ 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/envelope.go b/envelope.go index 4fa39542..b7fc16ef 100644 --- a/envelope.go +++ b/envelope.go @@ -228,7 +228,12 @@ type envelopeReader struct { func (r *envelopeReader) Unmarshal(message any) *Error { buffer := r.bufferPool.Get() - defer r.bufferPool.Put(buffer) + var dontRelease *bytes.Buffer + defer func() { + if buffer != dontRelease { + r.bufferPool.Put(buffer) + } + }() env := &envelope{Data: buffer} err := r.Read(env) @@ -256,7 +261,11 @@ func (r *envelopeReader) Unmarshal(message any) *Error { ) } decompressed := r.bufferPool.Get() - defer r.bufferPool.Put(decompressed) + defer func() { + if decompressed != dontRelease { + r.bufferPool.Put(decompressed) + } + }() if err := r.compressionPool.Decompress(decompressed, data, int64(r.readMaxBytes)); err != nil { return err } @@ -276,14 +285,13 @@ func (r *envelopeReader) Unmarshal(message any) *Error { } // One of the protocol-specific flags are set, so this is the end of the // stream. Save the message for protocol-specific code to process and - // return a sentinel error. Since we've deferred functions to return env's - // underlying buffer to a pool, we need to keep a copy. - copiedData := make([]byte, data.Len()) - copy(copiedData, data.Bytes()) + // return a sentinel error. We alias the buffer with dontRelease as a + // way of marking it so above defers don't release it to the pool. r.last = envelope{ - Data: bytes.NewBuffer(copiedData), + Data: data, Flags: env.Flags, } + dontRelease = data return errSpecialEnvelope } diff --git a/protocol_connect.go b/protocol_connect.go index 9aaa71ca..ad3f908d 100644 --- a/protocol_connect.go +++ b/protocol_connect.go @@ -863,12 +863,15 @@ func (u *connectStreamingUnmarshaler) Unmarshal(message any) *Error { if !errors.Is(err, errSpecialEnvelope) { return err } - env := u.envelopeReader.last + env := u.last + data := env.Data + u.last.Data = nil // don't keep a reference to it + defer u.bufferPool.Put(data) if !env.IsSet(connectFlagEnvelopeEndStream) { return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags) } var end connectEndStreamMessage - if err := json.Unmarshal(env.Data.Bytes(), &end); err != nil { + if err := json.Unmarshal(data.Bytes(), &end); err != nil { return errorf(CodeInternal, "unmarshal end stream message: %w", err) } for name, value := range end.Trailer { diff --git a/protocol_grpc.go b/protocol_grpc.go index b14f2293..a20b7118 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -602,9 +602,9 @@ func (m *grpcMarshaler) MarshalWebTrailers(trailer http.Header) *Error { } type grpcUnmarshaler struct { - envelopeReader envelopeReader - web bool - webTrailer http.Header + envelopeReader + web bool + webTrailer http.Header } func (u *grpcUnmarshaler) Unmarshal(message any) *Error { @@ -615,7 +615,10 @@ func (u *grpcUnmarshaler) Unmarshal(message any) *Error { if !errors.Is(err, errSpecialEnvelope) { return err } - env := u.envelopeReader.last + env := u.last + data := env.Data + u.last.Data = nil // don't keep a reference to it + defer u.bufferPool.Put(data) if !u.web || !env.IsSet(grpcFlagEnvelopeTrailer) { return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags) } @@ -623,10 +626,10 @@ func (u *grpcUnmarshaler) Unmarshal(message any) *Error { // Per the gRPC-Web specification, trailers should be encoded as an HTTP/1 // headers block _without_ the terminating newline. To make the headers // parseable by net/textproto, we need to add the newline. - if err := env.Data.WriteByte('\n'); err != nil { + if err := data.WriteByte('\n'); err != nil { return errorf(CodeInternal, "unmarshal web trailers: %w", err) } - bufferedReader := bufio.NewReader(env.Data) + bufferedReader := bufio.NewReader(data) mimeReader := textproto.NewReader(bufferedReader) mimeHeader, mimeErr := mimeReader.ReadMIMEHeader() if mimeErr != nil { From fdcbc6f0967cae7780b1ad98fdd6e6e801f85380 Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Tue, 13 Feb 2024 14:26:09 -0500 Subject: [PATCH 2/2] extra line to match style guide --- protocol_grpc.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol_grpc.go b/protocol_grpc.go index a20b7118..da7064f2 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -603,6 +603,7 @@ func (m *grpcMarshaler) MarshalWebTrailers(trailer http.Header) *Error { type grpcUnmarshaler struct { envelopeReader + web bool webTrailer http.Header }