
Commit

Removed unused FilterConfig from BucketStore
Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci committed May 4, 2022
1 parent b9de3d1 commit 4f5246b
Showing 3 changed files with 13 additions and 120 deletions.
48 changes: 0 additions & 48 deletions pkg/storegateway/bucket.go
@@ -44,7 +44,6 @@ import (
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/gate"
-	"github.com/thanos-io/thanos/pkg/model"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/thanos-io/thanos/pkg/store/hintspb"
@@ -82,11 +81,6 @@ const (
 	labelDecode = "decode"
 )
 
-// FilterConfig is a configuration, which Store uses for filtering metrics based on time.
-type FilterConfig struct {
-	MinTime, MaxTime model.TimeOrDurationValue
-}
-
 type BucketStoreStats struct {
 	// BlocksLoaded is the number of blocks currently loaded in the bucket store.
 	BlocksLoaded int
@@ -130,8 +124,6 @@ type BucketStore struct {
 	seriesLimiterFactory SeriesLimiterFactory
 	partitioner          Partitioner
 
-	filterConfig *FilterConfig
-
 	// Threadpool for performing operations that block the OS thread (mmap page faults)
 	threadPool *mimir_indexheader.Threadpool
 
@@ -210,13 +202,6 @@ func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption {
 	}
 }
 
-// WithFilterConfig sets a filter which Store uses for filtering metrics based on time.
-func WithFilterConfig(filter *FilterConfig) BucketStoreOption {
-	return func(s *BucketStore) {
-		s.filterConfig = filter
-	}
-}
-
 // WithDebugLogging enables debug logging.
 func WithDebugLogging() BucketStoreOption {
 	return func(s *BucketStore) {
@@ -521,40 +506,9 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
 		}
 	}
 
-	mint = s.limitMinTime(mint)
-	maxt = s.limitMaxTime(maxt)
-
 	return mint, maxt
 }
 
-func (s *BucketStore) limitMinTime(mint int64) int64 {
-	if s.filterConfig == nil {
-		return mint
-	}
-
-	filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()
-
-	if mint < filterMinTime {
-		return filterMinTime
-	}
-
-	return mint
-}
-
-func (s *BucketStore) limitMaxTime(maxt int64) int64 {
-	if s.filterConfig == nil {
-		return maxt
-	}
-
-	filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()
-
-	if maxt > filterMaxTime {
-		maxt = filterMaxTime
-	}
-
-	return maxt
-}
-
 type seriesEntry struct {
 	lset labels.Labels
 	refs []chunks.ChunkRef
@@ -950,8 +904,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
 	if err != nil {
 		return status.Error(codes.InvalidArgument, err.Error())
 	}
-	req.MinTime = s.limitMinTime(req.MinTime)
-	req.MaxTime = s.limitMaxTime(req.MaxTime)
 
 	// Check if matchers include the query shard selector.
 	shardSelector, matchers, err := sharding.RemoveShardFromMatchers(matchers)
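Aside for readers skimming the diff: the deleted limitMinTime/limitMaxTime pair only clamped a requested time range to the configured window. A minimal, self-contained sketch of that clamping pattern, in plain Go with made-up types (not Mimir's actual API):

package main

import "fmt"

// filterWindow stands in for the removed FilterConfig: an optional
// [minTime, maxTime] window in milliseconds since the Unix epoch.
type filterWindow struct {
	minTime, maxTime int64
}

// clamp narrows [mint, maxt] to the window, mirroring the logic of the
// removed limitMinTime/limitMaxTime methods.
func (w *filterWindow) clamp(mint, maxt int64) (int64, int64) {
	if w == nil {
		return mint, maxt // no filter configured: pass the range through
	}
	if mint < w.minTime {
		mint = w.minTime
	}
	if maxt > w.maxTime {
		maxt = w.maxTime
	}
	return mint, maxt
}

func main() {
	w := &filterWindow{minTime: 1000, maxTime: 5000}
	mint, maxt := w.clamp(0, 9000)
	fmt.Println(mint, maxt) // prints: 1000 5000
}

Since nothing in Mimir constructed a FilterConfig anymore, this clamping was dead weight on every TimeRange and Series call, which is why the commit can delete it outright.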
83 changes: 12 additions & 71 deletions pkg/storegateway/bucket_e2e_test.go
@@ -38,14 +38,10 @@ import (
 )
 
 var (
-	minTime            = time.Unix(0, 0)
-	maxTime, _         = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
-	minTimeDuration    = model.TimeOrDurationValue{Time: &minTime}
-	maxTimeDuration    = model.TimeOrDurationValue{Time: &maxTime}
-	allowAllFilterConf = &FilterConfig{
-		MinTime: minTimeDuration,
-		MaxTime: maxTimeDuration,
-	}
+	minTime         = time.Unix(0, 0)
+	maxTime, _      = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
+	minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
+	maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
 )
 
 type swappableCache struct {
@@ -136,7 +132,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterFactory {
 	}
 }
 
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config) *storeSuite {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "1"),
 		labels.FromStrings("a", "1", "b", "2"),
@@ -147,10 +143,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
 		labels.FromStrings("a", "2", "c", "1"),
 		labels.FromStrings("a", "2", "c", "2"),
 	}
-	return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, relabelConfig, filterConf, series)
+	return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, relabelConfig, series)
 }
 
-func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, series []labels.Labels) *storeSuite {
+func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, series []labels.Labels) *storeSuite {
 	extLset := labels.FromStrings("ext1", "value1")
 
 	minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset)
@@ -163,7 +159,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, series []labels.Labels) *storeSuite {
 	}
 
 	metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{
-		block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
 		block.NewLabelShardedMetaFilter(relabelConfig),
 	})
 	assert.NoError(t, err)
@@ -186,7 +181,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, series []labels.Labels) *storeSuite {
 		NewBucketStoreMetrics(nil),
 		WithLogger(s.logger),
 		WithIndexCache(s.cache),
-		WithFilterConfig(filterConf),
 	)
 	assert.NoError(t, err)
 	t.Cleanup(func() {
@@ -425,7 +419,7 @@ func TestBucketStore_e2e(t *testing.T) {
 
 	dir := t.TempDir()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig)
 
 	if ok := t.Run("no index cache", func(t *testing.T) {
 		s.cache.SwapWith(noopCache{})
@@ -478,7 +472,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
 
 	dir := t.TempDir()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig)
 
 	indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{
 		MaxItemSize: 1e5,
@@ -491,59 +485,6 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
 	})
 }
 
-func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	bkt := objstore.NewInMemBucket()
-
-	dir := t.TempDir()
-
-	hourAfter := time.Now().Add(1 * time.Hour)
-	filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}
-
-	// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
-	expectedChunks := uint64(2 * 2)
-
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
-		MinTime: minTimeDuration,
-		MaxTime: filterMaxTime,
-	})
-	assert.NoError(t, s.store.SyncBlocks(ctx))
-
-	mint, maxt := s.store.TimeRange()
-	assert.Equal(t, s.minTime, mint)
-	assert.Equal(t, filterMaxTime.PrometheusTimestamp(), maxt)
-
-	req := &storepb.SeriesRequest{
-		Matchers: []storepb.LabelMatcher{
-			{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
-		},
-		MinTime: mint,
-		MaxTime: timestamp.FromTime(time.Now().AddDate(0, 0, 1)),
-	}
-
-	expectedLabels := [][]labelpb.ZLabel{
-		{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}},
-		{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}},
-		{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}},
-		{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}},
-	}
-
-	s.cache.SwapWith(noopCache{})
-	srv := newBucketStoreSeriesServer(ctx)
-
-	assert.NoError(t, s.store.Series(req, srv))
-	assert.Equal(t, len(expectedLabels), len(srv.SeriesSet))
-
-	for i, s := range srv.SeriesSet {
-		assert.Equal(t, expectedLabels[i], s.Labels)
-
-		// prepareTestBlocks makes 3 chunks containing 2 hour data,
-		// we should only get 1, as we are filtering by time.
-		assert.Equal(t, 1, len(s.Chunks))
-	}
-}
-
 func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 	// The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks.
 	expectedChunks := uint64(2 * 6)
@@ -583,7 +524,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 
 			dir := t.TempDir()
 
-			s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf)
+			s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig)
 			assert.NoError(t, s.store.SyncBlocks(ctx))
 
 			req := &storepb.SeriesRequest{
@@ -618,7 +559,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
 
 	dir := t.TempDir()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig)
 	s.cache.SwapWith(noopCache{})
 
 	mint, maxt := s.store.TimeRange()
@@ -718,7 +659,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
 
	dir := t.TempDir()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig)
 	s.cache.SwapWith(noopCache{})
 
 	mint, maxt := s.store.TimeRange()
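Worth noting for reviewers: time-based block filtering remains available upstream of the store, at metadata-fetch time. Before this commit the test wired a Thanos block.NewTimePartitionMetaFilter into the MetaFetcher (see the @@ -163,7 hunk above). A hedged sketch of that wiring, reusing only the calls visible in the removed lines; the import paths and the go-kit logger type are my assumptions for the Thanos version of that era, not something this commit confirms:

package storegatewayexample

import (
	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/model"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// newTimeFilteredFetcher mirrors the removed test wiring: blocks whose time
// range falls outside [minT, maxT] are dropped at discovery time, before the
// store ever loads them, instead of being clamped at query time.
func newTimeFilteredFetcher(logger log.Logger, bkt objstore.Bucket, dir string, minT, maxT model.TimeOrDurationValue, relabelConfig []*relabel.Config) (*block.MetaFetcher, error) {
	return block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{
		block.NewTimePartitionMetaFilter(minT, maxT),
		block.NewLabelShardedMetaFilter(relabelConfig),
	})
}

Filtering at discovery time is the stronger variant anyway: the store never syncs the excluded blocks at all, whereas FilterConfig only narrowed the advertised and requested time ranges.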
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores_test.go
@@ -757,7 +757,7 @@ func BenchmarkBucketStoreLabelValues(tb *testing.B) {
 	series := generateSeries(card)
 	tb.Logf("Total %d series generated", len(series))
 
-	s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf, series)
+	s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, series)
 	mint, maxt := s.store.TimeRange()
 	assert.Equal(tb, s.minTime, mint)
 	assert.Equal(tb, s.maxTime, maxt)
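With the filter gone, TimeRange() is derived purely from the loaded blocks, which is why this benchmark can assert that the store's range equals the suite's own minTime/maxTime. A sketch of that blocks-only derivation; blockMeta is a hypothetical stand-in for the store's internal per-block bookkeeping, not Mimir's real type:

package main

import (
	"fmt"
	"math"
)

// blockMeta is a hypothetical stand-in for the store's per-block metadata.
type blockMeta struct {
	minT, maxT int64 // block time range, milliseconds since the Unix epoch
}

// timeRange returns the union of all block ranges, with no filter clamping.
func timeRange(blocks []blockMeta) (mint, maxt int64) {
	mint, maxt = math.MaxInt64, math.MinInt64
	for _, b := range blocks {
		if b.minT < mint {
			mint = b.minT
		}
		if b.maxT > maxt {
			maxt = b.maxT
		}
	}
	return mint, maxt
}

func main() {
	// Three contiguous 2h blocks, echoing the 3-block setup in prepareTestBlocks.
	blocks := []blockMeta{{0, 7200000}, {7200000, 14400000}, {14400000, 21600000}}
	fmt.Println(timeRange(blocks)) // prints: 0 21600000
}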
