From c2a58ac8a68745ee61e9aff506da0ae63779cd9e Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Mon, 23 Mar 2020 18:39:41 +0100 Subject: [PATCH] Separate filters and modifiers Signed-off-by: Kemal Akkoyun --- cmd/thanos/bucket.go | 8 +++--- cmd/thanos/compact.go | 13 +++++---- cmd/thanos/downsample.go | 2 +- cmd/thanos/main_test.go | 4 +-- cmd/thanos/store.go | 14 ++++----- pkg/block/fetcher.go | 51 +++++++++++++++++++++------------ pkg/block/fetcher_test.go | 36 ++++++++++++++--------- pkg/compact/clean_test.go | 2 +- pkg/compact/compact_e2e_test.go | 14 ++++----- pkg/compact/retention_test.go | 2 +- pkg/replicate/replicator.go | 2 +- pkg/replicate/scheme_test.go | 2 +- pkg/store/bucket_e2e_test.go | 8 +++--- pkg/store/bucket_test.go | 8 +++--- test/e2e/compact_test.go | 1 - 15 files changed, 95 insertions(+), 72 deletions(-) diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 9ca8d3cb0d1..75cc07593bd 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name issues = append(issues, issueFn) } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } @@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } @@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } @@ -458,7 +458,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati return errors.Wrap(err, "bucket client") } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 6a066f3b0b6..fbc5c12b6cc 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -287,12 +287,13 @@ func runCompact( duplicateBlocksFilter := block.NewDeduplicateFilter() prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, - block.NewLabelShardedMetaFilter(relabelConfig).Filter, - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, - ignoreDeletionMarkFilter.Filter, - duplicateBlocksFilter.Filter, - block.NewReplicaLabelRemover(logger, dedupReplicaLabels).Modify, + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{ + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer), + ignoreDeletionMarkFilter, + duplicateBlocksFilter, + }, + block.NewReplicaLabelRemover(logger, dedupReplicaLabels), ) if err != nil { return errors.Wrap(err, "create meta fetcher") diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 293ef59adf8..beb45b95a9f 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -72,7 +72,7 @@ func RunDownsample( return err } - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil) if err != nil { return errors.Wrap(err, "create meta fetcher") } diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index d44844d7685..58e340db3d0 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) { Name: metricIndexGenerateName, Help: metricIndexGenerateHelp, }) - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) testutil.Ok(t, err) testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir)) @@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) testutil.Ok(t, err) testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir)) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7a4f88a038a..8031cd84e84 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -236,13 +236,13 @@ func runStore( ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay) prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) - metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, - block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, - block.NewLabelShardedMetaFilter(relabelConfig).Filter, - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, - ignoreDeletionMarkFilter.Filter, - block.NewDeduplicateFilter().Filter, - ) + metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer), + ignoreDeletionMarkFilter, + block.NewDeduplicateFilter(), + }) if err != nil { return errors.Wrap(err, "meta fetcher") } diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index a66b12cd79b..059529a2ed5 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -122,7 +122,13 @@ type MetadataFetcher interface { Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) } -type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, incompleteView bool) error +type MetadataFilter interface { + Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, gauge *extprom.TxGaugeVec, incompleteView bool) error +} + +type MetadataModifier interface { + Modify(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, gauge *extprom.TxGaugeVec, incompleteView bool) error +} // MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. // Not go-routine safe. @@ -135,13 +141,14 @@ type MetaFetcher struct { cacheDir string metrics *fetcherMetrics - filters []MetaFetcherFilter + filters []MetadataFilter + modifiers []MetadataModifier cached map[ulid.ULID]*metadata.Meta } // NewMetaFetcher constructs MetaFetcher. -func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters ...MetaFetcherFilter) (*MetaFetcher, error) { +func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters []MetadataFilter, modifiers ...MetadataModifier) (*MetaFetcher, error) { if logger == nil { logger = log.NewNopLogger() } @@ -161,6 +168,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade cacheDir: cacheDir, metrics: newFetcherMetrics(r), filters: filters, + modifiers: modifiers, cached: map[ulid.ULID]*metadata.Meta{}, }, nil } @@ -368,11 +376,18 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. for _, f := range s.filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. - if err := f(ctx, metas, s.metrics, incompleteView); err != nil { + if err := f.Filter(ctx, metas, s.metrics.synced, incompleteView); err != nil { return nil, nil, errors.Wrap(err, "filter metas") } } + for _, m := range s.modifiers { + // NOTE: modifier can update modified metric accordingly to the reason of the modification. + if err := m.Modify(ctx, metas, s.metrics.modified, incompleteView); err != nil { + return nil, nil, errors.Wrap(err, "modify metas") + } + } + s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) s.metrics.submit() @@ -384,7 +399,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. return metas, partial, nil } -var _ MetaFetcherFilter = (&TimePartitionMetaFilter{}).Filter +var _ MetadataFilter = &TimePartitionMetaFilter{} // TimePartitionMetaFilter is a MetaFetcher filter that filters out blocks that are outside of specified time range. // Not go-routine safe. @@ -398,18 +413,18 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim } // Filter filters out blocks that are outside of specified time range. -func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error { +func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { for id, m := range metas { if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() { continue } - metrics.synced.WithLabelValues(timeExcludedMeta).Inc() + synced.WithLabelValues(timeExcludedMeta).Inc() delete(metas, id) } return nil } -var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter +var _ MetadataFilter = &LabelShardedMetaFilter{} // LabelShardedMetaFilter represents struct that allows sharding. // Not go-routine safe. @@ -426,7 +441,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet const blockIDLabel = "__block_id" // Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels. -func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error { +func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { var lbls labels.Labels for id, m := range metas { lbls = lbls[:0] @@ -436,7 +451,7 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]* } if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 { - metrics.synced.WithLabelValues(labelExcludedMeta).Inc() + synced.WithLabelValues(labelExcludedMeta).Inc() delete(metas, id) } } @@ -456,7 +471,7 @@ func NewDeduplicateFilter() *DeduplicateFilter { // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. -func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error { +func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { var wg sync.WaitGroup metasByResolution := make(map[int64][]*metadata.Meta) @@ -473,7 +488,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad BlockMeta: tsdb.BlockMeta{ ULID: ulid.MustNew(uint64(0), nil), }, - }), metasByResolution[res], metas, res, metrics.synced) + }), metasByResolution[res], metas, res, synced) }(res) } @@ -570,14 +585,14 @@ func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaL } // Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it. -func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, view bool) error { +func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, view bool) error { for u, meta := range metas { labels := meta.Thanos.Labels for _, replicaLabel := range r.replicaLabels { if _, exists := labels[replicaLabel]; exists { level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel) delete(labels, replicaLabel) - metrics.modified.WithLabelValues(replicaRemovedMeta).Inc() + modified.WithLabelValues(replicaRemovedMeta).Inc() } } metas[u].Thanos.Labels = labels @@ -611,7 +626,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura } // Filter filters out blocks that filters blocks that have are created before a specified consistency delay. -func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error { +func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { for id, meta := range metas { // TODO(khyatisoneji): Remove the checks about Thanos Source // by implementing delete delay to fetch metas. @@ -622,7 +637,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL meta.Thanos.Source != metadata.CompactorRepairSource { level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) - metrics.synced.WithLabelValues(tooFreshMeta).Inc() + synced.WithLabelValues(tooFreshMeta).Inc() delete(metas, id) } } @@ -657,7 +672,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata. // Filter filters out blocks that are marked for deletion after a given delay. // It also returns the blocks that can be deleted since they were uploaded delay duration before current time. -func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, m *fetcherMetrics, _ bool) error { +func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark) for id := range metas { @@ -674,7 +689,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL } f.deletionMarkMap[id] = deletionMark if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() { - m.synced.WithLabelValues(markedForDeletionMeta).Inc() + synced.WithLabelValues(markedForDeletionMeta).Inc() delete(metas, id) } } diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index b0060f66367..99920b8e5fc 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -41,6 +41,18 @@ func newTestFetcherMetrics() *fetcherMetrics { } } +type ulidFilter struct { + ulidToDelete *ulid.ULID +} + +func (f *ulidFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error { + if _, ok := metas[*f.ulidToDelete]; ok { + synced.WithLabelValues("filtered").Inc() + delete(metas, *f.ulidToDelete) + } + return nil +} + func ULID(i int) ulid.ULID { return ulid.MustNew(uint64(i), nil) } func ULIDs(is ...int) []ulid.ULID { @@ -63,12 +75,8 @@ func TestMetaFetcher_Fetch(t *testing.T) { var ulidToDelete ulid.ULID r := prometheus.NewRegistry() - f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, incompleteView bool) error { - if _, ok := metas[ulidToDelete]; ok { - metrics.synced.WithLabelValues("filtered").Inc() - delete(metas, ulidToDelete) - } - return nil + f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, []MetadataFilter{ + &ulidFilter{ulidToDelete: &ulidToDelete}, }) testutil.Ok(t, err) @@ -344,7 +352,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m, false)) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) testutil.Equals(t, expected, input) @@ -442,7 +450,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { deleted := len(input) - len(expected) m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m, false)) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) testutil.Equals(t, expected, input) testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) @@ -506,7 +514,7 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m, false)) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta))) testutil.Equals(t, expected, input) @@ -857,7 +865,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, } } - testutil.Ok(t, f.Filter(ctx, metas, m, false)) + testutil.Ok(t, f.Filter(ctx, metas, m.synced, false)) compareSliceWithMapKeys(t, metas, tcase.expected) testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta))) }); !ok { @@ -909,7 +917,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) { }, } { m := newTestFetcherMetrics() - testutil.Ok(t, rm.Modify(ctx, tcase.input, m, false)) + testutil.Ok(t, rm.Modify(ctx, tcase.input, m.modified, false)) testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta))) testutil.Equals(t, tcase.expected, tcase.input) @@ -1015,7 +1023,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, m, false)) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) @@ -1040,7 +1048,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, m, false)) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) @@ -1093,7 +1101,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m, false)) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta))) testutil.Equals(t, expected, input) }) diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index 74cfbaaf5ab..4507b3f5a50 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -29,7 +29,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { bkt := inmem.NewBucket() logger := log.NewNopLogger() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) testutil.Ok(t, err) // 1. No meta, old block, should be removed. diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 5520a9a8487..52e4147623f 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -90,9 +90,9 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { } duplicateBlocksFilter := block.NewDeduplicateFilter() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, - duplicateBlocksFilter.Filter, - ) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{ + duplicateBlocksFilter, + }) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) @@ -178,10 +178,10 @@ func TestGroup_Compact_e2e(t *testing.T) { ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, 48*time.Hour) duplicateBlocksFilter := block.NewDeduplicateFilter() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, - ignoreDeletionMarkFilter.Filter, - duplicateBlocksFilter.Filter, - ) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{ + ignoreDeletionMarkFilter, + duplicateBlocksFilter, + }) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 5c9813c1172..980b883839e 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -245,7 +245,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution)) } - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 1e674fa6a49..d419681f853 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -150,7 +150,7 @@ func RunReplicate( Help: "The Duration of replication runs split by success and error.", }, []string{"result"}) - fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg) + fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil) if err != nil { return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt) } diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index 9aaf9cff058..6b441a572ba 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -313,7 +313,7 @@ func TestReplicationSchemeAll(t *testing.T) { } filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter - fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil) + fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil, nil) testutil.Ok(t, err) r := newReplicationScheme( diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6bdc6a5950a..821cd70d278 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -147,10 +147,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m maxTime: maxTime, } - metaFetcher, err := block.NewMetaFetcher(s.logger, 20, bkt, dir, nil, - block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, - block.NewLabelShardedMetaFilter(relabelConfig).Filter, - ) + metaFetcher, err := block.NewMetaFetcher(s.logger, 20, bkt, dir, nil, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConfig), + }) testutil.Ok(t, err) store, err := NewBucketStore( diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a95372ec8f0..75cf4466435 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -710,10 +710,10 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul testutil.Ok(t, yaml.Unmarshal([]byte(sc.relabel), &relabelConf)) rec := &recorder{Bucket: bkt} - metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, dir, nil, - block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime).Filter, - block.NewLabelShardedMetaFilter(relabelConf).Filter, - ) + metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, dir, nil, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConf), + }) testutil.Ok(t, err) bucketStore, err := NewBucketStore( diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 47b76be5ef2..a9061179051 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -128,7 +128,6 @@ func TestCompact(t *testing.T) { samplesPerSeries: 12, }, }, - replicaLabels: []string{}, downsamplingEnabled: true, query: "{a=\"1\"}",