From ace88ff28507bd87ab09f643ba6e411014eb5c0d Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 10 Dec 2020 19:51:36 +0000 Subject: [PATCH 1/3] storegw: Optimized inject label stage of index lookup. Signed-off-by: Bartlomiej Plotka --- pkg/store/bucket.go | 46 ++++++++++++++++++--------- pkg/store/bucket_test.go | 68 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 15 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4e63a3fcd8..898e6ef255 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -677,7 +677,7 @@ func (s *bucketSeriesSet) Err() error { } func blockSeries( - extLset map[string]string, + extLset labels.Labels, indexr *bucketIndexReader, chunkr *bucketChunkReader, matchers []*labels.Matcher, @@ -718,7 +718,7 @@ func blockSeries( continue } - s := seriesEntry{lset: make(labels.Labels, 0, len(symbolizedLset)+len(extLset))} + s := seriesEntry{} if !req.SkipChunks { // Schedule loading chunks. s.refs = make([]uint64, 0, len(chks)) @@ -743,18 +743,7 @@ func blockSeries( return nil, nil, errors.Wrap(err, "Lookup labels symbols") } - for _, l := range lset { - // Skip if the external labels of the block overrule the series' label. - // NOTE(fabxc): maybe move it to a prefixed version to still ensure uniqueness of series? - if extLset[l.Name] != "" { - continue - } - s.lset = append(s.lset, l) - } - for ln, lv := range extLset { - s.lset = append(s.lset, labels.Label{Name: ln, Value: lv}) - } - sort.Sort(s.lset) + s.lset = injectLabels(lset, extLset) res = append(res, s) } @@ -782,6 +771,31 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } +func injectLabels(in labels.Labels, extLset labels.Labels) labels.Labels { + out := make(labels.Labels, 0, len(in)+len(extLset)) + + // Inject external labels in place. + for len(in) > 0 && len(extLset) > 0 { + d := strings.Compare(in[0].Name, extLset[0].Name) + if d == 0 { + // Duplicate, prefer external labels. + // NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series? + out = append(out, extLset[0]) + in, extLset = in[1:], extLset[1:] + } else if d < 0 { + out = append(out, in[0]) + in = in[1:] + } else if d > 0 { + out = append(out, extLset[0]) + extLset = extLset[1:] + } + } + // Append all remaining elements. + out = append(out, in...) + out = append(out, extLset...) + return out +} + func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error { if in.Encoding() == chunkenc.EncXOR { out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()} @@ -943,7 +957,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie g.Go(func() error { part, pstats, err := blockSeries( - b.meta.Thanos.Labels, + b.extLset, indexr, chunkr, blockMatchers, @@ -1374,6 +1388,7 @@ type bucketBlock struct { dir string indexCache storecache.IndexCache chunkPool pool.BytesPool + extLset labels.Labels indexHeaderReader indexheader.Reader @@ -1410,6 +1425,7 @@ func newBucketBlock( partitioner: p, meta: meta, indexHeaderReader: indexHeadReader, + extLset: labels.FromMap(meta.Thanos.Labels), } // Translate the block's labels and inject the block ID as a label diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e6aa993816..fa476dcabc 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2215,3 +2215,71 @@ func labelNamesFromSeriesSet(series []*storepb.Series) []string { sort.Strings(labels) return labels } + +func TestInjectExtLabels(t *testing.T) { + testInjectExtLabels(testutil.NewTB(t)) +} + +func BenchmarkInjectExtLabels(b *testing.B) { + testInjectExtLabels(testutil.NewTB(b)) +} + +var x labels.Labels + +func testInjectExtLabels(tb testutil.TB) { + in := labels.FromStrings( + "__name__", "subscription_labels", + "_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a", + "account", "1afsdfsddsfsdfsdfsdfsdfs", + "ebs_account", "1asdasdad45", + "email_domain", "asdasddgfkw.example.com", + "endpoint", "metrics", + "external_organization", "dfsdfsdf", + "instance", "10.128.4.231:8080", + "job", "sdd-acct-mngr-metrics", + "managed", "false", + "namespace", "production", + "organization", "dasdadasdasasdasaaFGDSG", + "pod", "sdd-acct-mngr-6669c947c8-xjx7f", + "prometheus", "telemeter-production/telemeter", + "prometheus_replica", "prometheus-telemeter-1", + "risk", "5", + "service", "sdd-acct-mngr-metrics", + "support", "Self-Support", // Should be overwritten. + ) + extLset := labels.FromStrings( + "support", "Host-Support", + "replica", "1", + "tenant", "2342", + ) + tb.ResetTimer() + for i := 0; i < tb.N(); i++ { + x = injectLabels(in, extLset) + + if !tb.IsBenchmark() { + testutil.Equals(tb, labels.FromStrings( + "__name__", "subscription_labels", + "_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a", + "account", "1afsdfsddsfsdfsdfsdfsdfs", + "ebs_account", "1asdasdad45", + "email_domain", "asdasddgfkw.example.com", + "endpoint", "metrics", + "external_organization", "dfsdfsdf", + "instance", "10.128.4.231:8080", + "job", "sdd-acct-mngr-metrics", + "managed", "false", + "namespace", "production", + "organization", "dasdadasdasasdasaaFGDSG", + "pod", "sdd-acct-mngr-6669c947c8-xjx7f", + "prometheus", "telemeter-production/telemeter", + "prometheus_replica", "prometheus-telemeter-1", + "replica", "1", + "risk", "5", + "service", "sdd-acct-mngr-metrics", + "support", "Host-Support", + "tenant", "2342", + ), x) + } + } + fmt.Fprint(ioutil.Discard, x) +} From 0e372f9d0e6fb24b829689edd1b3547c6e6aa25f Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 11 Dec 2020 11:54:59 +0000 Subject: [PATCH 2/3] Improved helper method instead. Signed-off-by: Bartlomiej Plotka --- cmd/thanos/rule.go | 2 + pkg/promclient/promclient.go | 7 ++- pkg/receive/multitsdb.go | 9 ++-- pkg/rules/prometheus.go | 3 +- pkg/store/bucket.go | 43 ++++------------ pkg/store/bucket_test.go | 69 +------------------------- pkg/store/labelpb/label.go | 38 +++++++++------ pkg/store/labelpb/label_test.go | 73 ++++++++++++++++++++++++++-- pkg/store/prometheus.go | 44 ++++++++--------- pkg/store/proxy.go | 2 +- pkg/store/storepb/prompb/types.pb.go | 1 + pkg/store/storepb/prompb/types.proto | 1 + pkg/store/tsdb.go | 13 ++--- 13 files changed, 150 insertions(+), 155 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 3759a685b4..13240b966c 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -10,6 +10,7 @@ import ( "net/url" "os" "path/filepath" + "sort" "strconv" "strings" "time" @@ -701,6 +702,7 @@ func parseFlagLabels(s []string) (labels.Labels, error) { } lset = append(lset, labels.Label{Name: parts[0], Value: val}) } + sort.Sort(lset) return lset, nil } diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index c20e5b162b..0ce00060a5 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -152,7 +152,7 @@ func IsWALDirAccessible(dir string) error { return nil } -// ExternalLabels returns external labels from /api/v1/status/config Prometheus endpoint. +// ExternalLabels returns sorted external labels from /api/v1/status/config Prometheus endpoint. // Note that configuration can be hot reloadable on Prometheus, so this config might change in runtime. func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labels, error) { u := *base @@ -181,7 +181,10 @@ func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labe if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil { return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML) } - return labels.FromMap(cfg.Global.ExternalLabels), nil + + lset := labels.FromMap(cfg.Global.ExternalLabels) + sort.Sort(lset) + return lset, nil } type Flags struct { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 08033d0a7f..ca42bd24e4 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -24,6 +24,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -43,6 +44,8 @@ type MultiTSDB struct { allowOutOfOrderUpload bool } +// NewMultiTSDB creates new MultiTSDB. +// NOTE: Passed labels has to be sorted by name. func NewMultiTSDB( dataDir string, l log.Logger, @@ -262,7 +265,7 @@ func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer { func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error { reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg) - lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}) + lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID)) dataDir := t.defaultTenantDataDir(tenantID) level.Info(logger).Log("msg", "opening TSDB") @@ -286,13 +289,13 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant reg, dataDir, t.bucket, - func() labels.Labels { return lbls }, + func() labels.Labels { return lset }, metadata.ReceiveSource, false, t.allowOutOfOrderUpload, ) } - tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lbls), s, ship) + tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lset), s, ship) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/rules/prometheus.go b/pkg/rules/prometheus.go index 811cf66fb3..6b7c99cfc7 100644 --- a/pkg/rules/prometheus.go +++ b/pkg/rules/prometheus.go @@ -52,10 +52,11 @@ func (p *Prometheus) Rules(r *rulespb.RulesRequest, s rulespb.Rules_RulesServer) return nil } +// extLset has to be sorted. func enrichRulesWithExtLabels(groups []*rulespb.RuleGroup, extLset labels.Labels) { for _, g := range groups { for _, r := range g.Rules { - r.SetLabels(labelpb.ExtendLabels(r.GetLabels(), extLset)) + r.SetLabels(labelpb.ExtendSortedLabels(r.GetLabels(), extLset)) } } } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 898e6ef255..04061438ac 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -743,7 +743,7 @@ func blockSeries( return nil, nil, errors.Wrap(err, "Lookup labels symbols") } - s.lset = injectLabels(lset, extLset) + s.lset = labelpb.ExtendSortedLabels(lset, extLset) res = append(res, s) } @@ -771,31 +771,6 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func injectLabels(in labels.Labels, extLset labels.Labels) labels.Labels { - out := make(labels.Labels, 0, len(in)+len(extLset)) - - // Inject external labels in place. - for len(in) > 0 && len(extLset) > 0 { - d := strings.Compare(in[0].Name, extLset[0].Name) - if d == 0 { - // Duplicate, prefer external labels. - // NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series? - out = append(out, extLset[0]) - in, extLset = in[1:], extLset[1:] - } else if d < 0 { - out = append(out, in[0]) - in = in[1:] - } else if d > 0 { - out = append(out, extLset[0]) - extLset = extLset[1:] - } - } - // Append all remaining elements. - out = append(out, in...) - out = append(out, extLset...) - return out -} - func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error { if in.Encoding() == chunkenc.EncXOR { out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()} @@ -1426,14 +1401,14 @@ func newBucketBlock( meta: meta, indexHeaderReader: indexHeadReader, extLset: labels.FromMap(meta.Thanos.Labels), - } - - // Translate the block's labels and inject the block ID as a label - // to allow to match blocks also by ID. - b.relabelLabels = append(labels.FromMap(meta.Thanos.Labels), labels.Label{ - Name: block.BlockIDLabel, - Value: meta.ULID.String(), - }) + // Translate the block's labels and inject the block ID as a label + // to allow to match blocks also by ID. + relabelLabels: append(labels.FromMap(meta.Thanos.Labels), labels.Label{ + Name: block.BlockIDLabel, + Value: meta.ULID.String(), + }), + } + sort.Sort(b.extLset) sort.Sort(b.relabelLabels) // Get object handles for all chunk files (segment files) from meta.json, if available. diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index fa476dcabc..b32ee35f3b 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1257,6 +1257,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, + extLset: extLset, } blocks = append(blocks, b) } @@ -2215,71 +2216,3 @@ func labelNamesFromSeriesSet(series []*storepb.Series) []string { sort.Strings(labels) return labels } - -func TestInjectExtLabels(t *testing.T) { - testInjectExtLabels(testutil.NewTB(t)) -} - -func BenchmarkInjectExtLabels(b *testing.B) { - testInjectExtLabels(testutil.NewTB(b)) -} - -var x labels.Labels - -func testInjectExtLabels(tb testutil.TB) { - in := labels.FromStrings( - "__name__", "subscription_labels", - "_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a", - "account", "1afsdfsddsfsdfsdfsdfsdfs", - "ebs_account", "1asdasdad45", - "email_domain", "asdasddgfkw.example.com", - "endpoint", "metrics", - "external_organization", "dfsdfsdf", - "instance", "10.128.4.231:8080", - "job", "sdd-acct-mngr-metrics", - "managed", "false", - "namespace", "production", - "organization", "dasdadasdasasdasaaFGDSG", - "pod", "sdd-acct-mngr-6669c947c8-xjx7f", - "prometheus", "telemeter-production/telemeter", - "prometheus_replica", "prometheus-telemeter-1", - "risk", "5", - "service", "sdd-acct-mngr-metrics", - "support", "Self-Support", // Should be overwritten. - ) - extLset := labels.FromStrings( - "support", "Host-Support", - "replica", "1", - "tenant", "2342", - ) - tb.ResetTimer() - for i := 0; i < tb.N(); i++ { - x = injectLabels(in, extLset) - - if !tb.IsBenchmark() { - testutil.Equals(tb, labels.FromStrings( - "__name__", "subscription_labels", - "_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a", - "account", "1afsdfsddsfsdfsdfsdfsdfs", - "ebs_account", "1asdasdad45", - "email_domain", "asdasddgfkw.example.com", - "endpoint", "metrics", - "external_organization", "dfsdfsdf", - "instance", "10.128.4.231:8080", - "job", "sdd-acct-mngr-metrics", - "managed", "false", - "namespace", "production", - "organization", "dasdadasdasasdasaaFGDSG", - "pod", "sdd-acct-mngr-6669c947c8-xjx7f", - "prometheus", "telemeter-production/telemeter", - "prometheus_replica", "prometheus-telemeter-1", - "replica", "1", - "risk", "5", - "service", "sdd-acct-mngr-metrics", - "support", "Host-Support", - "tenant", "2342", - ), x) - } - } - fmt.Fprint(ioutil.Discard, x) -} diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index ea0a7fa49b..5712cd912f 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -235,27 +235,35 @@ func (m *ZLabel) Compare(other ZLabel) int { return strings.Compare(m.Value, other.Value) } -// ExtendLabels extend given labels by extend in labels format. +// ExtendSortedLabels extend given labels by extend in labels format. // The type conversion is done safely, which means we don't modify extend labels underlying array. // // In case of existing labels already present in given label set, it will be overwritten by external one. -func ExtendLabels(lset labels.Labels, extend labels.Labels) labels.Labels { - overwritten := map[string]struct{}{} - for i, l := range lset { - if v := extend.Get(l.Name); v != "" { - lset[i].Value = v - overwritten[l.Name] = struct{}{} +// NOTE: Labels and extend has to be sorted. +func ExtendSortedLabels(lset labels.Labels, extend labels.Labels) labels.Labels { + ret := make(labels.Labels, 0, len(lset)+len(extend)) + + // Inject external labels in place. + for len(lset) > 0 && len(extend) > 0 { + d := strings.Compare(lset[0].Name, extend[0].Name) + if d == 0 { + // Duplicate, prefer external labels. + // NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series? + ret = append(ret, extend[0]) + lset, extend = lset[1:], extend[1:] + } else if d < 0 { + ret = append(ret, lset[0]) + lset = lset[1:] + } else if d > 0 { + ret = append(ret, extend[0]) + extend = extend[1:] } } - for _, l := range extend { - if _, ok := overwritten[l.Name]; ok { - continue - } - lset = append(lset, l) - } - sort.Sort(lset) - return lset + // Append all remaining elements. + ret = append(ret, lset...) + ret = append(ret, extend...) + return ret } func PromLabelSetsToString(lsets []labels.Labels) string { diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index d8d258e8e1..a6227dfd27 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -5,6 +5,7 @@ package labelpb import ( "fmt" + ioutil "io/ioutil" "reflect" "sort" "testing" @@ -52,13 +53,79 @@ func TestLabelMarshal_Unmarshal(t *testing.T) { func TestExtendLabels(t *testing.T) { testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}}, - ExtendLabels(labels.Labels{{Name: "xb", Value: "2"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01"))) + ExtendSortedLabels(labels.Labels{{Name: "a", Value: "1"}, {Name: "xb", Value: "2"}}, labels.FromStrings("replica", "01"))) testutil.Equals(t, labels.Labels{{Name: "replica", Value: "01"}}, - ExtendLabels(labels.Labels{}, labels.FromStrings("replica", "01"))) + ExtendSortedLabels(labels.Labels{}, labels.FromStrings("replica", "01"))) testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}}, - ExtendLabels(labels.Labels{{Name: "xb", Value: "2"}, {Name: "replica", Value: "NOT01"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01"))) + ExtendSortedLabels(labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "NOT01"}, {Name: "xb", Value: "2"}}, labels.FromStrings("replica", "01"))) + + testInjectExtLabels(testutil.NewTB(t)) +} + +func BenchmarkExtendLabels(b *testing.B) { + testInjectExtLabels(testutil.NewTB(b)) +} + +var x labels.Labels + +func testInjectExtLabels(tb testutil.TB) { + in := labels.FromStrings( + "__name__", "subscription_labels", + "_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a", + "account", "1afsdfsddsfsdfsdfsdfsdfs", + "ebs_account", "1asdasdad45", + "email_domain", "asdasddgfkw.example.com", + "endpoint", "metrics", + "external_organization", "dfsdfsdf", + "instance", "10.128.4.231:8080", + "job", "sdd-acct-mngr-metrics", + "managed", "false", + "namespace", "production", + "organization", "dasdadasdasasdasaaFGDSG", + "pod", "sdd-acct-mngr-6669c947c8-xjx7f", + "prometheus", "telemeter-production/telemeter", + "prometheus_replica", "prometheus-telemeter-1", + "risk", "5", + "service", "sdd-acct-mngr-metrics", + "support", "Self-Support", // Should be overwritten. + ) + extLset := labels.FromStrings( + "support", "Host-Support", + "replica", "1", + "tenant", "2342", + ) + tb.ResetTimer() + for i := 0; i < tb.N(); i++ { + x = ExtendSortedLabels(in, extLset) + + if !tb.IsBenchmark() { + testutil.Equals(tb, labels.FromStrings( + "__name__", "subscription_labels", + "_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a", + "account", "1afsdfsddsfsdfsdfsdfsdfs", + "ebs_account", "1asdasdad45", + "email_domain", "asdasddgfkw.example.com", + "endpoint", "metrics", + "external_organization", "dfsdfsdf", + "instance", "10.128.4.231:8080", + "job", "sdd-acct-mngr-metrics", + "managed", "false", + "namespace", "production", + "organization", "dasdadasdasasdasaaFGDSG", + "pod", "sdd-acct-mngr-6669c947c8-xjx7f", + "prometheus", "telemeter-production/telemeter", + "prometheus_replica", "prometheus-telemeter-1", + "replica", "1", + "risk", "5", + "service", "sdd-acct-mngr-metrics", + "support", "Host-Support", + "tenant", "2342", + ), x) + } + } + fmt.Fprint(ioutil.Discard, x) } var ( diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 0e4210084a..6d885e5c5b 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -43,13 +43,13 @@ import ( // PrometheusStore implements the store node API on top of the Prometheus remote read API. type PrometheusStore struct { - logger log.Logger - base *url.URL - client *promclient.Client - buffers sync.Pool - component component.StoreAPI - externalLabels func() labels.Labels - timestamps func() (mint int64, maxt int64) + logger log.Logger + base *url.URL + client *promclient.Client + buffers sync.Pool + component component.StoreAPI + externalLabelsFn func() labels.Labels + timestamps func() (mint int64, maxt int64) remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType @@ -60,14 +60,14 @@ const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size // NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client // to talk to Prometheus. -// It attaches the provided external labels to all results. +// It attaches the provided external labels to all results. Provided external labels has to be sorted. func NewPrometheusStore( logger log.Logger, reg prometheus.Registerer, client *promclient.Client, baseURL *url.URL, component component.StoreAPI, - externalLabels func() labels.Labels, + externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), ) (*PrometheusStore, error) { if logger == nil { @@ -78,7 +78,7 @@ func NewPrometheusStore( base: baseURL, client: client, component: component, - externalLabels: externalLabels, + externalLabelsFn: externalLabelsFn, timestamps: timestamps, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { @@ -100,7 +100,7 @@ func NewPrometheusStore( // NOTE(bwplotka): MaxTime & MinTime are not accurate nor adjusted dynamically. // This is fine for now, but might be needed in future. func (p *PrometheusStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.InfoResponse, error) { - lset := p.externalLabels() + lset := p.externalLabelsFn() mint, maxt := p.timestamps() res := &storepb.InfoResponse{ @@ -133,9 +133,9 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_SeriesServer) error { - externalLabels := p.externalLabels() + extLset := p.externalLabelsFn() - match, newMatchers, err := matchesExternalLabels(r.Matchers, externalLabels) + match, newMatchers, err := matchesExternalLabels(r.Matchers, extLset) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -160,11 +160,11 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return err } for _, lbm := range labelMaps { - lset := make([]labelpb.ZLabel, 0, len(lbm)+len(externalLabels)) + lset := make([]labelpb.ZLabel, 0, len(lbm)+len(extLset)) for k, v := range lbm { lset = append(lset, labelpb.ZLabel{Name: k, Value: v}) } - lset = append(lset, labelpb.ZLabelsFromPromLabels(externalLabels)...) + lset = append(lset, labelpb.ZLabelsFromPromLabels(extLset)...) sort.Slice(lset, func(i, j int) bool { return lset[i].Name < lset[j].Name }) @@ -207,16 +207,16 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie // remote read. contentType := httpResp.Header.Get("Content-Type") if strings.HasPrefix(contentType, "application/x-protobuf") { - return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, externalLabels) + return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset) } if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") { return errors.Errorf("not supported remote read content type: %s", contentType) } - return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, externalLabels) + return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset) } -func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, externalLabels labels.Labels) error { +func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, extLset labels.Labels) error { ctx := s.Context() level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") @@ -232,7 +232,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_Series span.SetTag("series_count", len(resp.Results[0].Timeseries)) for _, e := range resp.Results[0].Timeseries { - lset := labelpb.ExtendLabels(labelpb.ZLabelsToPromLabels(e.Labels), externalLabels) + lset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(e.Labels), extLset) if len(e.Samples) == 0 { // As found in https://github.com/thanos-io/thanos/issues/381 // Prometheus can give us completely empty time series. Ignore these with log until we figure out that @@ -262,7 +262,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_Series return nil } -func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, externalLabels labels.Labels) error { +func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, extLset labels.Labels) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") framesNum := 0 @@ -316,7 +316,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{ Labels: labelpb.ZLabelsFromPromLabels( - labelpb.ExtendLabels(labelpb.ZLabelsToPromLabels(series.Labels), externalLabels), + labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLset), ), Chunks: thanosChks, })); err != nil { @@ -492,7 +492,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR // LabelValues returns all known label values for a given label name. func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { - externalLset := p.externalLabels() + externalLset := p.externalLabelsFn() // First check for matching external label which has priority. if l := externalLset.Get(r.Label); l != "" { diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 4bb9f2f1a5..235adab589 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -145,7 +145,7 @@ func (s *ProxyStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.I labelSets := make(map[uint64]labelpb.ZLabelSet, len(stores)) for _, st := range stores { for _, lset := range st.LabelSets() { - mergedLabelSet := labelpb.ExtendLabels(lset, s.selectorLabels) + mergedLabelSet := labelpb.ExtendSortedLabels(lset, s.selectorLabels) labelSets[mergedLabelSet.Hash()] = labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(mergedLabelSet)} } } diff --git a/pkg/store/storepb/prompb/types.pb.go b/pkg/store/storepb/prompb/types.pb.go index f9b8bbc131..93ab7ece8c 100644 --- a/pkg/store/storepb/prompb/types.pb.go +++ b/pkg/store/storepb/prompb/types.pb.go @@ -138,6 +138,7 @@ func (m *Sample) GetTimestamp() int64 { // TimeSeries represents samples and labels for a single time series. type TimeSeries struct { + // Labels have to be sorted by label names and without duplicated label names. // TODO(bwplotka): Don't use zero copy ZLabels, see https://github.com/thanos-io/thanos/pull/3279 for details. Labels []github_com_thanos_io_thanos_pkg_store_labelpb.ZLabel `protobuf:"bytes,1,rep,name=labels,proto3,customtype=github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel" json:"labels"` Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` diff --git a/pkg/store/storepb/prompb/types.proto b/pkg/store/storepb/prompb/types.proto index 2b7ac25775..64b8f0d9ed 100644 --- a/pkg/store/storepb/prompb/types.proto +++ b/pkg/store/storepb/prompb/types.proto @@ -35,6 +35,7 @@ message Sample { // TimeSeries represents samples and labels for a single time series. message TimeSeries { + // Labels have to be sorted by label names and without duplicated label names. // TODO(bwplotka): Don't use zero copy ZLabels, see https://github.com/thanos-io/thanos/pull/3279 for details. repeated thanos.Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"]; repeated Sample samples = 2 [(gogoproto.nullable) = false]; diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index c9f4421e71..09e964c1ab 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -37,7 +37,7 @@ type TSDBStore struct { logger log.Logger db TSDBReader component component.StoreAPI - externalLabels labels.Labels + extLset labels.Labels maxBytesPerFrame int } @@ -54,7 +54,8 @@ type ReadWriteTSDBStore struct { } // NewTSDBStore creates a new TSDBStore. -func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, component component.StoreAPI, externalLabels labels.Labels) *TSDBStore { +// NOTE: Given lset has to be sorted. +func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } @@ -62,7 +63,7 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com logger: logger, db: db, component: component, - externalLabels: externalLabels, + extLset: extLset, maxBytesPerFrame: RemoteReadFrameLimit, } } @@ -75,7 +76,7 @@ func (s *TSDBStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.In } res := &storepb.InfoResponse{ - Labels: labelpb.ZLabelsFromPromLabels(s.externalLabels), + Labels: labelpb.ZLabelsFromPromLabels(s.extLset), StoreType: s.component.ToProto(), MinTime: minTime, MaxTime: math.MaxInt64, @@ -101,7 +102,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, newMatchers, err := matchesExternalLabels(r.Matchers, s.externalLabels) + match, newMatchers, err := matchesExternalLabels(r.Matchers, s.extLset) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -135,7 +136,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { series := set.At() - seriesLabels := storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendLabels(series.Labels(), s.externalLabels))} + seriesLabels := storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(series.Labels(), s.extLset))} if r.SkipChunks { if err := srv.Send(storepb.NewSeriesResponse(&seriesLabels)); err != nil { return status.Error(codes.Aborted, err.Error()) From 83f393dd5866847d71f42411086a6bd0db3c5416 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 11 Dec 2020 12:08:32 +0000 Subject: [PATCH 3/3] Updated busybox pin. Signed-off-by: Bartlomiej Plotka --- Dockerfile | 3 ++- Dockerfile.multi-stage | 3 ++- Makefile | 6 +++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6e7df72df5..11f8f57596 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,5 @@ -ARG SHA="0c38f63cbe19e40123668a48c36466ef72b195e723cbfcbe01e9657a5f14cec6" +# By default we pin to amd64 sha. Use make docker to automatically adjust for arm64 versions. +ARG SHA="fca3819d670cdaee0d785499fda202ea01c0640ca0803d26ae6dbf2a1c8c041c" FROM quay.io/prometheus/busybox@sha256:${SHA} LABEL maintainer="The Thanos Authors" diff --git a/Dockerfile.multi-stage b/Dockerfile.multi-stage index dc393f9297..b0bb659306 100644 --- a/Dockerfile.multi-stage +++ b/Dockerfile.multi-stage @@ -14,7 +14,8 @@ RUN git update-index --refresh; make build # ----------------------------------------------------------------------------- -ARG SHA="0c38f63cbe19e40123668a48c36466ef72b195e723cbfcbe01e9657a5f14cec6" +# By default we pin to amd64 sha. Use make docker to automatically adjust for arm64 versions. +ARG SHA="fca3819d670cdaee0d785499fda202ea01c0640ca0803d26ae6dbf2a1c8c041c" FROM quay.io/prometheus/busybox@sha256:${SHA} LABEL maintainer="The Thanos Authors" diff --git a/Makefile b/Makefile index 6dc317ae59..6a75d52dee 100644 --- a/Makefile +++ b/Makefile @@ -8,13 +8,13 @@ DOCKER_CI_TAG ?= test SHA='' arch = $(shell uname -m) # Run `DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect quay.io/prometheus/busybox:latest` to get SHA -# Update at 2020.07.06 +# Update at 2020.12.11 ifeq ($(arch), x86_64) # amd64 - SHA="248b7ec76e03e6b4fbb796fc3cdd2f91dad45546a6d7dee61c322475e0e8a08f" + SHA="fca3819d670cdaee0d785499fda202ea01c0640ca0803d26ae6dbf2a1c8c041c" else ifeq ($(arch), armv8) # arm64 - SHA="69508e8fdc516eacbacc0379c03c971e3043706cc8211e6bddb35d903edc3628" + SHA="5478a46f1eb37ebe414c399766f8088bc8353345602053485dd429b9a87097e5" else echo >&2 "only support amd64 or arm64 arch" && exit 1 endif