Skip to content

Commit

Permalink
Chore: remove unused code from BucketStore (#1816)
Browse files Browse the repository at this point in the history
* Removed unused Info() and advLabelSets from BucketStore

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Removed unused FilterConfig from BucketStore

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Removed unused relabelConfig from store-gateway tests

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Removed unused function expectedTouchedBlockOps()

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Removed unused recorder from BucketStore tests

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* go mod vendor

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored May 5, 2022
1 parent 286f204 commit e926900
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 665 deletions.
98 changes: 0 additions & 98 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/store/hintspb"
Expand Down Expand Up @@ -78,28 +76,11 @@ const (
chunkBytesPoolMinSize = 64 * 1024 // 64 KiB
chunkBytesPoolMaxSize = 64 * 1024 * 1024 // 64 MiB

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
// Querier was blocking Store Gateway as duplicate with sidecar.
//
// Newer Queriers are not strict, no duplicated external labels check is there anymore.
// Additionally newer Queriers removes/ignore this exact labels from UI and querying.
//
// This label name is intentionally against Prometheus label style.
// TODO(bwplotka): Remove it at some point.
CompatibilityTypeLabelName = "@thanos_compatibility_store_type"

// Labels for metrics.
labelEncode = "encode"
labelDecode = "decode"
)

// FilterConfig is a configuration, which Store uses for filtering metrics based on time.
type FilterConfig struct {
MinTime, MaxTime model.TimeOrDurationValue
}

type BucketStoreStats struct {
// BlocksLoaded is the number of blocks currently loaded in the bucket store.
BlocksLoaded int
Expand Down Expand Up @@ -143,10 +124,6 @@ type BucketStore struct {
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

// Threadpool for performing operations that block the OS thread (mmap page faults)
threadPool *mimir_indexheader.Threadpool

Expand Down Expand Up @@ -225,13 +202,6 @@ func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption {
}
}

// WithFilterConfig sets a filter which Store uses for filtering metrics based on time.
func WithFilterConfig(filter *FilterConfig) BucketStoreOption {
return func(s *BucketStore) {
s.filterConfig = filter
}
}

// WithDebugLogging enables debug logging.
func WithDebugLogging() BucketStoreOption {
return func(s *BucketStore) {
Expand All @@ -251,7 +221,6 @@ func NewBucketStore(
partitioner Partitioner,
threadPool *mimir_indexheader.Threadpool,
blockSyncConcurrency int,
enableCompatibilityLabel bool,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
lazyIndexReaderEnabled bool,
Expand All @@ -275,7 +244,6 @@ func NewBucketStore(
seriesLimiterFactory: seriesLimiterFactory,
partitioner: partitioner,
threadPool: threadPool,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
seriesHashCache: seriesHashCache,
Expand Down Expand Up @@ -375,18 +343,6 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
s.metrics.blockDrops.Inc()
}

// Sync advertise labels.
var storeLabels labels.Labels
s.mtx.Lock()
s.advLabelSets = make([]labelpb.ZLabelSet, 0, len(s.advLabelSets))
for _, bs := range s.blockSets {
storeLabels = storeLabels[:0]
s.advLabelSets = append(s.advLabelSets, labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(append(storeLabels, bs.labels...))})
}
sort.Slice(s.advLabelSets, func(i, j int) bool {
return s.advLabelSets[i].String() < s.advLabelSets[j].String()
})
s.mtx.Unlock()
return nil
}

Expand Down Expand Up @@ -550,61 +506,9 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
}
}

mint = s.limitMinTime(mint)
maxt = s.limitMaxTime(maxt)

return mint, maxt
}

// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()
res := &storepb.InfoResponse{
StoreType: component.Store.ToProto(),
MinTime: mint,
MaxTime: maxt,
}

s.mtx.RLock()
res.LabelSets = s.advLabelSets
s.mtx.RUnlock()

if s.enableCompatibilityLabel && len(res.LabelSets) > 0 {
// This is for compatibility with Querier v0.7.0.
// See query.StoreCompatibilityTypeLabelName comment for details.
res.LabelSets = append(res.LabelSets, labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: CompatibilityTypeLabelName, Value: "store"}}})
}
return res, nil
}

func (s *BucketStore) limitMinTime(mint int64) int64 {
if s.filterConfig == nil {
return mint
}

filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()

if mint < filterMinTime {
return filterMinTime
}

return mint
}

func (s *BucketStore) limitMaxTime(maxt int64) int64 {
if s.filterConfig == nil {
return maxt
}

filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()

if maxt > filterMaxTime {
maxt = filterMaxTime
}

return maxt
}

type seriesEntry struct {
lset labels.Labels
refs []chunks.ChunkRef
Expand Down Expand Up @@ -1000,8 +904,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
req.MinTime = s.limitMinTime(req.MinTime)
req.MaxTime = s.limitMaxTime(req.MaxTime)

// Check if matchers include the query shard selector.
shardSelector, matchers, err := sharding.RemoveShardFromMatchers(matchers)
Expand Down
89 changes: 13 additions & 76 deletions pkg/storegateway/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/gogo/status"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/hashcache"
"github.com/stretchr/testify/assert"
Expand All @@ -38,14 +37,10 @@ import (
)

var (
minTime = time.Unix(0, 0)
maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
allowAllFilterConf = &FilterConfig{
MinTime: minTimeDuration,
MaxTime: maxTimeDuration,
}
minTime = time.Unix(0, 0)
maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
)

type swappableCache struct {
Expand Down Expand Up @@ -136,7 +131,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterF
}
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand All @@ -147,10 +142,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
labels.FromStrings("a", "2", "c", "1"),
labels.FromStrings("a", "2", "c", "2"),
}
return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, relabelConfig, filterConf, series)
return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, series)
}

func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, series []labels.Labels) *storeSuite {
func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, series []labels.Labels) *storeSuite {
extLset := labels.FromStrings("ext1", "value1")

minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset)
Expand All @@ -162,10 +157,7 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
maxTime: maxTime,
}

metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
})
metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{})
assert.NoError(t, err)

store, err := NewBucketStore(
Expand All @@ -178,7 +170,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
true,
true,
Expand All @@ -187,7 +178,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
NewBucketStoreMetrics(nil),
WithLogger(s.logger),
WithIndexCache(s.cache),
WithFilterConfig(filterConf),
)
assert.NoError(t, err)
t.Cleanup(func() {
Expand Down Expand Up @@ -426,7 +416,7 @@ func TestBucketStore_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))

if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
Expand Down Expand Up @@ -479,7 +469,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))

indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
Expand All @@ -492,59 +482,6 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
})
}

func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bkt := objstore.NewInMemBucket()

dir := t.TempDir()

hourAfter := time.Now().Add(1 * time.Hour)
filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}

// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
assert.NoError(t, s.store.SyncBlocks(ctx))

mint, maxt := s.store.TimeRange()
assert.Equal(t, s.minTime, mint)
assert.Equal(t, filterMaxTime.PrometheusTimestamp(), maxt)

req := &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: timestamp.FromTime(time.Now().AddDate(0, 0, 1)),
}

expectedLabels := [][]labelpb.ZLabel{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}},
}

s.cache.SwapWith(noopCache{})
srv := newBucketStoreSeriesServer(ctx)

assert.NoError(t, s.store.Series(req, srv))
assert.Equal(t, len(expectedLabels), len(srv.SeriesSet))

for i, s := range srv.SeriesSet {
assert.Equal(t, expectedLabels[i], s.Labels)

// prepareTestBlocks makes 3 chunks containing 2 hour data,
// we should only get 1, as we are filtering by time.
assert.Equal(t, 1, len(s.Chunks))
}
}

func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
// The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks.
expectedChunks := uint64(2 * 6)
Expand Down Expand Up @@ -584,7 +521,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code))
assert.NoError(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Expand Down Expand Up @@ -619,7 +556,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
Expand Down Expand Up @@ -719,7 +656,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
Expand Down
1 change: 0 additions & 1 deletion pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
u.partitioner,
u.threadPool,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
u.cfg.BucketStore.PostingOffsetsInMemSampling,
true, // Enable series hints.
u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func BenchmarkBucketStoreLabelValues(tb *testing.B) {
series := generateSeries(card)
tb.Logf("Total %d series generated", len(series))

s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf, series)
s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), series)
mint, maxt := s.store.TimeRange()
assert.Equal(tb, s.minTime, mint)
assert.Equal(tb, s.maxTime, maxt)
Expand Down
Loading

0 comments on commit e926900

Please sign in to comment.