Skip to content

Commit

Permalink
QueryFrontend: pass "stats" parameter forward
Browse files Browse the repository at this point in the history
If a querier sees a "stats" parameter in the query request, it will attach important information about the query execution to the response.
But currently, even if an user sets this value, the Query Frontend will lose this value in its middleware/roundtrippers.

This PR fixes this problem by properly encoding/decoding the requests in QFE.

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka committed Oct 21, 2024
1 parent 7d95913 commit 4ea012d
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 104 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.
- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints.
- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
16 changes: 13 additions & 3 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
stdjson "encoding/json"
"fmt"
io "io"
"io"
"math"
"net/http"
"net/url"
Expand Down Expand Up @@ -329,6 +329,7 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward
result.Query = r.FormValue("query")
result.Stats = r.FormValue("stats")
result.Path = r.URL.Path
result.Stats = r.FormValue("stats")

// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
Expand Down Expand Up @@ -706,6 +707,8 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
func StatsMerge(resps []Response) *PrometheusResponseStats {
output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
hasStats := false
peakSamples := int32(0)
totalSamples := int64(0)
for _, resp := range resps {
stats := resp.GetStats()
if stats == nil {
Expand All @@ -720,6 +723,11 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {
for _, s := range stats.Samples.TotalQueryableSamplesPerStep {
output[s.GetTimestampMs()] = s
}

if stats.Samples.PeakSamples > peakSamples {
peakSamples = stats.Samples.PeakSamples
}
totalSamples += stats.Samples.TotalQueryableSamples
}

if !hasStats {
Expand All @@ -733,10 +741,12 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {

sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}
result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{
PeakSamples: peakSamples,
TotalQueryableSamples: totalSamples,
}}
for _, key := range keys {
result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key])
result.Samples.TotalQueryableSamples += output[key].Value
}

return result
Expand Down
38 changes: 34 additions & 4 deletions internal/cortex/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package queryrange
import (
"bytes"
"context"
io "io"
"io"
"net/http"
"strconv"
"testing"
Expand Down Expand Up @@ -36,6 +36,20 @@ func TestRequest(t *testing.T) {
url: query,
expected: &parsedRequestWithHeaders,
},
{
url: "/api/v1/query_range?start=0&end=60&step=14&stats=all&query=sum(container_memory_rss) by (namespace)",
expected: &PrometheusRequest{
Path: "/api/v1/query_range",
Start: 0,
End: 60_000,
Step: 14_000,
Query: "sum(container_memory_rss) by (namespace)",
Stats: "all",
Headers: []*PrometheusRequestHeader{
{Name: "Test-Header", Values: []string{"test"}},
},
},
},
{
url: "api/v1/query_range?start=foo&stats=all",
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"start\"; cannot parse \"foo\" to a valid timestamp"),
Expand Down Expand Up @@ -129,7 +143,7 @@ func TestResponseWithStats(t *testing.T) {
expected *PrometheusResponse
}{
{
body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]]}},"analysis":null}}`,
body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"peakSamples":0,"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]]}},"analysis":null}}`,
expected: &PrometheusResponse{
Status: "success",
Data: PrometheusData{
Expand Down Expand Up @@ -158,7 +172,7 @@ func TestResponseWithStats(t *testing.T) {
},
},
{
body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]]}},"analysis":{"name":"[noArgFunction]","executionTime":"1s","children":null}}}`,
body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}],"stats":{"samples":{"peakSamples":0,"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1536673680,5],[1536673780,5]]}},"analysis":{"name":"[noArgFunction]","executionTime":"1s","children":null}}}`,
expected: &PrometheusResponse{
Status: "success",
Data: PrometheusData{
Expand Down Expand Up @@ -211,11 +225,27 @@ func TestResponseWithStats(t *testing.T) {
}
resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
assert.Equal(t, prettyPrintJsonBody(t, response.Body), prettyPrintJsonBody(t, resp2.Body))
})
}
}

func prettyPrintJsonBody(t *testing.T, body io.ReadCloser) string {
t.Helper()

bodyContent, err := io.ReadAll(body)
require.NoError(t, err)

var jsonData interface{}
err = json.Unmarshal(bodyContent, &jsonData)
require.NoError(t, err)

prettyBytes, err := json.MarshalIndent(jsonData, "", " ")
require.NoError(t, err)

return string(prettyBytes)
}

func TestMergeAPIResponses(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down
Loading

0 comments on commit 4ea012d

Please sign in to comment.