From 5eb98d63487d76951733492336eac632ae7a8d87 Mon Sep 17 00:00:00 2001 From: Mindaugas Niaura Date: Fri, 27 Sep 2024 15:50:39 +0300 Subject: [PATCH] receive/multitsdb: add cuckoo filter on metric names Signed-off-by: Mindaugas Niaura --- cmd/thanos/receive.go | 18 +++++++- cmd/thanos/rule.go | 2 +- go.mod | 3 ++ go.sum | 5 ++ pkg/api/query/v1_test.go | 2 +- pkg/filter/cuckoo.go | 33 ++++++++++++++ pkg/filter/filter.go | 14 ++++++ pkg/query/endpointset.go | 6 ++- pkg/receive/handler_test.go | 1 + pkg/receive/multitsdb.go | 13 +++++- pkg/receive/multitsdb_test.go | 10 ++++ pkg/receive/receive_test.go | 1 + pkg/receive/writer_test.go | 2 + pkg/store/acceptance_test.go | 4 +- pkg/store/proxy.go | 13 ++++++ pkg/store/storepb/testutil/client.go | 1 + pkg/store/tsdb.go | 68 ++++++++++++++++++++++++++-- pkg/store/tsdb_test.go | 10 ++-- 18 files changed, 190 insertions(+), 16 deletions(-) create mode 100644 pkg/filter/cuckoo.go create mode 100644 pkg/filter/filter.go diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c517cf8aa52..f2f01c866ce 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -53,7 +53,10 @@ import ( "github.com/thanos-io/thanos/pkg/tls" ) -const compressionNone = "none" +const ( + compressionNone = "none" + metricNamesFilter = "metric-names-filter" +) func registerReceive(app *extkingpin.App) { cmd := app.Command(component.Receive.String(), "Accept Prometheus remote write API requests and write to local tsdb.") @@ -136,6 +139,14 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") + var metricNameFilterEnabled bool + for _, feature := range *conf.featureList { + if feature == metricNamesFilter { + metricNameFilterEnabled = true + level.Info(logger).Log("msg", "metric name filter feature enabled") + } + } + rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) if err != nil { return err @@ -215,6 +226,7 @@ func runReceive( bkt, conf.allowOutOfOrderUpload, hashFunc, + metricNameFilterEnabled, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{ Intern: conf.writerInterning, @@ -845,6 +857,8 @@ type receiveConfig struct { limitsConfigReloadTimer time.Duration asyncForwardWorkerCount uint + + featureList *[]string } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -985,6 +999,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) cmd.Flag("receive.limits-config-reload-timer", "Minimum amount of time to pass for the limit configuration to be reloaded. Helps to avoid excessive reloads."). Default("1s").Hidden().DurationVar(&rc.limitsConfigReloadTimer) + + rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 7f418361ab7..0ee996ae26f 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -736,7 +736,7 @@ func runRule( } infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()} if tsdbDB != nil { - tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset) + tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset, false) infoOptions = append( infoOptions, info.WithLabelSetFunc(func() []*labelpb.LabelSet { diff --git a/go.mod b/go.mod index f68330a5f54..b4c4fb553dd 100644 --- a/go.mod +++ b/go.mod @@ -120,10 +120,13 @@ require ( github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.34.2 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 + github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 go.opentelemetry.io/contrib/propagators/autoprop v0.54.0 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 ) +require github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect + require ( cloud.google.com/go/auth v0.5.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect diff --git a/go.sum b/go.sum index 82164dde441..80d49717550 100644 --- a/go.sum +++ b/go.sum @@ -699,6 +699,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.117.0 h1:WVlTe09melDYTd7VCVyvHcNWbgB+uI1O115+5LOtdSw= github.com/digitalocean/godo v1.117.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= @@ -1321,6 +1323,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27 h1:yGAraK1uUjlhSXgNMIy8o/J4LFNcy7yeipBqt9N9mVg= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY= github.com/sercand/kuberesolver/v5 v5.1.1/go.mod h1:Fs1KbKhVRnB2aDWN12NjKCB+RgYMWZJ294T3BtmVCpQ= github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= @@ -2184,6 +2188,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 71433530d21..3ff3a686893 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -659,7 +659,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) { func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore { c := &storetestutil.TestClient{ Name: "1", - StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels())), + StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels(), false)), MinTime: math.MinInt64, MaxTime: math.MaxInt64, } diff --git a/pkg/filter/cuckoo.go b/pkg/filter/cuckoo.go new file mode 100644 index 00000000000..e6879d14de5 --- /dev/null +++ b/pkg/filter/cuckoo.go @@ -0,0 +1,33 @@ +package filter + +import ( + "sync" + + cuckoo "github.com/seiflotfy/cuckoofilter" +) + +type CuckooFilterMetricNameFilter struct { + filter *cuckoo.Filter + mtx sync.RWMutex +} + +func NewCuckooFilterMetricNameFilter(capacity uint) *CuckooFilterMetricNameFilter { + return &CuckooFilterMetricNameFilter{ + filter: cuckoo.NewFilter(capacity), + } +} + +func (f *CuckooFilterMetricNameFilter) MatchesMetricName(metricName string) bool { + f.mtx.RLock() + defer f.mtx.RUnlock() + return f.filter.Lookup([]byte(metricName)) +} + +func (f *CuckooFilterMetricNameFilter) ResetAddMetricName(metricNames ...string) { + f.mtx.Lock() + defer f.mtx.Unlock() + f.filter.Reset() + for _, metricName := range metricNames { + f.filter.Insert([]byte(metricName)) + } +} diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go new file mode 100644 index 00000000000..ec73dffc8db --- /dev/null +++ b/pkg/filter/filter.go @@ -0,0 +1,14 @@ +package filter + +type MetricNameFilter interface { + MatchesMetricName(metricName string) bool + ResetAddMetricName(metricNames ...string) +} + +type AllowAllMetricNameFilter struct{} + +func (f AllowAllMetricNameFilter) MatchesMetricName(metricName string) bool { + return true +} + +func (f AllowAllMetricNameFilter) ResetAddMetricName(metricNames ...string) {} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 5d6040d1546..a97d3118e53 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -568,7 +568,7 @@ type endpointRef struct { logger log.Logger } -// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. +// newendpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { var dialOpts []grpc.DialOption @@ -815,6 +815,10 @@ func (er *endpointRef) apisPresent() []string { return apisPresent } +func (er *endpointRef) MatchesMetricName(name string) bool { + return true +} + type endpointMetadata struct { *infopb.InfoResponse } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index b931a25bae1..46331e09694 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -999,6 +999,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(b, m.Close()) }() handler.writer = NewWriter(logger, m, &WriterOptions{}) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 529d3137f31..8df7463868f 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -70,6 +70,8 @@ type MultiTSDB struct { exemplarClients map[string]*exemplars.TSDB exemplarClientsNeedUpdate bool + + metricNameFilterEnabled bool } // NewMultiTSDB creates new MultiTSDB. @@ -84,6 +86,7 @@ func NewMultiTSDB( bucket objstore.Bucket, allowOutOfOrderUpload bool, hashFunc metadata.HashFunc, + metricNameFilterEnabled bool, ) *MultiTSDB { if l == nil { l = log.NewNopLogger() @@ -97,6 +100,7 @@ func NewMultiTSDB( mtx: &sync.RWMutex{}, tenants: map[string]*tenant{}, labels: labels, + metricNameFilterEnabled: metricNameFilterEnabled, tsdbClientsNeedUpdate: true, exemplarClientsNeedUpdate: true, tenantLabelName: tenantLabelName, @@ -179,6 +183,10 @@ func newLocalClient(store *store.TSDBStore) *localClient { } } +func (l *localClient) MatchesMetricName(metricName string) bool { + return l.store.MatchesMetricName(metricName) +} + func (l *localClient) LabelSets() []labels.Labels { return labelpb.LabelpbLabelSetsToPromLabels(l.store.LabelSet()...) } @@ -302,6 +310,9 @@ func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *ship } func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { + if storeTSDB == nil && t.storeTSDB != nil { + t.storeTSDB.Close() + } t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB @@ -751,7 +762,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset)) + tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, t.metricNameFilterEnabled), s, ship, exemplars.NewTSDB(s, lset)) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 5c2b760180e..5dc3902b0f5 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -57,6 +57,7 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -141,6 +142,7 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -184,6 +186,7 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -451,6 +454,7 @@ func TestMultiTSDBPrune(t *testing.T) { test.bucket, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -524,6 +528,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { objstore.NewInMemBucket(), false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -585,6 +590,7 @@ func TestAlignedHeadFlush(t *testing.T) { test.bucket, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -659,6 +665,7 @@ func TestMultiTSDBStats(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -688,6 +695,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -729,6 +737,7 @@ func TestProxyLabelValues(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -819,6 +828,7 @@ func BenchmarkMultiTSDB(b *testing.B) { nil, false, metadata.NoneFunc, + false, ) defer func() { testutil.Ok(b, m.Close()) }() diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 943a51ce7d5..b8cbe87eb93 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -810,6 +810,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB { bucket, false, metadata.NoneFunc, + false, ) return m diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index ba6c6940dd6..f4dce51fc58 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -344,6 +344,7 @@ func TestWriter(t *testing.T) { nil, false, metadata.NoneFunc, + false, ) t.Cleanup(func() { testutil.Ok(t, m.Close()) }) @@ -436,6 +437,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr nil, false, metadata.NoneFunc, + false, ) b.Cleanup(func() { testutil.Ok(b, m.Close()) }) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index e7e15ca2b4d..d1e7bfd9918 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -1019,7 +1019,7 @@ func TestTSDBStore_Acceptance(t *testing.T) { tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) appendFn(db.Appender(context.Background())) - return NewTSDBStore(nil, db, component.Rule, extLset) + return NewTSDBStore(nil, db, component.Rule, extLset, false) } testStoreAPIsAcceptance(t, startStore) @@ -1173,7 +1173,7 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) { tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) appendFn(db.Appender(context.Background())) - return NewTSDBStore(nil, db, component.Rule, extLset) + return NewTSDBStore(nil, db, component.Rule, extLset, false) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 90ed0fb2c29..f805e307d77 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -71,6 +71,9 @@ type Client interface { // Addr returns address of the store client. If second parameter is true, the client // represents a local client (server-as-client) and has no remote address. Addr() (addr string, isLocalClient bool) + + // MatchesMetricName returns true if the metric name is allowed in the store. + MatchesMetricName(metricName string) bool } // ProxyStore implements the store API that proxies request to all given underlying stores. @@ -594,6 +597,16 @@ func storeMatches(ctx context.Context, s Client, mint, maxt int64, matchers ...* if !LabelSetsMatch(matchers, extLset...) { return false, fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers) } + + for _, m := range matchers { + if m.Type == labels.MatchEqual && m.Name == labels.MetricName { + if !s.MatchesMetricName(m.Value) { + return false, fmt.Sprintf("metric name %v does not match filter", m.Value) + } + break + } + } + return true, "" } diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index 1c38079faf8..d8654c499fb 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -30,3 +30,4 @@ func (c TestClient) SupportsSharding() bool { return c.Shardable } func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } func (c TestClient) String() string { return c.Name } func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } +func (c TestClient) MatchesMetricName(_ string) bool { return true } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index ff47745bc95..e8806d2c8fc 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -11,9 +11,12 @@ import ( "sort" "strings" "sync" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "google.golang.org/grpc" @@ -21,6 +24,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -44,11 +48,17 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - extLset labels.Labels - mtx sync.RWMutex + extLset labels.Labels + metricNameFilter filter.MetricNameFilter + mtx sync.RWMutex + close func() storepb.UnimplementedStoreServer } +func (s *TSDBStore) Close() { + s.close() +} + func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) { return func(s *grpc.Server) { storepb.RegisterWriteableStoreServer(s, storeSrv) @@ -63,21 +73,67 @@ type ReadWriteTSDBStore struct { // NewTSDBStore creates a new TSDBStore. // NOTE: Given lset has to be sorted. -func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore { +func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, metricNameFilterEnabled bool) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } - return &TSDBStore{ + + var ( + metricNameFilter filter.MetricNameFilter + startMetricNamesUpdate bool + ) + + metricNameFilter = filter.AllowAllMetricNameFilter{} + if metricNameFilterEnabled { + startMetricNamesUpdate = true + metricNameFilter = filter.NewCuckooFilterMetricNameFilter(1000000) // about 1MB on 64bit machines. + } + + st := &TSDBStore{ logger: logger, db: db, component: component, extLset: extLset, + metricNameFilter: metricNameFilter, maxBytesPerFrame: RemoteReadFrameLimit, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b }}, } + + if startMetricNamesUpdate { + t := time.NewTicker(15 * time.Second) + ctx, cancel := context.WithCancel(context.Background()) + updateMetricNames := func() { + vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{ + Label: model.MetricNameLabel, + Start: 0, + End: math.MaxInt64, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to update metric names", "err", err) + return + } + + st.metricNameFilter.ResetAddMetricName(vals.Values...) + } + st.close = cancel + updateMetricNames() + + go func() { + for { + select { + case <-t.C: + updateMetricNames() + case <-ctx.Done(): + return + } + } + }() + } + + return st } func (s *TSDBStore) SetExtLset(extLset labels.Labels) { @@ -136,6 +192,10 @@ func (s *TSDBStore) TimeRange() (int64, int64) { return minTime, math.MaxInt64 } +func (s *TSDBStore) MatchesMetricName(metricName string) bool { + return s.metricNameFilter.MatchesMetricName(metricName) +} + // CloseDelegator allows to delegate close (releasing resources used by request to the server). // This is useful when we invoke StoreAPI within another StoreAPI and results are ephemeral until copied. type CloseDelegator interface { diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 453fb1b97d7..6e2e9bcbcb3 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -39,7 +39,7 @@ func TestTSDBStore_Series_ChunkChecksum(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"), false) appender := db.Appender(context.Background()) @@ -79,7 +79,7 @@ func TestTSDBStore_Series(t *testing.T) { defer func() { testutil.Ok(t, db.Close()) }() testutil.Ok(t, err) - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west"), false) appender := db.Appender(context.Background()) @@ -251,7 +251,7 @@ func TestTSDBStore_SeriesAccessWithDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) srv := storetestutil.NewSeriesServer(context.Background()) csrv := &delegatorServer{SeriesServer: srv} @@ -414,7 +414,7 @@ func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) { }) extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) srv := storetestutil.NewSeriesServer(context.Background()) t.Run("call series and access results", func(t *testing.T) { @@ -555,7 +555,7 @@ func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) { defer func() { testutil.Ok(t, db.Close()) }() extLabels := labels.FromStrings("ext", "1") - store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + store := NewTSDBStore(logger, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels, false) var expected []*storepb.Series for _, resp := range resps {