diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index 45cfb22225..d9ba57ccba 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -3,6 +3,7 @@ package transport import ( "bytes" "context" + "io" "io/ioutil" "net/http" @@ -24,6 +25,15 @@ type grpcRoundTripperAdapter struct { roundTripper GrpcRoundTripper } +type buffer struct { + buff []byte + io.ReadCloser +} + +func (b *buffer) Bytes() []byte { + return b.buff +} + func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) { req, err := server.HTTPRequest(r) if err != nil { @@ -37,7 +47,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er httpResp := &http.Response{ StatusCode: int(resp.Code), - Body: ioutil.NopCloser(bytes.NewReader(resp.Body)), + Body: &buffer{buff: resp.Body, ReadCloser: ioutil.NopCloser(bytes.NewReader(resp.Body))}, Header: http.Header{}, ContentLength: int64(len(resp.Body)), } diff --git a/pkg/querier/queryrange/query_range.go b/pkg/querier/queryrange/query_range.go index 7506d4f507..32962db908 100644 --- a/pkg/querier/queryrange/query_range.go +++ b/pkg/querier/queryrange/query_range.go @@ -266,20 +266,15 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck defer log.Finish() - // Preallocate the buffer with the exact size so we don't waste allocations - // while progressively growing an initial small buffer. The buffer capacity - // is increased by MinRead to avoid extra allocations due to how ReadFrom() - // internally works. 
- buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead)) - if _, err := buf.ReadFrom(r.Body); err != nil { + buf, err := bodyBuffer(r) + if err != nil { log.Error(err) - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + return nil, err } - - log.LogFields(otlog.Int("bytes", buf.Len())) + log.LogFields(otlog.Int("bytes", len(buf))) var resp PrometheusResponse - if err := json.Unmarshal(buf.Bytes(), &resp); err != nil { + if err := json.Unmarshal(buf, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } @@ -289,6 +284,29 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R return &resp, nil } +// Buffer can be used to read a response body. +// This avoids reading the body multiple times from the `http.Response.Body`. +type Buffer interface { + Bytes() []byte +} + +func bodyBuffer(res *http.Response) ([]byte, error) { + // Attempt to cast the response body to a Buffer and use it if possible. + // This is because the frontend may have already read the body and buffered it. + if buffer, ok := res.Body.(Buffer); ok { + return buffer.Bytes(), nil + } + // Preallocate the buffer with the exact size so we don't waste allocations + // while progressively growing an initial small buffer. The buffer capacity + // is increased by MinRead to avoid extra allocations due to how ReadFrom() + // internally works. 
+ buf := bytes.NewBuffer(make([]byte, 0, res.ContentLength+bytes.MinRead)) + if _, err := buf.ReadFrom(res.Body); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + return buf.Bytes(), nil +} + func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() @@ -392,7 +410,6 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream { // bigger than the given minTs. Empty slice is returned if minTs is bigger than all the // timestamps in samples. func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample { - if len(samples) <= 0 || minTs < samples[0].TimestampMs { return samples }