Skip to content

Commit

Permalink
Reuse the byte buffer from GRPC response in the frontend. (cortexproj…
Browse files Browse the repository at this point in the history
…ect#4377)

* Reuse the byte buffer from GRPC response in the frontend.

This PR allow to reuse the GRPC byte buffer casted as a `io.Reader` in the http rountripper of the frontend.
This way we don't have to copy the content from the reader into another buffer when reading the http response.

I've been testing this with in Loki and this shows good memory saving (around 1GB).

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Use safe cast

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Signed-off-by: Alvin Lin <alvinlin@amazon.com>
  • Loading branch information
cyriltovena authored and alvinlin123 committed Jan 14, 2022
1 parent 31bce1c commit e765e51
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
12 changes: 11 additions & 1 deletion pkg/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transport
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"

Expand All @@ -24,6 +25,15 @@ type grpcRoundTripperAdapter struct {
roundTripper GrpcRoundTripper
}

type buffer struct {
buff []byte
io.ReadCloser
}

func (b *buffer) Bytes() []byte {
return b.buff
}

func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
req, err := server.HTTPRequest(r)
if err != nil {
Expand All @@ -37,7 +47,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er

httpResp := &http.Response{
StatusCode: int(resp.Code),
Body: ioutil.NopCloser(bytes.NewReader(resp.Body)),
Body: &buffer{buff: resp.Body, ReadCloser: ioutil.NopCloser(bytes.NewReader(resp.Body))},
Header: http.Header{},
ContentLength: int64(len(resp.Body)),
}
Expand Down
39 changes: 28 additions & 11 deletions pkg/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,20 +266,15 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

// Preallocate the buffer with the exact size so we don't waste allocations
// while progressively growing an initial small buffer. The buffer capacity
// is increased by MinRead to avoid extra allocations due to how ReadFrom()
// internally works.
buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead))
if _, err := buf.ReadFrom(r.Body); err != nil {
buf, err := bodyBuffer(r)
if err != nil {
log.Error(err)
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
return nil, err
}

log.LogFields(otlog.Int("bytes", buf.Len()))
log.LogFields(otlog.Int("bytes", len(buf)))

var resp PrometheusResponse
if err := json.Unmarshal(buf.Bytes(), &resp); err != nil {
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}

Expand All @@ -289,6 +284,29 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
return &resp, nil
}

// Buffer can be used to read a response body.
// This allows to avoid reading the body multiple times from the `http.Response.Body`.
type Buffer interface {
Bytes() []byte
}

func bodyBuffer(res *http.Response) ([]byte, error) {
// Attempt to cast the response body to a Buffer and use it if possible.
// This is because the frontend may have already read the body and buffered it.
if buffer, ok := res.Body.(Buffer); ok {
return buffer.Bytes(), nil
}
// Preallocate the buffer with the exact size so we don't waste allocations
// while progressively growing an initial small buffer. The buffer capacity
// is increased by MinRead to avoid extra allocations due to how ReadFrom()
// internally works.
buf := bytes.NewBuffer(make([]byte, 0, res.ContentLength+bytes.MinRead))
if _, err := buf.ReadFrom(res.Body); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
return buf.Bytes(), nil
}

func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()
Expand Down Expand Up @@ -392,7 +410,6 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in samples.
func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {

if len(samples) <= 0 || minTs < samples[0].TimestampMs {
return samples
}
Expand Down

0 comments on commit e765e51

Please sign in to comment.