Skip to content

Commit

Permalink
Merge pull request #112 from vinted/cuckoofilter_thanos
Browse files Browse the repository at this point in the history
receive/multitsdb: add cuckoo filter on metric names
  • Loading branch information
GiedriusS authored Jun 6, 2024
2 parents d702771 + f912c5a commit 681b7e1
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .bingo/gotesplit.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/Songmu/gotesplit v0.2.1 h1:qJFvR75nJpeKyMQFwyDtFrcc6zDWhrHAkks7DvM8oLo=
github.com/Songmu/gotesplit v0.2.1/go.mod h1:sVBfmLT26b1H5VhUpq8cRhCVK75GAmW9c8r2NiK0gzk=
github.com/jstemmer/go-junit-report v1.0.0 h1:8X1gzZpR+nVQLAht+L/foqOeX2l9DTZoaIPbEQHxsds=
github.com/jstemmer/go-junit-report v1.0.0/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,15 @@ require (
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
github.com/prometheus-community/prom-label-proxy v0.7.0
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
go.opentelemetry.io/contrib/propagators/autoprop v0.38.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
)

require (
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/go-openapi/runtime v0.26.0 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.106.0 h1:m5iErwl3xHovGFlawd50n54ntgXHt1BLsvU6BXsVxEU=
github.com/digitalocean/godo v1.106.0/go.mod h1:R6EmmWI8CT1+fCtjWY9UCB+L5uufuZH13wk3YhxycCs=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
Expand Down Expand Up @@ -918,6 +920,8 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrY
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21 h1:yWfiTPwYxB0l5fGMhl/G+liULugVIHD9AU77iNLrURQ=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.21/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
Expand Down
49 changes: 49 additions & 0 deletions pkg/filter/cuckoo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package filter

import (
"sync"

cuckoo "github.com/seiflotfy/cuckoofilter"
)

// MetricNameFilter answers whether a store may hold series for a given
// metric name, and lets the owner replace the set of known names.
type MetricNameFilter interface {
// MatchesMetricName reports whether metricName is accepted by the filter.
// Callers treat a false result as "this store can be skipped".
MatchesMetricName(metricName string) bool
// ResetAddMetricName discards the names currently held by the filter and
// replaces them with metricNames.
ResetAddMetricName(metricNames ...string)
}

// AllowAllMetricNameFilter is a stateless filter that accepts every metric
// name. It provides both MetricNameFilter methods, so it can be embedded in
// clients that have no information about which metric names they hold.
type AllowAllMetricNameFilter struct{}

// MatchesMetricName reports true for any metricName, including the empty one.
func (AllowAllMetricNameFilter) MatchesMetricName(metricName string) bool {
	const accepted = true
	return accepted
}

// ResetAddMetricName does nothing: the filter keeps no state, so there is
// nothing to reset and nothing to add.
func (AllowAllMetricNameFilter) ResetAddMetricName(metricNames ...string) {
}

// CuckooFilterMetricNameFilter is a MetricNameFilter backed by a cuckoo
// filter. Lookups may return false positives, which only cost an unnecessary
// store query. They must never return false negatives, because a false
// negative makes callers skip a store that does hold the metric.
type CuckooFilterMetricNameFilter struct {
	filter *cuckoo.Filter
	// saturated is true when the last rebuild could not insert every name.
	// In that state the filter contents are incomplete, so every lookup is
	// answered with "match" until the next successful rebuild.
	saturated bool
	// mtx guards filter and saturated.
	mtx sync.RWMutex
}

// NewCuckooFilterMetricNameFilter returns an empty filter sized for roughly
// capacity metric names. Until ResetAddMetricName is called, no name matches.
func NewCuckooFilterMetricNameFilter(capacity uint) *CuckooFilterMetricNameFilter {
	return &CuckooFilterMetricNameFilter{
		filter: cuckoo.NewFilter(capacity),
	}
}

// MatchesMetricName reports whether metricName may be present in the filter.
// It returns true unconditionally while the filter is saturated.
func (f *CuckooFilterMetricNameFilter) MatchesMetricName(metricName string) bool {
	f.mtx.RLock()
	defer f.mtx.RUnlock()

	if f.saturated {
		return true
	}
	return f.filter.Lookup([]byte(metricName))
}

// ResetAddMetricName atomically replaces the filter contents with
// metricNames. If any insertion fails because the filter is full, the filter
// is marked saturated and matches every name until the next call.
func (f *CuckooFilterMetricNameFilter) ResetAddMetricName(metricNames ...string) {
	f.mtx.Lock()
	defer f.mtx.Unlock()

	f.filter.Reset()
	f.saturated = false
	for _, metricName := range metricNames {
		if !f.filter.Insert([]byte(metricName)) {
			// A failed insert means this name (and possibly a displaced
			// fingerprint of an earlier one) is missing from the filter.
			// Stop and fall back to matching everything rather than risk
			// false negatives.
			f.saturated = true
			break
		}
	}
}
2 changes: 2 additions & 0 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"unicode/utf8"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/filter"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -636,6 +637,7 @@ func (e *EndpointSet) GetEndpointStatus() []EndpointStatus {

type endpointRef struct {
storepb.StoreClient
filter.AllowAllMetricNameFilter

mtx sync.RWMutex
cc *grpc.ClientConn
Expand Down
2 changes: 2 additions & 0 deletions pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/filter"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
Expand Down Expand Up @@ -169,6 +170,7 @@ func NewStoreSet(

type storeRef struct {
storepb.StoreClient
filter.AllowAllMetricNameFilter

mtx sync.RWMutex
cc *grpc.ClientConn
Expand Down
10 changes: 8 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/filter"
"github.com/thanos-io/thanos/pkg/info/infopb"

"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -103,14 +104,16 @@ func NewMultiTSDB(
// localClient wraps a TSDBStore together with a description string and a
// metric name filter.
type localClient struct {
// store is the wrapped TSDB store.
store *store.TSDBStore
// desc is a human-readable summary built by newLocalClient from the
// store's label sets and time range.
desc string
// MetricNameFilter is embedded so its methods are promoted onto
// localClient; newLocalClient sets it to the store's own filter.
filter.MetricNameFilter
}

func newLocalClient(store *store.TSDBStore) *localClient {
mint, maxt := store.TimeRange()

return &localClient{
store: store,
desc: fmt.Sprintf("LabelSets: %v MinTime: %d MaxTime: %d", labelpb.PromLabelSetsToString(labelpb.ZLabelSetsToPromLabelSets(store.LabelSet()...)), mint, maxt),
MetricNameFilter: store.MetricNameFilter,
store: store,
desc: fmt.Sprintf("LabelSets: %v MinTime: %d MaxTime: %d", labelpb.PromLabelSetsToString(labelpb.ZLabelSetsToPromLabelSets(store.LabelSet()...)), mint, maxt),
}
}

Expand Down Expand Up @@ -306,6 +309,9 @@ func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *ship
}

func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) {
if storeTSDB == nil && t.storeTSDB != nil {
t.storeTSDB.Close()
}
t.storeTSDB = storeTSDB
t.ship = ship
t.exemplarsTSDB = exemplarsTSDB
Expand Down
16 changes: 16 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/filter"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -71,6 +72,8 @@ type Client interface {
// Addr returns address of the store client. If second parameter is true, the client
// represents a local client (server-as-client) and has no remote address.
Addr() (addr string, isLocalClient bool)

filter.MetricNameFilter
}

// ProxyStore implements the store API that proxies request to all given underlying stores.
Expand Down Expand Up @@ -406,6 +409,19 @@ func storeMatches(ctx context.Context, s Client, debugLogging bool, mint, maxt i
}
return false, reason
}

for _, m := range matchers {
if m.Type == labels.MatchEqual && m.Name == labels.MetricName {
if !s.MatchesMetricName(m.Value) {
if debugLogging {
reason = fmt.Sprintf("metric name %v does not match filter", m.Value)
}
return false, reason
}
break
}
}

return true, ""
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/store/storepb/testutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package storetestutil
import (
"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/thanos/pkg/filter"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

type TestClient struct {
storepb.StoreClient
filter.AllowAllMetricNameFilter

Name string

Expand Down
46 changes: 45 additions & 1 deletion pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/filter"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand All @@ -38,6 +42,7 @@ type TSDBReader interface {
// It attaches the provided external labels to all results. It only responds with raw data
// and does not support downsampling.
type TSDBStore struct {
filter.MetricNameFilter
logger log.Logger
db TSDBReader
component component.StoreAPI
Expand All @@ -46,6 +51,11 @@ type TSDBStore struct {

extLset labels.Labels
mtx sync.RWMutex
close func()
}

// Close invokes the store's close callback. NewTSDBStore sets that callback
// to the cancel function of the context watched by the goroutine that
// periodically refreshes the metric name filter, so calling Close stops it.
//
// Close is a no-op on a nil store or on a TSDBStore that was not created by
// NewTSDBStore (and therefore has no callback), instead of panicking on a
// nil function call.
func (s *TSDBStore) Close() {
	if s == nil || s.close == nil {
		return
	}
	s.close()
}

func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) {
Expand All @@ -66,7 +76,8 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
if logger == nil {
logger = log.NewNopLogger()
}
return &TSDBStore{

st := &TSDBStore{
logger: logger,
db: db,
component: component,
Expand All @@ -76,7 +87,40 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
b := make([]byte, 0, initialBufSize)
return &b
}},
// NOTE(GiedriusS): about 1MB on 64bit machines.
MetricNameFilter: filter.NewCuckooFilterMetricNameFilter(1000000),
}

t := time.NewTicker(15 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
updateMetricNames := func() {
vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{
Label: model.MetricNameLabel,
Start: 0,
End: math.MaxInt64,
})
if err != nil {
level.Error(logger).Log("msg", "failed to update metric names", "err", err)
return
}

st.MetricNameFilter.ResetAddMetricName(vals.Values...)
}
st.close = cancel
updateMetricNames()

go func() {
for {
select {
case <-t.C:
updateMetricNames()
case <-ctx.Done():
return
}
}
}()

return st
}

func (s *TSDBStore) SetExtLset(extLset labels.Labels) {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,7 +1544,7 @@ func checkNetworkRequests(t *testing.T, addr string) {
ctx, cancel := chromedp.NewContext(new(emptyCtx))
t.Cleanup(cancel)

testutil.Ok(t, runutil.Retry(1*time.Minute, ctx.Done(), func() error {
testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stderr), 1*time.Second, ctx.Done(), func() error {
var networkErrors []string

// Listen for failed network requests and push them to an array.
Expand Down

0 comments on commit 681b7e1

Please sign in to comment.