Skip to content

Commit

Permalink
Store: remove stringset
Browse files Browse the repository at this point in the history
This is the wrong approach to detect if we need to resort. It cannot
detect if we might end up with an unsorted series set if we add
extLabels.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Sep 11, 2023
1 parent b1bba54 commit d54de93
Show file tree
Hide file tree
Showing 15 changed files with 3 additions and 331 deletions.
15 changes: 0 additions & 15 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,21 +366,6 @@ func runReceive(
grpcserver.WithTLSConfig(tlsCfg),
)

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

dbs.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

g.Add(
func() error {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress)
Expand Down
15 changes: 0 additions & 15 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,21 +642,6 @@ func runRule(
)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)))

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

tsdbStore.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}

options = append(options, grpcserver.WithServer(
Expand Down
47 changes: 3 additions & 44 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -113,9 +112,8 @@ func runSidecar(
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
labelNamesSet: stringset.AllStrings(),
limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
}

confContentYaml, err := conf.objStore.Content()
Expand Down Expand Up @@ -239,19 +237,6 @@ func runSidecar(
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

m.UpdateLabelNames(context.Background())

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -264,7 +249,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -434,8 +419,6 @@ type promMetadata struct {
limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client

labelNamesSet stringset.Set
}

func (s *promMetadata) UpdateLabels(ctx context.Context) error {
Expand Down Expand Up @@ -463,30 +446,6 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
s.maxt = maxt
}

func (s *promMetadata) UpdateLabelNames(ctx context.Context) {
mint, _ := s.Timestamps()
labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli())
if err != nil {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labelNamesSet = stringset.AllStrings()
return
}

filter := stringset.NewFromStrings(labelNames...)
s.mtx.Lock()
s.labelNamesSet = filter
s.mtx.Unlock()
}

func (s *promMetadata) LabelNamesSet() stringset.Set {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.labelNamesSet
}

func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
17 changes: 0 additions & 17 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,23 +501,6 @@ func runStore(
})
}

{
ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

bs.UpdateLabelNames()

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

}
// Add bucket UI for loaded blocks.
{
ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,12 @@ require (

require (
github.com/onsi/gomega v1.27.10
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
go.opentelemetry.io/contrib/propagators/autoprop v0.38.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
)

require (
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
Expand Down Expand Up @@ -851,8 +849,6 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20 h1:a9hSJdJcd16e0HoMsnFvaHvxB3pxSD+SC7+CISp7xY0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
Expand Down
13 changes: 0 additions & 13 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,19 +867,6 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab
return initialLset, nil
}

func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) {
t.mtx.RLock()
defer t.mtx.RUnlock()

for _, tenant := range t.tenants {
db := tenant.storeTSDB
if db == nil {
continue
}
db.UpdateLabelNames(ctx)
}
}

// extendLabels extends external labels of the initial label set.
// If an external label shares same name with a label in the initial label set,
// use the label in the initial label set and inform user about it.
Expand Down
2 changes: 0 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand Down Expand Up @@ -827,7 +826,6 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return extLset },
func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) },
func() stringset.Set { return stringset.AllStrings() },
func() string { return version })
testutil.Ok(tt, err)

Expand Down
37 changes: 0 additions & 37 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -387,9 +386,6 @@ type BucketStore struct {

enabledLazyExpandedPostings bool

bmtx sync.Mutex
labelNamesSet stringset.Set

blockEstimatedMaxSeriesFunc BlockEstimator
blockEstimatedMaxChunkFunc BlockEstimator
}
Expand Down Expand Up @@ -543,7 +539,6 @@ func NewBucketStore(
enableSeriesResponseHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
seriesBatchSize: SeriesBatchSize,
labelNamesSet: stringset.AllStrings(),
}

for _, option := range options {
Expand Down Expand Up @@ -1790,38 +1785,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}, nil
}

func (s *BucketStore) UpdateLabelNames() {
s.mtx.RLock()
defer s.mtx.RUnlock()

newSet := stringset.New()
for _, b := range s.blocks {
labelNames, err := b.indexHeaderReader.LabelNames()
if err != nil {
level.Warn(s.logger).Log("msg", "error getting label names", "block", b.meta.ULID, "err", err.Error())
s.updateLabelNamesSet(stringset.AllStrings())
return
}
for _, l := range labelNames {
newSet.Insert(l)
}
}
s.updateLabelNamesSet(newSet)
}

func (s *BucketStore) updateLabelNamesSet(newSet stringset.Set) {
s.bmtx.Lock()
s.labelNamesSet = newSet
s.bmtx.Unlock()
}

func (b *BucketStore) LabelNamesSet() stringset.Set {
b.bmtx.Lock()
defer b.bmtx.Unlock()

return b.labelNamesSet
}

func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) {
// We filter external labels from matchers so we won't try to match series on them.
var result []*labels.Matcher
Expand Down
24 changes: 0 additions & 24 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,30 +779,6 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
})
}

func TestBucketStore_LabelNamesSet_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

s.store.UpdateLabelNames()
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

filter := s.store.LabelNamesSet()
for _, n := range []string{"a", "b", "c"} {
testutil.Assert(t, filter.Has(n), "expected filter to have %s", n)
}
testutil.Equals(t, 3, filter.Count())
})
}

func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) {
cases := map[string]struct {
maxSeriesLimit uint64
Expand Down
5 changes: 0 additions & 5 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand Down Expand Up @@ -1661,7 +1660,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
chunksLimiterFactory: NewChunksLimiterFactory(0),
seriesLimiterFactory: NewSeriesLimiterFactory(0),
bytesLimiterFactory: NewBytesLimiterFactory(0),
labelNamesSet: stringset.AllStrings(),
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
Expand Down Expand Up @@ -3471,9 +3469,6 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {

testutil.Ok(t, bucketStore.SyncBlocks(context.Background()))

// make sure to have updated inner label names
bucketStore.UpdateLabelNames()

srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{
WithoutReplicaLabels: []string{"replica"},
Expand Down
4 changes: 0 additions & 4 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -54,7 +53,6 @@ type PrometheusStore struct {
buffers sync.Pool
component component.StoreAPI
externalLabelsFn func() labels.Labels
labelNamesSet func() stringset.Set

promVersion func() string
timestamps func() (mint int64, maxt int64)
Expand All @@ -81,7 +79,6 @@ func NewPrometheusStore(
component component.StoreAPI,
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
labelNamesSet func() stringset.Set,
promVersion func() string,
) (*PrometheusStore, error) {
if logger == nil {
Expand All @@ -95,7 +92,6 @@ func NewPrometheusStore(
externalLabelsFn: externalLabelsFn,
promVersion: promVersion,
timestamps: timestamps,
labelNamesSet: labelNamesSet,
remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
Expand Down
Loading

0 comments on commit d54de93

Please sign in to comment.