diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index b96323787d8..26153bd86de 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -43,9 +43,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" - "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/gate" - "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/store/hintspb" @@ -78,28 +76,11 @@ const ( chunkBytesPoolMinSize = 64 * 1024 // 64 KiB chunkBytesPoolMaxSize = 64 * 1024 * 1024 // 64 MiB - // CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility - // with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels. - // Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where - // Querier was blocking Store Gateway as duplicate with sidecar. - // - // Newer Queriers are not strict, no duplicated external labels check is there anymore. - // Additionally newer Queriers removes/ignore this exact labels from UI and querying. - // - // This label name is intentionally against Prometheus label style. - // TODO(bwplotka): Remove it at some point. - CompatibilityTypeLabelName = "@thanos_compatibility_store_type" - // Labels for metrics. labelEncode = "encode" labelDecode = "decode" ) -// FilterConfig is a configuration, which Store uses for filtering metrics based on time. -type FilterConfig struct { - MinTime, MaxTime model.TimeOrDurationValue -} - type BucketStoreStats struct { // BlocksLoaded is the number of blocks currently loaded in the bucket store. BlocksLoaded int @@ -143,10 +124,6 @@ type BucketStore struct { seriesLimiterFactory SeriesLimiterFactory partitioner Partitioner - filterConfig *FilterConfig - advLabelSets []labelpb.ZLabelSet - enableCompatibilityLabel bool - // Threadpool for performing operations that block the OS thread (mmap page faults) threadPool *mimir_indexheader.Threadpool @@ -225,13 +202,6 @@ func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption { } } -// WithFilterConfig sets a filter which Store uses for filtering metrics based on time. -func WithFilterConfig(filter *FilterConfig) BucketStoreOption { - return func(s *BucketStore) { - s.filterConfig = filter - } -} - // WithDebugLogging enables debug logging. func WithDebugLogging() BucketStoreOption { return func(s *BucketStore) { @@ -251,7 +221,6 @@ func NewBucketStore( partitioner Partitioner, threadPool *mimir_indexheader.Threadpool, blockSyncConcurrency int, - enableCompatibilityLabel bool, postingOffsetsInMemSampling int, enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. lazyIndexReaderEnabled bool, @@ -275,7 +244,6 @@ func NewBucketStore( seriesLimiterFactory: seriesLimiterFactory, partitioner: partitioner, threadPool: threadPool, - enableCompatibilityLabel: enableCompatibilityLabel, postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, seriesHashCache: seriesHashCache, @@ -375,18 +343,6 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { s.metrics.blockDrops.Inc() } - // Sync advertise labels. - var storeLabels labels.Labels - s.mtx.Lock() - s.advLabelSets = make([]labelpb.ZLabelSet, 0, len(s.advLabelSets)) - for _, bs := range s.blockSets { - storeLabels = storeLabels[:0] - s.advLabelSets = append(s.advLabelSets, labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(append(storeLabels, bs.labels...))}) - } - sort.Slice(s.advLabelSets, func(i, j int) bool { - return s.advLabelSets[i].String() < s.advLabelSets[j].String() - }) - s.mtx.Unlock() return nil } @@ -550,61 +506,9 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) { } } - mint = s.limitMinTime(mint) - maxt = s.limitMaxTime(maxt) - return mint, maxt } -// Info implements the storepb.StoreServer interface. -func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { - mint, maxt := s.TimeRange() - res := &storepb.InfoResponse{ - StoreType: component.Store.ToProto(), - MinTime: mint, - MaxTime: maxt, - } - - s.mtx.RLock() - res.LabelSets = s.advLabelSets - s.mtx.RUnlock() - - if s.enableCompatibilityLabel && len(res.LabelSets) > 0 { - // This is for compatibility with Querier v0.7.0. - // See query.StoreCompatibilityTypeLabelName comment for details. - res.LabelSets = append(res.LabelSets, labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: CompatibilityTypeLabelName, Value: "store"}}}) - } - return res, nil -} - -func (s *BucketStore) limitMinTime(mint int64) int64 { - if s.filterConfig == nil { - return mint - } - - filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp() - - if mint < filterMinTime { - return filterMinTime - } - - return mint -} - -func (s *BucketStore) limitMaxTime(maxt int64) int64 { - if s.filterConfig == nil { - return maxt - } - - filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp() - - if maxt > filterMaxTime { - maxt = filterMaxTime - } - - return maxt -} - type seriesEntry struct { lset labels.Labels refs []chunks.ChunkRef @@ -1000,8 +904,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } - req.MinTime = s.limitMinTime(req.MinTime) - req.MaxTime = s.limitMaxTime(req.MaxTime) // Check if matchers include the query shard selector. shardSelector, matchers, err := sharding.RemoveShardFromMatchers(matchers) diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 009dcb93b3b..b99964ada0c 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -18,7 +18,6 @@ import ( "github.com/gogo/status" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/tsdb/hashcache" "github.com/stretchr/testify/assert" @@ -38,14 +37,10 @@ import ( ) var ( - minTime = time.Unix(0, 0) - maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") - minTimeDuration = model.TimeOrDurationValue{Time: &minTime} - maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime} - allowAllFilterConf = &FilterConfig{ - MinTime: minTimeDuration, - MaxTime: maxTimeDuration, - } + minTime = time.Unix(0, 0) + maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") + minTimeDuration = model.TimeOrDurationValue{Time: &minTime} + maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime} ) type swappableCache struct { @@ -136,7 +131,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterF } } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -147,10 +142,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m labels.FromStrings("a", "2", "c", "1"), labels.FromStrings("a", "2", "c", "2"), } - return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, relabelConfig, filterConf, series) + return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, series) } -func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, series []labels.Labels) *storeSuite { +func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, series []labels.Labels) *storeSuite { extLset := labels.FromStrings("ext1", "value1") minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset) @@ -162,10 +157,7 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore. maxTime: maxTime, } - metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{ - block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), - block.NewLabelShardedMetaFilter(relabelConfig), - }) + metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{}) assert.NoError(t, err) store, err := NewBucketStore( @@ -178,7 +170,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore. newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), nil, 20, - true, mimir_tsdb.DefaultPostingOffsetInMemorySampling, true, true, @@ -187,7 +178,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore. NewBucketStoreMetrics(nil), WithLogger(s.logger), WithIndexCache(s.cache), - WithFilterConfig(filterConf), ) assert.NoError(t, err) t.Cleanup(func() { @@ -426,7 +416,7 @@ func TestBucketStore_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0)) if ok := t.Run("no index cache", func(t *testing.T) { s.cache.SwapWith(noopCache{}) @@ -479,7 +469,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0)) indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ MaxItemSize: 1e5, @@ -492,59 +482,6 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { }) } -func TestBucketStore_TimePartitioning_e2e(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - bkt := objstore.NewInMemBucket() - - dir := t.TempDir() - - hourAfter := time.Now().Add(1 * time.Hour) - filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter} - - // The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks. - expectedChunks := uint64(2 * 2) - - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{ - MinTime: minTimeDuration, - MaxTime: filterMaxTime, - }) - assert.NoError(t, s.store.SyncBlocks(ctx)) - - mint, maxt := s.store.TimeRange() - assert.Equal(t, s.minTime, mint) - assert.Equal(t, filterMaxTime.PrometheusTimestamp(), maxt) - - req := &storepb.SeriesRequest{ - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, - }, - MinTime: mint, - MaxTime: timestamp.FromTime(time.Now().AddDate(0, 0, 1)), - } - - expectedLabels := [][]labelpb.ZLabel{ - {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}}, - {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}}, - {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}}, - {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}}, - } - - s.cache.SwapWith(noopCache{}) - srv := newBucketStoreSeriesServer(ctx) - - assert.NoError(t, s.store.Series(req, srv)) - assert.Equal(t, len(expectedLabels), len(srv.SeriesSet)) - - for i, s := range srv.SeriesSet { - assert.Equal(t, expectedLabels[i], s.Labels) - - // prepareTestBlocks makes 3 chunks containing 2 hour data, - // we should only get 1, as we are filtering by time. - assert.Equal(t, 1, len(s.Chunks)) - } -} - func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { // The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks. expectedChunks := uint64(2 * 6) @@ -584,7 +521,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code)) assert.NoError(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ @@ -619,7 +556,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0)) s.cache.SwapWith(noopCache{}) mint, maxt := s.store.TimeRange() @@ -719,7 +656,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0)) s.cache.SwapWith(noopCache{}) mint, maxt := s.store.TimeRange() diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index fdcad0fa617..e3e8449a4a5 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -531,7 +531,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { u.partitioner, u.threadPool, u.cfg.BucketStore.BlockSyncConcurrency, - false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers u.cfg.BucketStore.PostingOffsetsInMemSampling, true, // Enable series hints. u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled, diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index e77ba2030f5..c2c27037999 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -757,7 +757,7 @@ func BenchmarkBucketStoreLabelValues(tb *testing.B) { series := generateSeries(card) tb.Logf("Total %d series generated", len(series)) - s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf, series) + s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), series) mint, maxt := s.store.TimeRange() assert.Equal(tb, s.minTime, mint) assert.Equal(tb, s.maxTime, maxt) diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 83cd10eb9b6..77fca418a72 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -13,7 +13,6 @@ import ( "context" "encoding/binary" "fmt" - "io" "math" "math/rand" "os" @@ -37,7 +36,6 @@ import ( "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -72,8 +70,6 @@ const ( labelLongSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" ) -var emptyRelabelConfig = make([]*relabel.Config, 0) - func TestBucketBlock_Property(t *testing.T) { parameters := gopter.DefaultTestParameters() parameters.Rng.Seed(2000) @@ -502,358 +498,6 @@ func TestBucketBlockSet_labelMatchers(t *testing.T) { } } -func TestBucketStore_Info(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dir := t.TempDir() - - chunkPool, err := NewDefaultChunkBytesPool(2e5) - assert.NoError(t, err) - - bucketStore, err := NewBucketStore( - "test", - nil, - nil, - dir, - NewChunksLimiterFactory(0), - NewSeriesLimiterFactory(0), - newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - nil, - 20, - true, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - false, - false, - 0, - hashcache.NewSeriesHashCache(1024*1024), - NewBucketStoreMetrics(nil), - WithChunkPool(chunkPool), - WithFilterConfig(allowAllFilterConf), - ) - assert.NoError(t, err) - defer func() { assert.NoError(t, bucketStore.Close()) }() - - resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) - assert.NoError(t, err) - - assert.Equal(t, storepb.StoreType_STORE, resp.StoreType) - assert.Equal(t, int64(math.MaxInt64), resp.MinTime) - assert.Equal(t, int64(math.MinInt64), resp.MaxTime) - assert.Equal(t, []labelpb.ZLabelSet(nil), resp.LabelSets) - assert.Equal(t, []labelpb.ZLabel(nil), resp.Labels) -} - -type recorder struct { - mtx sync.Mutex - objstore.Bucket - - getRangeTouched []string - getTouched []string -} - -func (r *recorder) Get(ctx context.Context, name string) (io.ReadCloser, error) { - r.mtx.Lock() - defer r.mtx.Unlock() - - r.getTouched = append(r.getTouched, name) - return r.Bucket.Get(ctx, name) -} - -func (r *recorder) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - r.mtx.Lock() - defer r.mtx.Unlock() - - r.getRangeTouched = append(r.getRangeTouched, name) - return r.Bucket.GetRange(ctx, name, off, length) -} - -func TestBucketStore_Sharding(t *testing.T) { - ctx := context.Background() - logger := log.NewNopLogger() - - dir := t.TempDir() - - bkt := objstore.NewInMemBucket() - series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} - - id1, err := CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0, metadata.NoneFunc) - assert.NoError(t, err) - assert.NoError(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id1.String()), metadata.NoneFunc)) - - id2, err := CreateBlock(ctx, dir, series, 10, 1000, 2000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0, metadata.NoneFunc) - assert.NoError(t, err) - assert.NoError(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id2.String()), metadata.NoneFunc)) - - id3, err := CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "b"}, {Name: "region", Value: "r1"}}, 0, metadata.NoneFunc) - assert.NoError(t, err) - assert.NoError(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id3.String()), metadata.NoneFunc)) - - id4, err := CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r2"}}, 0, metadata.NoneFunc) - assert.NoError(t, err) - assert.NoError(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id4.String()), metadata.NoneFunc)) - - if ok := t.Run("new_runs", func(t *testing.T) { - testSharding(t, "", bkt, id1, id2, id3, id4) - }); !ok { - return - } - - dir2 := t.TempDir() - - t.Run("reuse_disk", func(t *testing.T) { - testSharding(t, dir2, bkt, id1, id2, id3, id4) - }) -} - -func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ulid.ULID) { - var cached []ulid.ULID - - logger := log.NewNopLogger() - for _, sc := range []struct { - name string - relabel string - expectedIDs []ulid.ULID - expectedAdvLabels []labelpb.ZLabelSet - }{ - { - name: "no sharding", - expectedIDs: all, - expectedAdvLabels: []labelpb.ZLabelSet{ - { - Labels: []labelpb.ZLabel{ - {Name: "cluster", Value: "a"}, - {Name: "region", Value: "r1"}, - }, - }, - { - Labels: []labelpb.ZLabel{ - {Name: "cluster", Value: "a"}, - {Name: "region", Value: "r2"}, - }, - }, - { - Labels: []labelpb.ZLabel{ - {Name: "cluster", Value: "b"}, - {Name: "region", Value: "r1"}, - }, - }, - { - Labels: []labelpb.ZLabel{ - {Name: CompatibilityTypeLabelName, Value: "store"}, - }, - }, - }, - }, - { - name: "drop cluster=a sources", - relabel: ` - - action: drop - regex: "a" - source_labels: - - cluster - `, - expectedIDs: []ulid.ULID{all[2]}, - expectedAdvLabels: []labelpb.ZLabelSet{ - { - Labels: []labelpb.ZLabel{ - {Name: "cluster", Value: "b"}, - {Name: "region", Value: "r1"}, - }, - }, - { - Labels: []labelpb.ZLabel{ - {Name: CompatibilityTypeLabelName, Value: "store"}, - }, - }, - }, - }, - { - name: "keep only cluster=a sources", - relabel: ` - - action: keep - regex: "a" - source_labels: - - cluster - `, - expectedIDs: []ulid.ULID{all[0], all[1], all[3]}, - expectedAdvLabels: []labelpb.ZLabelSet{ - { - Labels: []labelpb.ZLabel{ - {Name: "cluster", Value: "a"}, - {Name: "region", Value: "r1"}, - }, - }, - { - Labels: []labelpb.ZLabel{ - {Name: "cluster", Value: "a"}, - {Name: "region", Value: "r2"}, - }, - }, - { - Labels: []labelpb.ZLabel{ - {Name: CompatibilityTypeLabelName, Value: "store"}, - }, - }, - }, - }, - { - name: "keep only cluster=a without .*2 region sources", - relabel: ` - - action: keep - regex: "a" - source_labels: - - cluster - - action: drop - regex: ".*2" - source_labels: - - region - `, - expectedIDs: []ulid.ULID{all[0], all[1]}, - expectedAdvLabels: []labelpb.ZLabelSet{ - { - Labels: []labelpb.ZLabel{ - {Name: "cluster", Value: "a"}, - {Name: "region", Value: "r1"}, - }, - }, - { - Labels: []labelpb.ZLabel{ - {Name: CompatibilityTypeLabelName, Value: "store"}, - }, - }, - }, - }, - { - name: "drop all", - relabel: ` - - action: drop - regex: "a" - source_labels: - - cluster - - action: drop - regex: "r1" - source_labels: - - region - `, - expectedIDs: []ulid.ULID{}, - expectedAdvLabels: []labelpb.ZLabelSet{}, - }, - } { - t.Run(sc.name, func(t *testing.T) { - dir := reuseDisk - - if dir == "" { - dir = t.TempDir() - } - relabelConf, err := block.ParseRelabelConfig([]byte(sc.relabel), block.SelectorSupportedRelabelActions) - assert.NoError(t, err) - - rec := &recorder{Bucket: bkt} - metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{ - block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), - block.NewLabelShardedMetaFilter(relabelConf), - }) - assert.NoError(t, err) - - bucketStore, err := NewBucketStore( - "test", - objstore.WithNoopInstr(rec), - metaFetcher, - dir, - NewChunksLimiterFactory(0), - NewSeriesLimiterFactory(0), - newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - nil, - 20, - true, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - false, - false, - 0, - hashcache.NewSeriesHashCache(1024*1024), - NewBucketStoreMetrics(nil), - WithLogger(logger), - WithFilterConfig(allowAllFilterConf), - ) - assert.NoError(t, err) - defer func() { assert.NoError(t, bucketStore.Close()) }() - - assert.NoError(t, bucketStore.InitialSync(context.Background())) - - // Check "stored" blocks. - ids := make([]ulid.ULID, 0, len(bucketStore.blocks)) - for id := range bucketStore.blocks { - ids = append(ids, id) - } - sort.Slice(ids, func(i, j int) bool { - return ids[i].Compare(ids[j]) < 0 - }) - assert.Equal(t, sc.expectedIDs, ids) - - // Check Info endpoint. - resp, err := bucketStore.Info(context.Background(), &storepb.InfoRequest{}) - assert.NoError(t, err) - - assert.Equal(t, storepb.StoreType_STORE, resp.StoreType) - assert.Equal(t, []labelpb.ZLabel(nil), resp.Labels) - assert.Equal(t, sc.expectedAdvLabels, resp.LabelSets) - - // Make sure we don't download files we did not expect to. - // Regression test: https://github.com/thanos-io/thanos/issues/1664 - - // Sort records. We load blocks concurrently so operations might be not ordered. - sort.Strings(rec.getRangeTouched) - - // With binary header nothing should be downloaded fully. - assert.Equal(t, []string(nil), rec.getTouched) - if reuseDisk != "" { - assert.Equal(t, expectedTouchedBlockOps(all, sc.expectedIDs, cached), rec.getRangeTouched) - cached = sc.expectedIDs - return - } - - assert.Equal(t, expectedTouchedBlockOps(all, sc.expectedIDs, nil), rec.getRangeTouched) - }) - } -} - -func expectedTouchedBlockOps(all, expected, cached []ulid.ULID) []string { - var ops []string - for _, id := range all { - blockCached := false - for _, fid := range cached { - if id.Compare(fid) == 0 { - blockCached = true - break - } - } - if blockCached { - continue - } - - found := false - for _, fid := range expected { - if id.Compare(fid) == 0 { - found = true - break - } - } - - if found { - ops = append(ops, - // To create binary header we touch part of index few times. - path.Join(id.String(), block.IndexFilename), // Version. - path.Join(id.String(), block.IndexFilename), // TOC. - path.Join(id.String(), block.IndexFilename), // Symbols. - path.Join(id.String(), block.IndexFilename), // PostingOffsets. - ) - } - } - sort.Strings(ops) - return ops -} - // Regression tests against: https://github.com/thanos-io/thanos/issues/1983. func TestReadIndexCache_LoadSeries(t *testing.T) { bkt := objstore.NewInMemBucket() @@ -1715,7 +1359,6 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), nil, 1, - false, mimir_tsdb.DefaultPostingOffsetInMemorySampling, false, false, @@ -2081,7 +1724,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), nil, 10, - false, mimir_tsdb.DefaultPostingOffsetInMemorySampling, true, false, @@ -2173,7 +1815,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), nil, 10, - false, mimir_tsdb.DefaultPostingOffsetInMemorySampling, true, false, @@ -2358,7 +1999,6 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), nil, 10, - false, mimir_tsdb.DefaultPostingOffsetInMemorySampling, true, false, @@ -3034,7 +2674,7 @@ type seriesCase struct { } // runTestServerSeries runs tests against given cases. -func runTestServerSeries(t test.TB, store storepb.StoreServer, cases ...*seriesCase) { +func runTestServerSeries(t test.TB, store *BucketStore, cases ...*seriesCase) { for _, c := range cases { t.Run(c.Name, func(t test.TB) { t.ResetTimer() diff --git a/vendor/github.com/thanos-io/thanos/pkg/component/component.go b/vendor/github.com/thanos-io/thanos/pkg/component/component.go deleted file mode 100644 index b648aca5585..00000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/component/component.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package component - -import ( - "strings" - - "github.com/thanos-io/thanos/pkg/store/storepb" -) - -// Component is a generic component interface. -type Component interface { - String() string -} - -// StoreAPI is a component that implements Thanos' gRPC StoreAPI. -type StoreAPI interface { - implementsStoreAPI() - String() string - ToProto() storepb.StoreType -} - -// Source is a Thanos component that produce blocks of metrics. -type Source interface { - producesBlocks() - String() string -} - -// SourceStoreAPI is a component that implements Thanos' gRPC StoreAPI -// and produce blocks of metrics. -type SourceStoreAPI interface { - implementsStoreAPI() - producesBlocks() - String() string - ToProto() storepb.StoreType -} - -type component struct { - name string -} - -func (c component) String() string { return c.name } - -type storeAPI struct { - component -} - -func (storeAPI) implementsStoreAPI() {} - -func (s sourceStoreAPI) ToProto() storepb.StoreType { - return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())]) -} - -func (s storeAPI) ToProto() storepb.StoreType { - return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())]) -} - -type source struct { - component -} - -func (source) producesBlocks() {} - -type sourceStoreAPI struct { - component - source - storeAPI -} - -// FromProto converts from a gRPC StoreType to StoreAPI. -func FromProto(storeType storepb.StoreType) StoreAPI { - switch storeType { - case storepb.StoreType_QUERY: - return Query - case storepb.StoreType_RULE: - return Rule - case storepb.StoreType_SIDECAR: - return Sidecar - case storepb.StoreType_STORE: - return Store - case storepb.StoreType_RECEIVE: - return Receive - case storepb.StoreType_DEBUG: - return Debug - default: - return UnknownStoreAPI - } -} - -func FromString(storeType string) StoreAPI { - switch storeType { - case "query": - return Query - case "rule": - return Rule - case "sidecar": - return Sidecar - case "store": - return Store - case "receive": - return Receive - case "debug": - return Debug - default: - return UnknownStoreAPI - } -} - -var ( - Bucket = source{component: component{name: "bucket"}} - Cleanup = source{component: component{name: "cleanup"}} - Mark = source{component: component{name: "mark"}} - Rewrite = source{component: component{name: "rewrite"}} - Retention = source{component: component{name: "retention"}} - Compact = source{component: component{name: "compact"}} - Downsample = source{component: component{name: "downsample"}} - Replicate = source{component: component{name: "replicate"}} - QueryFrontend = source{component: component{name: "query-frontend"}} - Debug = sourceStoreAPI{component: component{name: "debug"}} - Receive = sourceStoreAPI{component: component{name: "receive"}} - Rule = sourceStoreAPI{component: component{name: "rule"}} - Sidecar = sourceStoreAPI{component: component{name: "sidecar"}} - Store = storeAPI{component: component{name: "store"}} - UnknownStoreAPI = storeAPI{component: component{name: "unknown-store-api"}} - Query = storeAPI{component: component{name: "query"}} -) diff --git a/vendor/modules.txt b/vendor/modules.txt index aebb076f478..2178898c62a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -815,7 +815,6 @@ github.com/thanos-io/thanos/pkg/block/metadata github.com/thanos-io/thanos/pkg/cache github.com/thanos-io/thanos/pkg/cacheutil github.com/thanos-io/thanos/pkg/compact/downsample -github.com/thanos-io/thanos/pkg/component github.com/thanos-io/thanos/pkg/discovery/dns github.com/thanos-io/thanos/pkg/discovery/dns/godns github.com/thanos-io/thanos/pkg/discovery/dns/miekgdns