diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index d69b54db849..b20099cb1de 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -122,8 +122,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). Default("1").Int() - dedupReplicaLabels := cmd.Flag("offline-deduplication.replica-labels", "Label to treat as a replica indicator of blocks that can be deduplicated. This will merge multiple replica blocks into one. This process is irrevertable. Experminteal"). - Hidden().String() + dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+ + "Experimental. When it is set true, this will given labels from blocks so that vertical compaction could merge blocks."). 
+ Hidden().Strings() selectorRelabelConf := regSelectorRelabelFlags(cmd) @@ -173,7 +174,7 @@ func runCompact( maxCompactionLevel int, blockSyncConcurrency int, concurrency int, - dedupReplicaLabels string, + dedupReplicaLabels []string, selectorRelabelConf *extflag.PathOrContent, ) error { halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ @@ -246,13 +247,12 @@ func runCompact( duplicateBlocksFilter := block.NewDeduplicateFilter() prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) - replicaLabelFilter := block.ReplicaLabelsFilter{ReplicaLabels: strings.Split(dedupReplicaLabels, ",")} metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, block.NewLabelShardedMetaFilter(relabelConfig).Filter, block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, duplicateBlocksFilter.Filter, - replicaLabelFilter.Filter, + block.NewReplicaLabelRemover(dedupReplicaLabels).Modify, ) if err != nil { return errors.Wrap(err, "create meta fetcher") diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 7faa4148325..f22d39a4127 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -32,12 +32,19 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" ) +type txLabeledGauge interface { + WithLabelValues(lvs ...string) prometheus.Gauge + ResetTx() + Submit() +} + type syncMetrics struct { syncs prometheus.Counter syncFailures prometheus.Counter syncDuration prometheus.Histogram - synced *extprom.TxGaugeVec + synced txLabeledGauge + modified txLabeledGauge } const ( @@ -49,11 +56,11 @@ const ( failedMeta = "failed" // Filter's label values. 
- labelExcludedMeta = "label-excluded" - timeExcludedMeta = "time-excluded" - tooFreshMeta = "too-fresh" - duplicateMeta = "duplicate" - replicaExclude = "replica-exclude" + labelExcludedMeta = "label-excluded" + timeExcludedMeta = "time-excluded" + tooFreshMeta = "too-fresh" + duplicateMeta = "duplicate" + replicaRemovedMeta = "replica-label-removed" ) func newSyncMetrics(reg prometheus.Registerer) *syncMetrics { @@ -89,6 +96,15 @@ func newSyncMetrics(reg prometheus.Registerer) *syncMetrics { []string{labelExcludedMeta}, []string{timeExcludedMeta}, []string{duplicateMeta}, + []string{replicaRemovedMeta}, + ) + m.modified = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Subsystem: syncMetricSubSys, + Name: "modified", + Help: "Number of block metadata that modified", + }, + []string{"modified"}, + []string{replicaRemovedMeta}, ) return &m } @@ -97,11 +113,7 @@ type MetadataFetcher interface { Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) } -type GaugeLabeled interface { - WithLabelValues(lvs ...string) prometheus.Gauge -} - -type MetaFetcherFilter func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) +type MetaFetcherFilter func(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, incompleteView bool) // MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. type MetaFetcher struct { @@ -346,7 +358,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. for _, f := range s.filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. 
- f(metas, s.metrics.synced, incompleteView) + f(metas, s.metrics, incompleteView) } s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) @@ -373,12 +385,12 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim } // Filter filters out blocks that are outside of specified time range. -func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) { for id, m := range metas { if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() { continue } - synced.WithLabelValues(timeExcludedMeta).Inc() + metrics.synced.WithLabelValues(timeExcludedMeta).Inc() delete(metas, id) } } @@ -399,7 +411,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet const blockIDLabel = "__block_id" // Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels. -func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) { var lbls labels.Labels for id, m := range metas { lbls = lbls[:0] @@ -409,7 +421,7 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync } if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 { - synced.WithLabelValues(labelExcludedMeta).Inc() + metrics.synced.WithLabelValues(labelExcludedMeta).Inc() delete(metas, id) } } @@ -427,7 +439,7 @@ func NewDeduplicateFilter() *DeduplicateFilter { // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. 
-func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) { var wg sync.WaitGroup metasByResolution := make(map[int64][]*metadata.Meta) @@ -444,14 +456,14 @@ func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced Ga BlockMeta: tsdb.BlockMeta{ ULID: ulid.MustNew(uint64(0), nil), }, - }), metasByResolution[res], metas, res, synced) + }), metasByResolution[res], metas, res, metrics.synced) }(res) } wg.Wait() } -func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) { +func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced txLabeledGauge) { sort.Slice(metaSlice, func(i, j int) bool { ilen := len(metaSlice[i].Compaction.Sources) jlen := len(metaSlice[j].Compaction.Sources) @@ -527,17 +539,24 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool { return true } -type ReplicaLabelsFilter struct { - ReplicaLabels []string +// ReplicaLabelRemover is a MetaFetcher modifier that changes the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them. +type ReplicaLabelRemover struct { + replicaLabels []string +} + +// NewReplicaLabelRemover creates a ReplicaLabelRemover. +func NewReplicaLabelRemover(replicaLabels []string) *ReplicaLabelRemover { + return &ReplicaLabelRemover{replicaLabels: replicaLabels} } -func (f *ReplicaLabelsFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, view bool) { +// Modify changes the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them. 
+func (r *ReplicaLabelRemover) Modify(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, view bool) { for u, meta := range metas { labels := meta.Thanos.Labels - for _, replicaLabel := range f.ReplicaLabels { + for _, replicaLabel := range r.replicaLabels { if _, exists := labels[replicaLabel]; exists { delete(labels, replicaLabel) - synced.WithLabelValues(replicaExclude).Inc() + metrics.modified.WithLabelValues(replicaRemovedMeta).Inc() } } metas[u].Thanos.Labels = labels @@ -569,7 +588,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura } // Filter filters out blocks that filters blocks that have are created before a specified consistency delay. -func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) { for id, meta := range metas { // TODO(khyatisoneji): Remove the checks about Thanos Source // by implementing delete delay to fetch metas. @@ -580,7 +599,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, meta.Thanos.Source != metadata.CompactorRepairSource { level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) - synced.WithLabelValues(tooFreshMeta).Inc() + metrics.synced.WithLabelValues(tooFreshMeta).Inc() delete(metas, id) } } diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 00cb63c9f80..e486f17994e 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -35,6 +35,23 @@ import ( "gopkg.in/yaml.v2" ) +type testTxLabeledGauge struct { + g *prometheus.GaugeVec +} + +func (t *testTxLabeledGauge) WithLabelValues(lvs ...string) prometheus.Gauge { + return t.g.WithLabelValues(lvs...) 
+} +func (t *testTxLabeledGauge) ResetTx() {} +func (t *testTxLabeledGauge) Submit() {} + +func newTestSyncMetrics() *syncMetrics { + return &syncMetrics{ + synced: &testTxLabeledGauge{promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})}, + modified: &testTxLabeledGauge{promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"modified"})}, + } +} + func ULID(i int) ulid.ULID { return ulid.MustNew(uint64(i), nil) } func ULIDs(is ...int) []ulid.ULID { @@ -57,9 +74,9 @@ func TestMetaFetcher_Fetch(t *testing.T) { var ulidToDelete ulid.ULID r := prometheus.NewRegistry() - f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) { + f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, incompleteView bool) { if _, ok := metas[ulidToDelete]; ok { - synced.WithLabelValues("filtered").Inc() + metrics.synced.WithLabelValues("filtered").Inc() delete(metas, ulidToDelete) } }) @@ -333,10 +350,10 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { ULID(6): input[ULID(6)], } - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - f.Filter(input, synced, false) + m := newTestSyncMetrics() + f.Filter(input, m, false) - testutil.Equals(t, 3.0, promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) testutil.Equals(t, expected, input) } @@ -428,11 +445,11 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { } deleted := len(input) - len(expected) - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - f.Filter(input, synced, false) + m := newTestSyncMetrics() + f.Filter(input, m, false) testutil.Equals(t, expected, input) - testutil.Equals(t, float64(deleted), 
promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) }) @@ -489,10 +506,10 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) { ULID(4): input[ULID(4)], } - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - f.Filter(input, synced, false) + m := newTestSyncMetrics() + f.Filter(input, m, false) - testutil.Equals(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(timeExcludedMeta))) + testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta))) testutil.Equals(t, expected, input) } @@ -820,7 +837,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) { } { f := NewDeduplicateFilter() if ok := t.Run(tcase.name, func(t *testing.T) { - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + m := newTestSyncMetrics() metas := make(map[ulid.ULID]*metadata.Meta) inputLen := len(tcase.input) for id, metaInfo := range tcase.input { @@ -838,10 +855,10 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, } } - f.Filter(metas, synced, false) + f.Filter(metas, m, false) compareSliceWithMapKeys(t, metas, tcase.expected) - testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(synced.WithLabelValues(duplicateMeta))) + testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta))) }); !ok { return } @@ -849,7 +866,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) { } func TestReplicaLabel_Filter(t *testing.T) { - rf := ReplicaLabelsFilter{ReplicaLabels: []string{"replica", "rule_replica"}} + rm := NewReplicaLabelRemover([]string{"replica", "rule_replica"}) input := map[ulid.ULID]*metadata.Meta{ ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, @@ -864,10 +881,10 @@ func TestReplicaLabel_Filter(t *testing.T) { ULID(4): {Thanos: metadata.Thanos{Labels: 
map[string]string{}}}, } - synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - rf.Filter(input, synced, false) + m := newTestSyncMetrics() + rm.Modify(input, m, false) - testutil.Equals(t, 5.0, promtest.ToFloat64(synced.WithLabelValues(replicaExclude))) + testutil.Equals(t, 5.0, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta))) testutil.Equals(t, expected, input) } @@ -956,7 +973,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { } t.Run("consistency 0 (turned off)", func(t *testing.T) { - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + m := newTestSyncMetrics() expected := map[ulid.ULID]*metadata.Meta{} // Copy all. for _, id := range u.created { @@ -967,14 +984,14 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - f.Filter(input, synced, false) + f.Filter(input, m, false) - testutil.Equals(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta))) + testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) t.Run("consistency 30m.", func(t *testing.T) { - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + m := newTestSyncMetrics() expected := map[ulid.ULID]*metadata.Meta{} // Only certain sources and those with 30m or more age go through. 
for i, id := range u.created { @@ -993,9 +1010,9 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - f.Filter(input, synced, false) + f.Filter(input, m, false) - testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta))) + testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) } diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index b0f6c3a06b1..82d2e0ec934 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -63,7 +63,7 @@ func TestCompact(t *testing.T) { Type: client.S3, Config: s3Config, }) + testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(compact)) - } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 1f13925a079..66658dee3ac 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -327,8 +327,6 @@ func NewCompact(sharedDir string, name string, bucketConfig client.BucketConfig) return nil, errors.Wrapf(err, "generate compact config file: %v", bucketConfig) } - fmt.Println(string(bktConfigBytes)) - return NewService( fmt.Sprintf("compact-%s", name), DefaultImage(),