From 85261bcf0db4375312ad8d9ba0c18c2adb52906d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 6 Jun 2024 15:55:54 +0300 Subject: [PATCH] receive/multitsdb: add cuckoo filter on metric names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a cuckoo filter on metric names to reduce fan-out cost. Signed-off-by: Giedrius Statkevičius --- .bingo/gotesplit.sum | 2 + go.mod | 2 + go.sum | 4 ++ pkg/filter/cuckoo.go | 49 +++++++++++++++++++ pkg/query/endpointset.go | 2 + .../test-storeset-pre-v0.8.0/storeset.go | 2 + pkg/receive/multitsdb.go | 10 +++- pkg/store/proxy.go | 16 ++++++ pkg/store/storepb/testutil/client.go | 2 + pkg/store/tsdb.go | 46 ++++++++++++++++- 10 files changed, 132 insertions(+), 3 deletions(-) create mode 100644 pkg/filter/cuckoo.go diff --git a/.bingo/gotesplit.sum b/.bingo/gotesplit.sum index a13290d3464..e8cb91cf0e2 100644 --- a/.bingo/gotesplit.sum +++ b/.bingo/gotesplit.sum @@ -1,4 +1,6 @@ github.com/Songmu/gotesplit v0.2.1 h1:qJFvR75nJpeKyMQFwyDtFrcc6zDWhrHAkks7DvM8oLo= github.com/Songmu/gotesplit v0.2.1/go.mod h1:sVBfmLT26b1H5VhUpq8cRhCVK75GAmW9c8r2NiK0gzk= github.com/jstemmer/go-junit-report v1.0.0 h1:8X1gzZpR+nVQLAht+L/foqOeX2l9DTZoaIPbEQHxsds= +github.com/jstemmer/go-junit-report v1.0.0/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/go.mod b/go.mod index 1e5c29b7028..dcce18af4ee 100644 --- a/go.mod +++ b/go.mod @@ -120,6 +120,7 @@ require ( github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.27.10 github.com/prometheus-community/prom-label-proxy v0.7.0 + github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb 
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb @@ -127,6 +128,7 @@ require ( require ( github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect + github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect github.com/go-openapi/runtime v0.26.0 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/google/s2a-go v0.1.7 // indirect diff --git a/go.sum b/go.sum index bd76a6263f7..ffc0e8d6036 100644 --- a/go.sum +++ b/go.sum @@ -231,6 +231,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.106.0 h1:m5iErwl3xHovGFlawd50n54ntgXHt1BLsvU6BXsVxEU= github.com/digitalocean/godo v1.106.0/go.mod h1:R6EmmWI8CT1+fCtjWY9UCB+L5uufuZH13wk3YhxycCs= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= @@ -918,6 +920,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrY github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 h1:yWfiTPwYxB0l5fGMhl/G+liULugVIHD9AU77iNLrURQ= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U= +github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod 
h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
 github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
 github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
 github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
diff --git a/pkg/filter/cuckoo.go b/pkg/filter/cuckoo.go
new file mode 100644
index 00000000000..86b53428437
--- /dev/null
+++ b/pkg/filter/cuckoo.go
@@ -0,0 +1,75 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package filter
+
+import (
+	"sync"
+
+	cuckoo "github.com/seiflotfy/cuckoofilter"
+)
+
+// MetricNameFilter reports whether a store may hold series for a metric name.
+// A false answer makes callers skip the store, so implementations may return
+// false positives but must not return false negatives.
+type MetricNameFilter interface {
+	MatchesMetricName(metricName string) bool
+	ResetAddMetricName(metricNames ...string)
+}
+
+// AllowAllMetricNameFilter is a MetricNameFilter that matches every metric name.
+type AllowAllMetricNameFilter struct{}
+
+// MatchesMetricName always returns true.
+func (f AllowAllMetricNameFilter) MatchesMetricName(metricName string) bool {
+	return true
+}
+
+// ResetAddMetricName does nothing.
+func (f AllowAllMetricNameFilter) ResetAddMetricName(metricNames ...string) {}
+
+// CuckooFilterMetricNameFilter is a MetricNameFilter backed by a cuckoo filter.
+type CuckooFilterMetricNameFilter struct {
+	filter *cuckoo.Filter
+	mtx    sync.RWMutex
+
+	// saturated records that an insert failed during the last rebuild. The
+	// filter then no longer covers every name, so lookups answer true until
+	// the next rebuild succeeds. Guarded by mtx.
+	saturated bool
+}
+
+// NewCuckooFilterMetricNameFilter returns an empty filter created with the
+// given capacity.
+func NewCuckooFilterMetricNameFilter(capacity uint) *CuckooFilterMetricNameFilter {
+	return &CuckooFilterMetricNameFilter{
+		filter: cuckoo.NewFilter(capacity),
+	}
+}
+
+// MatchesMetricName reports whether metricName may have been passed to the
+// last ResetAddMetricName call.
+func (f *CuckooFilterMetricNameFilter) MatchesMetricName(metricName string) bool {
+	f.mtx.RLock()
+	defer f.mtx.RUnlock()
+	if f.saturated {
+		return true
+	}
+	return f.filter.Lookup([]byte(metricName))
+}
+
+// ResetAddMetricName clears the filter and inserts metricNames.
+func (f *CuckooFilterMetricNameFilter) ResetAddMetricName(metricNames ...string) {
+	f.mtx.Lock()
+	defer f.mtx.Unlock()
+	f.filter.Reset()
+	f.saturated = false
+	for _, metricName := range metricNames {
+		// Insert returns false when the filter is full. Ignoring that would
+		// produce false negatives, so fail open instead.
+		if !f.filter.Insert([]byte(metricName)) {
+			f.saturated = true
+			return
+		}
+	}
+}
diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go
index f0daa868007..029f72f5a8e 100644
--- a/pkg/query/endpointset.go
+++ b/pkg/query/endpointset.go
@@ -14,6 +14,7 @@ import (
 	"unicode/utf8"
 
 	"github.com/thanos-io/thanos/pkg/api/query/querypb"
+
"github.com/thanos-io/thanos/pkg/filter" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -636,6 +637,7 @@ func (e *EndpointSet) GetEndpointStatus() []EndpointStatus { type endpointRef struct { storepb.StoreClient + filter.AllowAllMetricNameFilter mtx sync.RWMutex cc *grpc.ClientConn diff --git a/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go index ddf413bd8dc..c49d539dfca 100644 --- a/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go +++ b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" @@ -169,6 +170,7 @@ func NewStoreSet( type storeRef struct { storepb.StoreClient + filter.AllowAllMetricNameFilter mtx sync.RWMutex cc *grpc.ClientConn diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 87ac1c9d87f..3251c2729c4 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/api/status" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/objstore" @@ -103,14 +104,16 @@ func NewMultiTSDB( type localClient struct { store *store.TSDBStore desc string + filter.MetricNameFilter } func newLocalClient(store *store.TSDBStore) *localClient { mint, maxt := store.TimeRange() return &localClient{ - store: store, - desc: fmt.Sprintf("LabelSets: %v MinTime: %d MaxTime: %d", labelpb.PromLabelSetsToString(labelpb.ZLabelSetsToPromLabelSets(store.LabelSet()...)), mint, maxt), + MetricNameFilter: store.MetricNameFilter, + store: store, + desc: fmt.Sprintf("LabelSets: %v MinTime: %d MaxTime: %d", 
labelpb.PromLabelSetsToString(labelpb.ZLabelSetsToPromLabelSets(store.LabelSet()...)), mint, maxt), } } @@ -306,6 +309,9 @@ func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *ship } func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { + if storeTSDB == nil && t.storeTSDB != nil { + t.storeTSDB.Close() + } t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index ce4d391bf3b..8118e5b2957 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -71,6 +72,8 @@ type Client interface { // Addr returns address of the store client. If second parameter is true, the client // represents a local client (server-as-client) and has no remote address. Addr() (addr string, isLocalClient bool) + + filter.MetricNameFilter } // ProxyStore implements the store API that proxies request to all given underlying stores. 
@@ -406,6 +409,19 @@ func storeMatches(ctx context.Context, s Client, debugLogging bool, mint, maxt i } return false, reason } + + for _, m := range matchers { + if m.Type == labels.MatchEqual && m.Name == labels.MetricName { + if !s.MatchesMetricName(m.Value) { + if debugLogging { + reason = fmt.Sprintf("metric name %v does not match filter", m.Value) + } + return false, reason + } + break + } + } + return true, "" } diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index 90874842d69..e6e08dd6033 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -6,12 +6,14 @@ package storetestutil import ( "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/storepb" ) type TestClient struct { storepb.StoreClient + filter.AllowAllMetricNameFilter Name string diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 7c1cc2441c3..73788aa46bc 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -11,9 +11,12 @@ import ( "sort" "strings" "sync" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "google.golang.org/grpc" @@ -21,6 +24,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -38,6 +42,7 @@ type TSDBReader interface { // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. 
type TSDBStore struct {
+	filter.MetricNameFilter
 	logger           log.Logger
 	db               TSDBReader
 	component        component.StoreAPI
@@ -46,6 +51,15 @@ type TSDBStore struct {
 
 	extLset labels.Labels
 	mtx     sync.RWMutex
+	close   func()
+}
+
+// Close stops the background metric name refresh started by NewTSDBStore.
+// It is a no-op for a TSDBStore whose close function was never set.
+func (s *TSDBStore) Close() {
+	if s.close != nil {
+		s.close()
+	}
 }
 
 func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) {
@@ -66,7 +80,8 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
-	return &TSDBStore{
+
+	st := &TSDBStore{
 		logger:           logger,
 		db:               db,
 		component:        component,
@@ -76,7 +91,43 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
 			b := make([]byte, 0, initialBufSize)
 			return &b
 		}},
+		// NOTE(GiedriusS): about 1MB on 64bit machines.
+		MetricNameFilter: filter.NewCuckooFilterMetricNameFilter(1000000),
+	}
+
+	t := time.NewTicker(15 * time.Second)
+	ctx, cancel := context.WithCancel(context.Background())
+	// Refresh with ctx so that Close also aborts an in-flight LabelValues call.
+	updateMetricNames := func() {
+		vals, err := st.LabelValues(ctx, &storepb.LabelValuesRequest{
+			Label: model.MetricNameLabel,
+			Start: 0,
+			End:   math.MaxInt64,
+		})
+		if err != nil {
+			level.Error(logger).Log("msg", "failed to update metric names", "err", err)
+			return
+		}
+
+		st.MetricNameFilter.ResetAddMetricName(vals.Values...)
 	}
+	st.close = cancel
+	updateMetricNames()
+
+	go func() {
+		// Stop the ticker once Close cancels ctx so its resources are released.
+		defer t.Stop()
+		for {
+			select {
+			case <-t.C:
+				updateMetricNames()
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return st
 }
 
 func (s *TSDBStore) SetExtLset(extLset labels.Labels) {