Skip to content

Commit

Permalink
receive/multitsdb: add cuckoo filter on metric names
Browse files Browse the repository at this point in the history
Signed-off-by: Mindaugas Niaura <mindaugas.niaura@vinted.com>
  • Loading branch information
niaurys committed Sep 27, 2024
1 parent 6ff5e1b commit 5eb98d6
Show file tree
Hide file tree
Showing 18 changed files with 190 additions and 16 deletions.
18 changes: 17 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ import (
"github.com/thanos-io/thanos/pkg/tls"
)

const compressionNone = "none"
const (
// compressionNone is the string "none".
// NOTE(review): presumably the value that selects no compression — its use is not visible here.
compressionNone = "none"
// metricNamesFilter is the feature name compared against the enable-feature
// list to switch on the metric name filter.
metricNamesFilter = "metric-names-filter"
)

func registerReceive(app *extkingpin.App) {
cmd := app.Command(component.Receive.String(), "Accept Prometheus remote write API requests and write to local tsdb.")
Expand Down Expand Up @@ -136,6 +139,14 @@ func runReceive(

level.Info(logger).Log("mode", receiveMode, "msg", "running receive")

var metricNameFilterEnabled bool
for _, feature := range *conf.featureList {
if feature == metricNamesFilter {
metricNameFilterEnabled = true
level.Info(logger).Log("msg", "metric name filter feature enabled")
}
}

rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA)
if err != nil {
return err
Expand Down Expand Up @@ -215,6 +226,7 @@ func runReceive(
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
metricNameFilterEnabled,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Intern: conf.writerInterning,
Expand Down Expand Up @@ -845,6 +857,8 @@ type receiveConfig struct {
limitsConfigReloadTimer time.Duration

asyncForwardWorkerCount uint

featureList *[]string
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -985,6 +999,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
cmd.Flag("receive.limits-config-reload-timer", "Minimum amount of time to pass for the limit configuration to be reloaded. Helps to avoid excessive reloads.").
Default("1s").Hidden().DurationVar(&rc.limitsConfigReloadTimer)

rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func runRule(
}
infoOptions := []info.ServerOptionFunc{info.WithRulesInfoFunc()}
if tsdbDB != nil {
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset)
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset, false)
infoOptions = append(
infoOptions,
info.WithLabelSetFunc(func() []*labelpb.LabelSet {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,13 @@ require (
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.34.2
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771
go.opentelemetry.io/contrib/propagators/autoprop v0.54.0
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
)

require github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect

require (
cloud.google.com/go/auth v0.5.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.117.0 h1:WVlTe09melDYTd7VCVyvHcNWbgB+uI1O115+5LOtdSw=
github.com/digitalocean/godo v1.117.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
Expand Down Expand Up @@ -1321,6 +1323,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybL
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27 h1:yGAraK1uUjlhSXgNMIy8o/J4LFNcy7yeipBqt9N9mVg=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.27/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4=
github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY=
github.com/sercand/kuberesolver/v5 v5.1.1/go.mod h1:Fs1KbKhVRnB2aDWN12NjKCB+RgYMWZJ294T3BtmVCpQ=
github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
Expand Down Expand Up @@ -2184,6 +2188,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
c := &storetestutil.TestClient{
Name: "1",
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels())),
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, labels.EmptyLabels(), false)),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/filter/cuckoo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package filter

import (
"sync"

cuckoo "github.com/seiflotfy/cuckoofilter"
)

// CuckooFilterMetricNameFilter is a metric name filter backed by a cuckoo
// filter (github.com/seiflotfy/cuckoofilter). Every access to the underlying
// filter is guarded by mtx, so the methods may be called from multiple
// goroutines.
type CuckooFilterMetricNameFilter struct {
	filter *cuckoo.Filter
	// saturated records that the most recent ResetAddMetricName call could not
	// insert every name because the filter ran out of room. While it is set,
	// MatchesMetricName accepts every name instead of trusting an incomplete
	// filter.
	saturated bool
	mtx       sync.RWMutex
}

// NewCuckooFilterMetricNameFilter returns an empty filter whose underlying
// cuckoo filter is created with the given capacity.
func NewCuckooFilterMetricNameFilter(capacity uint) *CuckooFilterMetricNameFilter {
	return &CuckooFilterMetricNameFilter{
		filter: cuckoo.NewFilter(capacity),
	}
}

// MatchesMetricName reports whether metricName was found in the filter as
// populated by the last ResetAddMetricName call.
//
// If that call overflowed the filter, the filter no longer represents the full
// set of names, so this method returns true for every name: wrongly accepting
// a name is harmless, wrongly rejecting one would hide existing data.
func (f *CuckooFilterMetricNameFilter) MatchesMetricName(metricName string) bool {
	f.mtx.RLock()
	defer f.mtx.RUnlock()

	if f.saturated {
		return true
	}
	return f.filter.Lookup([]byte(metricName))
}

// ResetAddMetricName discards the current filter contents and inserts the
// given metric names in their place.
//
// Insert reports failure when the filter has no room left. Instead of dropping
// the remaining names silently, the filter is marked as saturated so that
// MatchesMetricName fails open until the next successful reset.
func (f *CuckooFilterMetricNameFilter) ResetAddMetricName(metricNames ...string) {
	f.mtx.Lock()
	defer f.mtx.Unlock()

	f.filter.Reset()
	f.saturated = false
	for _, metricName := range metricNames {
		if !f.filter.Insert([]byte(metricName)) {
			// The filter is full; further inserts would fail as well and the
			// filter contents are ignored while saturated.
			f.saturated = true
			break
		}
	}
}
14 changes: 14 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package filter

// MetricNameFilter decides whether a metric name is accepted and lets callers
// supply the set of metric names that decision is based on.
type MetricNameFilter interface {
// MatchesMetricName reports whether metricName is accepted by the filter.
MatchesMetricName(metricName string) bool
// ResetAddMetricName hands the filter a new collection of metric names.
// The cuckoo-backed implementation clears its previous contents before
// adding them; the allow-all implementation ignores the call.
ResetAddMetricName(metricNames ...string)
}

// AllowAllMetricNameFilter is a stateless filter that accepts every metric
// name. It carries no data, so the zero value is ready to use.
type AllowAllMetricNameFilter struct{}

// MatchesMetricName reports true for any metricName, including the empty one.
func (AllowAllMetricNameFilter) MatchesMetricName(metricName string) bool { return true }

// ResetAddMetricName does nothing: there is no state to reset or populate.
func (AllowAllMetricNameFilter) ResetAddMetricName(metricNames ...string) {
	// Intentionally empty.
}
6 changes: 5 additions & 1 deletion pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ type endpointRef struct {
logger log.Logger
}

// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address.
// newEndpointRef creates a new endpointRef with a gRPC channel to the given IP address.
// The call to newEndpointRef will return an error if establishing the channel fails.
func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) {
var dialOpts []grpc.DialOption
Expand Down Expand Up @@ -815,6 +815,10 @@ func (er *endpointRef) apisPresent() []string {
return apisPresent
}

// MatchesMetricName always returns true, whatever name is passed; no metric
// name filtering is applied for an endpointRef.
func (er *endpointRef) MatchesMetricName(name string) bool {
return true
}

type endpointMetadata struct {
*infopb.InfoResponse
}
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m, &WriterOptions{})
Expand Down
13 changes: 12 additions & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type MultiTSDB struct {

exemplarClients map[string]*exemplars.TSDB
exemplarClientsNeedUpdate bool

metricNameFilterEnabled bool
}

// NewMultiTSDB creates new MultiTSDB.
Expand All @@ -84,6 +86,7 @@ func NewMultiTSDB(
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
hashFunc metadata.HashFunc,
metricNameFilterEnabled bool,
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
Expand All @@ -97,6 +100,7 @@ func NewMultiTSDB(
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
metricNameFilterEnabled: metricNameFilterEnabled,
tsdbClientsNeedUpdate: true,
exemplarClientsNeedUpdate: true,
tenantLabelName: tenantLabelName,
Expand Down Expand Up @@ -179,6 +183,10 @@ func newLocalClient(store *store.TSDBStore) *localClient {
}
}

// MatchesMetricName reports whether metricName is accepted by the wrapped
// store, by delegating to the store's own MatchesMetricName.
func (l *localClient) MatchesMetricName(metricName string) bool {
return l.store.MatchesMetricName(metricName)
}

func (l *localClient) LabelSets() []labels.Labels {
return labelpb.LabelpbLabelSetsToPromLabels(l.store.LabelSet()...)
}
Expand Down Expand Up @@ -302,6 +310,9 @@ func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *ship
}

func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) {
if storeTSDB == nil && t.storeTSDB != nil {
t.storeTSDB.Close()
}
t.storeTSDB = storeTSDB
t.ship = ship
t.exemplarsTSDB = exemplarsTSDB
Expand Down Expand Up @@ -751,7 +762,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset))
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, t.metricNameFilterEnabled), s, ship, exemplars.NewTSDB(s, lset))
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -141,6 +142,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -184,6 +186,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -451,6 +454,7 @@ func TestMultiTSDBPrune(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -524,6 +528,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
objstore.NewInMemBucket(),
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -585,6 +590,7 @@ func TestAlignedHeadFlush(t *testing.T) {
test.bucket,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -659,6 +665,7 @@ func TestMultiTSDBStats(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -688,6 +695,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -729,6 +737,7 @@ func TestProxyLabelValues(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(t, m.Close()) }()

Expand Down Expand Up @@ -819,6 +828,7 @@ func BenchmarkMultiTSDB(b *testing.B) {
nil,
false,
metadata.NoneFunc,
false,
)
defer func() { testutil.Ok(b, m.Close()) }()

Expand Down
1 change: 1 addition & 0 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB {
bucket,
false,
metadata.NoneFunc,
false,
)

return m
Expand Down
2 changes: 2 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func TestWriter(t *testing.T) {
nil,
false,
metadata.NoneFunc,
false,
)
t.Cleanup(func() { testutil.Ok(t, m.Close()) })

Expand Down Expand Up @@ -436,6 +437,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
nil,
false,
metadata.NoneFunc,
false,
)
b.Cleanup(func() { testutil.Ok(b, m.Close()) })

Expand Down
4 changes: 2 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func TestTSDBStore_Acceptance(t *testing.T) {
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset)
return NewTSDBStore(nil, db, component.Rule, extLset, false)
}

testStoreAPIsAcceptance(t, startStore)
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset)
return NewTSDBStore(nil, db, component.Rule, extLset, false)

}

Expand Down
13 changes: 13 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Client interface {
// Addr returns address of the store client. If second parameter is true, the client
// represents a local client (server-as-client) and has no remote address.
Addr() (addr string, isLocalClient bool)

// MatchesMetricName returns true if the metric name is allowed in the store.
MatchesMetricName(metricName string) bool
}

// ProxyStore implements the store API that proxies request to all given underlying stores.
Expand Down Expand Up @@ -594,6 +597,16 @@ func storeMatches(ctx context.Context, s Client, mint, maxt int64, matchers ...*
if !LabelSetsMatch(matchers, extLset...) {
return false, fmt.Sprintf("external labels %v does not match request label matchers: %v", extLset, matchers)
}

for _, m := range matchers {
if m.Type == labels.MatchEqual && m.Name == labels.MetricName {
if !s.MatchesMetricName(m.Value) {
return false, fmt.Sprintf("metric name %v does not match filter", m.Value)
}
break
}
}

return true, ""
}

Expand Down
1 change: 1 addition & 0 deletions pkg/store/storepb/testutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ func (c TestClient) SupportsSharding() bool { return c.Shardable }
func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled }
func (c TestClient) String() string { return c.Name }
func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore }
// MatchesMetricName accepts every metric name; the argument is ignored.
func (c TestClient) MatchesMetricName(_ string) bool { return true }
Loading

0 comments on commit 5eb98d6

Please sign in to comment.