Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore common label-values in ProxyResponseHeap sort function #6299

Merged
94 changes: 84 additions & 10 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,37 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse {
// This is O(n*logk) but can be Theta(n*logk). However,
// tournament trees need n-1 auxiliary nodes so there
// might not be much of a difference.
type ProxyResponseHeap []ProxyResponseHeapNode
type ProxyResponseHeap struct {
nodes []ProxyResponseHeapNode
iLblsScratch labels.Labels
jLblsScratch labels.Labels
}

func (h *ProxyResponseHeap) Less(i, j int) bool {
iResp := (*h)[i].rs.At()
jResp := (*h)[j].rs.At()
iResp := h.nodes[i].rs.At()
jResp := h.nodes[j].rs.At()

if iResp.GetSeries() != nil && jResp.GetSeries() != nil {
// Response sets are sorted before adding external labels.
// This comparison excludes those labels to keep the same order.
iStoreLbls := h.nodes[i].rs.StoreLabels()
jStoreLbls := h.nodes[j].rs.StoreLabels()

iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels)
jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels)
return labels.Compare(iLbls, jLbls) < 0

copyLabels(&h.iLblsScratch, iLbls)
copyLabels(&h.jLblsScratch, jLbls)

var iExtLbls, jExtLbls labels.Labels
h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls)
h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls)

c := labels.Compare(h.iLblsScratch, h.jLblsScratch)
if c != 0 {
return c < 0
}
return labels.Compare(iExtLbls, jExtLbls) < 0
} else if iResp.GetSeries() == nil && jResp.GetSeries() != nil {
return true
} else if iResp.GetSeries() != nil && jResp.GetSeries() == nil {
Expand All @@ -185,19 +206,19 @@ func (h *ProxyResponseHeap) Less(i, j int) bool {
}

func (h *ProxyResponseHeap) Len() int {
return len(*h)
return len(h.nodes)
}

func (h *ProxyResponseHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i]
}

func (h *ProxyResponseHeap) Push(x interface{}) {
*h = append(*h, x.(ProxyResponseHeapNode))
h.nodes = append(h.nodes, x.(ProxyResponseHeapNode))
}

func (h *ProxyResponseHeap) Pop() (v interface{}) {
*h, v = (*h)[:h.Len()-1], (*h)[h.Len()-1]
h.nodes, v = h.nodes[:h.Len()-1], h.nodes[h.Len()-1]
return
}

Expand All @@ -206,7 +227,7 @@ func (h *ProxyResponseHeap) Empty() bool {
}

func (h *ProxyResponseHeap) Min() *ProxyResponseHeapNode {
return &(*h)[0]
return &h.nodes[0]
}

type ProxyResponseHeapNode struct {
Expand All @@ -216,7 +237,9 @@ type ProxyResponseHeapNode struct {
// NewProxyResponseHeap returns heap that k-way merge series together.
// It's agnostic to duplicates and overlaps, it forwards all duplicated series in random order.
func NewProxyResponseHeap(seriesSets ...respSet) *ProxyResponseHeap {
ret := make(ProxyResponseHeap, 0, len(seriesSets))
ret := ProxyResponseHeap{
nodes: make([]ProxyResponseHeapNode, 0, len(seriesSets)),
}

for _, ss := range seriesSets {
if ss.Empty() {
Expand Down Expand Up @@ -257,6 +280,10 @@ func (l *lazyRespSet) Labelset() string {
return labelpb.PromLabelSetsToString(l.storeLabelSets)
}

func (l *lazyRespSet) StoreLabels() map[string]struct{} {
return l.storeLabels
}

// lazyRespSet is a lazy storepb.SeriesSet that buffers
// everything as fast as possible while at the same it permits
// reading response-by-response. It blocks if there is no data
Expand All @@ -268,6 +295,7 @@ type lazyRespSet struct {
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
storeLabels map[string]struct{}
frameTimeout time.Duration
ctx context.Context

Expand Down Expand Up @@ -374,6 +402,12 @@ func newLazyRespSet(
bufferedResponses: bufferedResponses,
shardMatcher: shardMatcher,
}
respSet.storeLabels = make(map[string]struct{})
for _, ls := range storeLabelSets {
for _, l := range ls {
respSet.storeLabels[l.Name] = struct{}{}
}
}

go func(st string, l *lazyRespSet) {
bytesProcessed := 0
Expand Down Expand Up @@ -610,6 +644,7 @@ type eagerRespSet struct {

shardMatcher *storepb.ShardMatcher
removeLabels map[string]struct{}
storeLabels map[string]struct{}

// Internal bookkeeping.
bufferedResponses []*storepb.SeriesResponse
Expand Down Expand Up @@ -641,6 +676,12 @@ func newEagerRespSet(
shardMatcher: shardMatcher,
removeLabels: removeLabels,
}
ret.storeLabels = make(map[string]struct{})
for _, ls := range st.LabelSets() {
for _, l := range ls {
ret.storeLabels[l.Name] = struct{}{}
}
}

ret.wg.Add(1)

Expand Down Expand Up @@ -748,6 +789,34 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels
return l
}

// dropLabels removes labels from the given label set and returns the removed labels.
func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) {
cutoff := len(l)
for i := 0; i < len(l); i++ {
if i == cutoff {
break
}
if _, ok := labelsToDrop[l[i].Name]; !ok {
continue
}

lbl := l[i]
l = append(append(l[:i], l[i+1:]...), lbl)
cutoff--
i--
}

return l[:cutoff], l[cutoff:]
}

func copyLabels(dest *labels.Labels, src labels.Labels) {
if len(*dest) < cap(src) {
*dest = make([]labels.Label, len(src))
}
*dest = (*dest)[:len(src)]
copy(*dest, src)
}

// sortWithoutLabels removes given labels from series and re-sorts the series responses that the same
// series with different labels are coming right after each other. Other types of responses are moved to front.
func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) {
Expand Down Expand Up @@ -811,11 +880,16 @@ func (l *eagerRespSet) Labelset() string {
return labelpb.PromLabelSetsToString(l.st.LabelSets())
}

func (l *eagerRespSet) StoreLabels() map[string]struct{} {
return l.storeLabels
}

type respSet interface {
Close()
At() *storepb.SeriesResponse
Next() bool
StoreID() string
Labelset() string
StoreLabels() map[string]struct{}
Empty() bool
}
182 changes: 182 additions & 0 deletions pkg/store/proxy_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package store

import (
"sync"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand All @@ -22,6 +24,186 @@ func TestRmLabelsCornerCases(t *testing.T) {
}), labels.Labels{})
}

func TestProxyResponseHeapSort(t *testing.T) {
for _, tcase := range []struct {
title string
input []respSet
exp []*storepb.SeriesResponse
}{
{
title: "merge sets with different series and common labels",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")),
},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")),
},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")),
},
},
{
title: "merge sets with different series and labels",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")),
},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")),
},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")),
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")),
storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")),
},
},
{
title: "merge duplicated sets that were ordered before adding external labels",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
storeLabels: map[string]struct{}{"c": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
storeLabels: map[string]struct{}{"c": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
},
{
title: "merge repeated series in stores with different external labels",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
storeLabels: map[string]struct{}{"ext2": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
},
storeLabels: map[string]struct{}{"ext1": {}, "ext2": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
},
{
title: "merge series with external labels at beginning of series",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")),
},
storeLabels: map[string]struct{}{"a": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
},
storeLabels: map[string]struct{}{"a": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
},
},
{
title: "merge series in stores with external labels not present in series (e.g. stripped during dedup)",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
storeLabels: map[string]struct{}{"ext2": {}, "replica": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
},
storeLabels: map[string]struct{}{"ext1": {}, "ext2": {}, "replica": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
},
} {
t.Run(tcase.title, func(t *testing.T) {
h := NewProxyResponseHeap(tcase.input...)
if !h.Empty() {
got := []*storepb.SeriesResponse{h.At()}
for h.Next() {
got = append(got, h.At())
}
testutil.Equals(t, tcase.exp, got)
}
})
}
}

func TestSortWithoutLabels(t *testing.T) {
for _, tcase := range []struct {
input []*storepb.SeriesResponse
Expand Down