Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore common label-values in ProxyResponseHeap sort function #6299

Merged
27 changes: 26 additions & 1 deletion pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,13 @@ func (h *ProxyResponseHeap) Less(i, j int) bool {
jResp := (*h)[j].rs.At()

if iResp.GetSeries() != nil && jResp.GetSeries() != nil {
// Response sets are sorted before adding external labels.
// This comparison excludes those labels to keep the same order.
iStoreLbls := (*h)[i].rs.StoreLabels()
jStoreLbls := (*h)[j].rs.StoreLabels()
iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels)
jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels)
return labels.Compare(iLbls, jLbls) < 0
return labels.Compare(rmLabels(iLbls.Copy(), iStoreLbls), rmLabels(jLbls.Copy(), jStoreLbls)) < 0
wallee94 marked this conversation as resolved.
Show resolved Hide resolved
} else if iResp.GetSeries() == nil && jResp.GetSeries() != nil {
return true
} else if iResp.GetSeries() != nil && jResp.GetSeries() == nil {
Expand Down Expand Up @@ -257,6 +261,10 @@ func (l *lazyRespSet) Labelset() string {
return labelpb.PromLabelSetsToString(l.storeLabelSets)
}

func (l *lazyRespSet) StoreLabels() map[string]struct{} {
return l.storeLabels
}

// lazyRespSet is a lazy storepb.SeriesSet that buffers
// everything as fast as possible while at the same it permits
// reading response-by-response. It blocks if there is no data
Expand All @@ -268,6 +276,7 @@ type lazyRespSet struct {
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
storeLabels map[string]struct{}
frameTimeout time.Duration
ctx context.Context

Expand Down Expand Up @@ -374,6 +383,11 @@ func newLazyRespSet(
bufferedResponses: bufferedResponses,
shardMatcher: shardMatcher,
}
for _, ls := range storeLabelSets {
for _, l := range ls {
respSet.storeLabels[l.Name] = struct{}{}
}
}

go func(st string, l *lazyRespSet) {
bytesProcessed := 0
Expand Down Expand Up @@ -610,6 +624,7 @@ type eagerRespSet struct {

shardMatcher *storepb.ShardMatcher
removeLabels map[string]struct{}
storeLabels map[string]struct{}

// Internal bookkeeping.
bufferedResponses []*storepb.SeriesResponse
Expand Down Expand Up @@ -641,6 +656,11 @@ func newEagerRespSet(
shardMatcher: shardMatcher,
removeLabels: removeLabels,
}
for _, ls := range st.LabelSets() {
for _, l := range ls {
ret.storeLabels[l.Name] = struct{}{}
}
}

ret.wg.Add(1)

Expand Down Expand Up @@ -811,11 +831,16 @@ func (l *eagerRespSet) Labelset() string {
return labelpb.PromLabelSetsToString(l.st.LabelSets())
}

func (l *eagerRespSet) StoreLabels() map[string]struct{} {
return l.storeLabels
}

type respSet interface {
Close()
At() *storepb.SeriesResponse
Next() bool
StoreID() string
Labelset() string
StoreLabels() map[string]struct{}
Empty() bool
}
126 changes: 126 additions & 0 deletions pkg/store/proxy_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,132 @@ func TestRmLabelsCornerCases(t *testing.T) {
}), labels.Labels{})
}

type testingRespSet struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse the mocks and stubs from the testutil package? You can see an example usage here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not find an implementation of respSet in testutil, but I changed it to use one from proxy_head.go.

bufferedResponse []*storepb.SeriesResponse
storeLabels map[string]struct{}
i int
}

func (l *testingRespSet) Close() {}

func (l *testingRespSet) At() *storepb.SeriesResponse {
return l.bufferedResponse[l.i]
}

func (l *testingRespSet) Next() bool {
l.i++
return l.i < len(l.bufferedResponse)
}

func (l *testingRespSet) StoreID() string {
return ""
}

func (l *testingRespSet) Labelset() string {
return ""
}

func (l *testingRespSet) Empty() bool {
return l.i >= len(l.bufferedResponse)
}

func (l *testingRespSet) StoreLabels() map[string]struct{} {
return l.storeLabels
}

func TestProxyResponseHeapSort(t *testing.T) {
for _, tcase := range []struct {
title string
input []respSet
exp []*storepb.SeriesResponse
}{
{
title: "merge sets with different series and common labels",
input: []respSet{
&testingRespSet{
bufferedResponse: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")),
},
},
&testingRespSet{
bufferedResponse: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")),
},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")),
},
},
{
title: "merge sets with different series and labels",
input: []respSet{
&testingRespSet{
bufferedResponse: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")),
},
},
&testingRespSet{
bufferedResponse: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")),
},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")),
storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")),
},
},
{
title: "merge sets that were ordered before adding external labels",
input: []respSet{
&testingRespSet{
bufferedResponse: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
storeLabels: map[string]struct{}{"c": {}},
},
&testingRespSet{
bufferedResponse: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
storeLabels: map[string]struct{}{"c": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
},
} {
t.Run(tcase.title, func(t *testing.T) {
h := NewProxyResponseHeap(tcase.input...)
if !h.Empty() {
got := []*storepb.SeriesResponse{h.At()}
for h.Next() {
got = append(got, h.At())
}
testutil.Equals(t, tcase.exp, got)
}
})
}
}

func TestSortWithoutLabels(t *testing.T) {
for _, tcase := range []struct {
input []*storepb.SeriesResponse
Expand Down