Skip to content

Commit

Permalink
Merge pull request #6125 from harry671003/fix_sort_by
Browse files Browse the repository at this point in the history
Fix sorted queries do not produce sorted results for shardable queries
  • Loading branch information
yeya24 authored Feb 23, 2023
2 parents 1967cd0 + bbc85a7 commit e97f342
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6103](https://github.com/thanos-io/thanos/pull/6103) Mixins(Rule): Fix query for long rule evaluations.
- [#6121](https://github.com/thanos-io/thanos/pull/6121) Receive: Deduplicate metamonitoring queries.
- [#6137](https://github.com/thanos-io/thanos/pull/6137) Downsample: Repair of non-empty XOR chunks during 1h downsampling.
- [#6125](https://github.com/thanos-io/thanos/pull/6125) Query Frontend: Fix vertical shardable instant queries do not produce sorted results for `sort`, `sort_desc`, `topk` and `bottomk` functions.

### Changed

Expand Down
4 changes: 2 additions & 2 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Codec interface {
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
type Merger interface {
// MergeResponse merges responses from multiple requests into a single Response
MergeResponse(...Response) (Response, error)
MergeResponse(Request, ...Response) (Response, error)
}

// Request represents a query range request that can be process by middlewares.
Expand Down Expand Up @@ -192,7 +192,7 @@ func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse {
}
}

func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
if len(responses) == 0 {
return NewEmptyPrometheusResponse(), nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cortex/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func TestMergeAPIResponses(t *testing.T) {
},
}} {
t.Run(tc.name, func(t *testing.T) {
output, err := PrometheusCodec.MergeResponse(tc.input...)
output, err := PrometheusCodec.MergeResponse(nil, tc.input...)
require.NoError(t, err)
require.Equal(t, tc.expected, output)
})
Expand Down
6 changes: 3 additions & 3 deletions internal/cortex/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
return nil, nil, err
}
if len(requests) == 0 {
response, err := s.merger.MergeResponse(responses...)
response, err := s.merger.MergeResponse(r, responses...)
// No downstream requests so no need to write back to the cache.
return response, nil, err
}
Expand Down Expand Up @@ -466,7 +466,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
if err != nil {
return nil, nil, err
}
merged, err := s.merger.MergeResponse(accumulator.Response, currentRes)
merged, err := s.merger.MergeResponse(r, accumulator.Response, currentRes)
if err != nil {
return nil, nil, err
}
Expand All @@ -478,7 +478,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
return nil, nil, err
}

response, err := s.merger.MergeResponse(responses...)
response, err := s.merger.MergeResponse(r, responses...)
return response, mergedExtents, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/cortex/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) {
resps = append(resps, reqResp.Response)
}

response, err := s.merger.MergeResponse(resps...)
response, err := s.merger.MergeResponse(r, resps...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestSplitQuery(t *testing.T) {
}

func TestSplitByDay(t *testing.T) {
mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse)
mergedResponse, err := PrometheusCodec.MergeResponse(nil, parsedResponse, parsedResponse)
require.NoError(t, err)

mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse)
Expand Down
2 changes: 1 addition & 1 deletion pkg/queryfrontend/downsampled.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ forLoop:
break forLoop
}
}
response, err := d.merger.MergeResponse(resps...)
response, err := d.merger.MergeResponse(req, resps...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/queryfrontend/labels_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du
}

// MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order.
func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
func (c labelsCodec) MergeResponse(_ queryrange.Request, responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
// Empty response for label_names, label_values and series API.
return &ThanosLabelsResponse{
Expand Down
6 changes: 3 additions & 3 deletions pkg/queryfrontend/labels_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func TestLabelsCodec_MergeResponse(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
codec := NewThanosLabelsCodec(false, time.Hour*2)
r, err := codec.MergeResponse(tc.responses...)
r, err := codec.MergeResponse(nil, tc.responses...)
if tc.expectedError != nil {
testutil.Equals(t, err, tc.expectedError)
} else {
Expand Down Expand Up @@ -677,7 +677,7 @@ func benchmarkMergeResponses(b *testing.B, size int) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, _ = codec.MergeResponse(queryResSeries...)
_, _ = codec.MergeResponse(nil, queryResSeries...)
}
})

Expand All @@ -686,7 +686,7 @@ func benchmarkMergeResponses(b *testing.B, size int) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, _ = codec.MergeResponse(queryResLabel...)
_, _ = codec.MergeResponse(nil, queryResLabel...)
}
})

Expand Down
125 changes: 110 additions & 15 deletions pkg/queryfrontend/queryinstant_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"

"github.com/prometheus/prometheus/promql/parser"
promqlparser "github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/internal/cortex/cortexpb"
"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
cortexutil "github.com/thanos-io/thanos/internal/cortex/util"
Expand All @@ -41,7 +43,7 @@ func NewThanosQueryInstantCodec(partialResponse bool) *queryInstantCodec {
// MergeResponse merges multiple responses into a single response. For instant query
// only vector and matrix responses will be merged because other types of queries
// are not shardable like number literal, string literal, scalar, etc.
func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
return queryrange.NewEmptyPrometheusInstantQueryResponse(), nil
} else if len(responses) == 1 {
Expand All @@ -68,13 +70,17 @@ func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (quer
},
}
default:
v, err := vectorMerge(req, promResponses)
if err != nil {
return nil, err
}
res = &queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Vector{
Vector: vectorMerge(promResponses),
Vector: v,
},
},
Stats: queryrange.StatsMerge(responses),
Expand Down Expand Up @@ -228,7 +234,7 @@ func (c queryInstantCodec) EncodeResponse(ctx context.Context, res queryrange.Re
return &resp, nil
}

func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, _ queryrange.Request) (queryrange.Response, error) {
func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) {
if r.StatusCode/100 != 2 {
body, _ := io.ReadAll(r.Body)
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
Expand All @@ -254,8 +260,13 @@ func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response,
return &resp, nil
}

func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Vector {
func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQueryResponse) (*queryrange.Vector, error) {
output := map[string]*queryrange.Sample{}
metrics := []string{} // Used to preserve the order for topk and bottomk.
sortPlan, err := sortPlanForQuery(req.GetQuery())
if err != nil {
return nil, err
}
for _, resp := range resps {
if resp == nil {
continue
Expand All @@ -273,32 +284,116 @@ func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange
metric := cortexpb.FromLabelAdaptersToLabels(sample.Labels).String()
if existingSample, ok := output[metric]; !ok {
output[metric] = s
metrics = append(metrics, metric) // Preserve the order of metric.
} else if existingSample.GetSample().TimestampMs < s.GetSample().TimestampMs {
// Choose the latest sample if we see overlap.
output[metric] = s
}
}
}

result := &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0, len(output)),
}

if len(output) == 0 {
return &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0),
return result, nil
}

if sortPlan == mergeOnly {
for _, k := range metrics {
result.Samples = append(result.Samples, output[k])
}
return result, nil
}

keys := make([]string, 0, len(output))
for key := range output {
keys = append(keys, key)
type pair struct {
metric string
s *queryrange.Sample
}
sort.Strings(keys)

result := &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0, len(output)),
samples := make([]*pair, 0, len(output))
for k, v := range output {
samples = append(samples, &pair{
metric: k,
s: v,
})
}
for _, key := range keys {
result.Samples = append(result.Samples, output[key])

sort.Slice(samples, func(i, j int) bool {
// Order is determined by vector
switch sortPlan {
case sortByValuesAsc:
return samples[i].s.Sample.Value < samples[j].s.Sample.Value
case sortByValuesDesc:
return samples[i].s.Sample.Value > samples[j].s.Sample.Value
}
return samples[i].metric < samples[j].metric
})

for _, p := range samples {
result.Samples = append(result.Samples, p.s)
}
return result
return result, nil
}

type sortPlan int

const (
mergeOnly sortPlan = 0
sortByValuesAsc sortPlan = 1
sortByValuesDesc sortPlan = 2
sortByLabels sortPlan = 3
)

func sortPlanForQuery(q string) (sortPlan, error) {
expr, err := promqlparser.ParseExpr(q)
if err != nil {
return 0, err
}
// Check if the root expression is topk or bottomk
if aggr, ok := expr.(*parser.AggregateExpr); ok {
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK {
return mergeOnly, nil
}
}
checkForSort := func(expr promqlparser.Expr) (sortAsc, sortDesc bool) {
if n, ok := expr.(*promqlparser.Call); ok {
if n.Func != nil {
if n.Func.Name == "sort" {
sortAsc = true
}
if n.Func.Name == "sort_desc" {
sortDesc = true
}
}
}
return sortAsc, sortDesc
}
// Check the root expression for sort
if sortAsc, sortDesc := checkForSort(expr); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}

// If the root expression is a binary expression, check the LHS and RHS for sort
if bin, ok := expr.(*parser.BinaryExpr); ok {
if sortAsc, sortDesc := checkForSort(bin.LHS); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}
if sortAsc, sortDesc := checkForSort(bin.RHS); sortAsc || sortDesc {
if sortAsc {
return sortByValuesAsc, nil
}
return sortByValuesDesc, nil
}
}
return sortByLabels, nil
}

func matrixMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Matrix {
Expand Down
Loading

0 comments on commit e97f342

Please sign in to comment.