Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release buffer for end-stream messages back to the pool #678

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,12 @@ type envelopeReader struct {

func (r *envelopeReader) Unmarshal(message any) *Error {
buffer := r.bufferPool.Get()
defer r.bufferPool.Put(buffer)
var dontRelease *bytes.Buffer
defer func() {
if buffer != dontRelease {
r.bufferPool.Put(buffer)
}
}()

env := &envelope{Data: buffer}
err := r.Read(env)
Expand Down Expand Up @@ -256,7 +261,11 @@ func (r *envelopeReader) Unmarshal(message any) *Error {
)
}
decompressed := r.bufferPool.Get()
defer r.bufferPool.Put(decompressed)
defer func() {
if decompressed != dontRelease {
r.bufferPool.Put(decompressed)
}
}()
if err := r.compressionPool.Decompress(decompressed, data, int64(r.readMaxBytes)); err != nil {
return err
}
Expand All @@ -276,14 +285,13 @@ func (r *envelopeReader) Unmarshal(message any) *Error {
}
// One of the protocol-specific flags are set, so this is the end of the
// stream. Save the message for protocol-specific code to process and
// return a sentinel error. Since we've deferred functions to return env's
// underlying buffer to a pool, we need to keep a copy.
copiedData := make([]byte, data.Len())
copy(copiedData, data.Bytes())
// return a sentinel error. We alias the buffer with dontRelease as a
// way of marking it so above defers don't release it to the pool.
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
r.last = envelope{
Data: bytes.NewBuffer(copiedData),
Data: data,
Flags: env.Flags,
}
dontRelease = data
emcfarlane marked this conversation as resolved.
Show resolved Hide resolved
return errSpecialEnvelope
}

Expand Down
7 changes: 5 additions & 2 deletions protocol_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,12 +863,15 @@ func (u *connectStreamingUnmarshaler) Unmarshal(message any) *Error {
if !errors.Is(err, errSpecialEnvelope) {
return err
}
env := u.envelopeReader.last
env := u.last
data := env.Data
u.last.Data = nil // don't keep a reference to it
defer u.bufferPool.Put(data)
if !env.IsSet(connectFlagEnvelopeEndStream) {
return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags)
}
var end connectEndStreamMessage
if err := json.Unmarshal(env.Data.Bytes(), &end); err != nil {
if err := json.Unmarshal(data.Bytes(), &end); err != nil {
return errorf(CodeInternal, "unmarshal end stream message: %w", err)
}
for name, value := range end.Trailer {
Expand Down
15 changes: 9 additions & 6 deletions protocol_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,9 +602,9 @@ func (m *grpcMarshaler) MarshalWebTrailers(trailer http.Header) *Error {
}

type grpcUnmarshaler struct {
envelopeReader envelopeReader
web bool
webTrailer http.Header
envelopeReader
jhump marked this conversation as resolved.
Show resolved Hide resolved
web bool
webTrailer http.Header
}

func (u *grpcUnmarshaler) Unmarshal(message any) *Error {
Expand All @@ -615,18 +615,21 @@ func (u *grpcUnmarshaler) Unmarshal(message any) *Error {
if !errors.Is(err, errSpecialEnvelope) {
return err
}
env := u.envelopeReader.last
env := u.last
data := env.Data
u.last.Data = nil // don't keep a reference to it
defer u.bufferPool.Put(data)
if !u.web || !env.IsSet(grpcFlagEnvelopeTrailer) {
return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags)
}

// Per the gRPC-Web specification, trailers should be encoded as an HTTP/1
// headers block _without_ the terminating newline. To make the headers
// parseable by net/textproto, we need to add the newline.
if err := env.Data.WriteByte('\n'); err != nil {
if err := data.WriteByte('\n'); err != nil {
return errorf(CodeInternal, "unmarshal web trailers: %w", err)
}
bufferedReader := bufio.NewReader(env.Data)
bufferedReader := bufio.NewReader(data)
mimeReader := textproto.NewReader(bufferedReader)
mimeHeader, mimeErr := mimeReader.ReadMIMEHeader()
if mimeErr != nil {
Expand Down
Loading