diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 028ac81a7e..d5cc940637 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -163,16 +163,37 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse { // This is O(n*logk) but can be Theta(n*logk). However, // tournament trees need n-1 auxiliary nodes so there // might not be much of a difference. -type ProxyResponseHeap []ProxyResponseHeapNode +type ProxyResponseHeap struct { + nodes []ProxyResponseHeapNode + iLblsScratch labels.Labels + jLblsScratch labels.Labels +} func (h *ProxyResponseHeap) Less(i, j int) bool { - iResp := (*h)[i].rs.At() - jResp := (*h)[j].rs.At() + iResp := h.nodes[i].rs.At() + jResp := h.nodes[j].rs.At() if iResp.GetSeries() != nil && jResp.GetSeries() != nil { + // Response sets are sorted before adding external labels. + // This comparison excludes those labels to keep the same order. + iStoreLbls := h.nodes[i].rs.StoreLabels() + jStoreLbls := h.nodes[j].rs.StoreLabels() + iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) - return labels.Compare(iLbls, jLbls) < 0 + + copyLabels(&h.iLblsScratch, iLbls) + copyLabels(&h.jLblsScratch, jLbls) + + var iExtLbls, jExtLbls labels.Labels + h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls) + h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls) + + c := labels.Compare(h.iLblsScratch, h.jLblsScratch) + if c != 0 { + return c < 0 + } + return labels.Compare(iExtLbls, jExtLbls) < 0 } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { return true } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { @@ -185,19 +206,19 @@ func (h *ProxyResponseHeap) Less(i, j int) bool { } func (h *ProxyResponseHeap) Len() int { - return len(*h) + return len(h.nodes) } func (h *ProxyResponseHeap) Swap(i, j int) { - (*h)[i], (*h)[j] = (*h)[j], (*h)[i] + h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i] } func (h 
*ProxyResponseHeap) Push(x interface{}) { - *h = append(*h, x.(ProxyResponseHeapNode)) + h.nodes = append(h.nodes, x.(ProxyResponseHeapNode)) } func (h *ProxyResponseHeap) Pop() (v interface{}) { - *h, v = (*h)[:h.Len()-1], (*h)[h.Len()-1] + h.nodes, v = h.nodes[:h.Len()-1], h.nodes[h.Len()-1] return } @@ -206,7 +227,7 @@ func (h *ProxyResponseHeap) Empty() bool { } func (h *ProxyResponseHeap) Min() *ProxyResponseHeapNode { - return &(*h)[0] + return &h.nodes[0] } type ProxyResponseHeapNode struct { @@ -216,7 +237,9 @@ type ProxyResponseHeapNode struct { // NewProxyResponseHeap returns heap that k-way merge series together. // It's agnostic to duplicates and overlaps, it forwards all duplicated series in random order. func NewProxyResponseHeap(seriesSets ...respSet) *ProxyResponseHeap { - ret := make(ProxyResponseHeap, 0, len(seriesSets)) + ret := ProxyResponseHeap{ + nodes: make([]ProxyResponseHeapNode, 0, len(seriesSets)), + } for _, ss := range seriesSets { if ss.Empty() { @@ -257,6 +280,10 @@ func (l *lazyRespSet) Labelset() string { return labelpb.PromLabelSetsToString(l.storeLabelSets) } +func (l *lazyRespSet) StoreLabels() map[string]struct{} { + return l.storeLabels +} + // lazyRespSet is a lazy storepb.SeriesSet that buffers // everything as fast as possible while at the same it permits // reading response-by-response. 
It blocks if there is no data @@ -268,6 +295,7 @@ type lazyRespSet struct { closeSeries context.CancelFunc storeName string storeLabelSets []labels.Labels + storeLabels map[string]struct{} frameTimeout time.Duration ctx context.Context @@ -374,6 +402,12 @@ func newLazyRespSet( bufferedResponses: bufferedResponses, shardMatcher: shardMatcher, } + respSet.storeLabels = make(map[string]struct{}) + for _, ls := range storeLabelSets { + for _, l := range ls { + respSet.storeLabels[l.Name] = struct{}{} + } + } go func(st string, l *lazyRespSet) { bytesProcessed := 0 @@ -610,6 +644,7 @@ type eagerRespSet struct { shardMatcher *storepb.ShardMatcher removeLabels map[string]struct{} + storeLabels map[string]struct{} // Internal bookkeeping. bufferedResponses []*storepb.SeriesResponse @@ -641,6 +676,12 @@ func newEagerRespSet( shardMatcher: shardMatcher, removeLabels: removeLabels, } + ret.storeLabels = make(map[string]struct{}) + for _, ls := range st.LabelSets() { + for _, l := range ls { + ret.storeLabels[l.Name] = struct{}{} + } + } ret.wg.Add(1) @@ -748,6 +789,34 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels return l } +// dropLabels moves the labels named in labelsToDrop to the end of l (reordering l in place) and returns the kept labels followed by the dropped labels. +func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) { + cutoff := len(l) + for i := 0; i < len(l); i++ { + if i == cutoff { + break + } + if _, ok := labelsToDrop[l[i].Name]; !ok { + continue + } + + lbl := l[i] + l = append(append(l[:i], l[i+1:]...), lbl) + cutoff-- + i-- + } + + return l[:cutoff], l[cutoff:] +} + +func copyLabels(dest *labels.Labels, src labels.Labels) { + if cap(*dest) < len(src) { + *dest = make([]labels.Label, len(src)) + } + *dest = (*dest)[:len(src)] + copy(*dest, src) +} + // sortWithoutLabels removes given labels from series and re-sorts the series responses that the same // series with different labels are coming right after each other. 
Other types of responses are moved to front. func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) { @@ -811,11 +880,16 @@ func (l *eagerRespSet) Labelset() string { return labelpb.PromLabelSetsToString(l.st.LabelSets()) } +func (l *eagerRespSet) StoreLabels() map[string]struct{} { + return l.storeLabels +} + type respSet interface { Close() At() *storepb.SeriesResponse Next() bool StoreID() string Labelset() string + StoreLabels() map[string]struct{} Empty() bool } diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index d37b4e4840..fdfec178ca 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -4,10 +4,12 @@ package store import ( + "sync" "testing" "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/errors" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -22,6 +24,186 @@ func TestRmLabelsCornerCases(t *testing.T) { }), labels.Labels{}) } +func TestProxyResponseHeapSort(t *testing.T) { + for _, tcase := range []struct { + title string + input []respSet + exp []*storepb.SeriesResponse + }{ + { + title: "merge sets with different series and common labels", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")), + }, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")), + }, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")), + storeSeriesResponse(t, labelsFromStrings("a", 
"1", "c", "4", "e", "5")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")), + }, + }, + { + title: "merge sets with different series and labels", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), + }, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), + storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")), + }, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), + storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")), + storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), + }, + }, + { + title: "merge duplicated sets that were ordered before adding external labels", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + }, + storeLabels: map[string]struct{}{"c": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + }, + storeLabels: map[string]struct{}{"c": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + 
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + }, + }, + { + title: "merge repeated series in stores with different external labels", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + }, + storeLabels: map[string]struct{}{"ext2": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + }, + storeLabels: map[string]struct{}{"ext1": {}, "ext2": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + }, + }, + { + title: "merge series with external labels at beginning of series", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")), + }, + storeLabels: map[string]struct{}{"a": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + }, + storeLabels: map[string]struct{}{"a": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + 
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + }, + }, + { + title: "merge series in stores with external labels not present in series (e.g. stripped during dedup)", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + }, + storeLabels: map[string]struct{}{"ext2": {}, "replica": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + }, + storeLabels: map[string]struct{}{"ext1": {}, "ext2": {}, "replica": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + }, + }, + } { + t.Run(tcase.title, func(t *testing.T) { + h := NewProxyResponseHeap(tcase.input...) + if !h.Empty() { + got := []*storepb.SeriesResponse{h.At()} + for h.Next() { + got = append(got, h.At()) + } + testutil.Equals(t, tcase.exp, got) + } + }) + } +} + func TestSortWithoutLabels(t *testing.T) { for _, tcase := range []struct { input []*storepb.SeriesResponse