From 0fcee44632e3cccf887046c9d1c2fdcb2e76cc4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 5 Feb 2024 12:34:29 +0100 Subject: [PATCH 1/6] Compute number of compaction jobs from bucket index and export it via cortex_bucket_index_compaction_jobs metric. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + pkg/compactor/blocks_cleaner.go | 123 ++++++++++++++++++++++++--- pkg/compactor/blocks_cleaner_test.go | 72 ++++++++++++++++ pkg/compactor/compactor.go | 1 + 4 files changed, 185 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa70f5fbd49..515f9b608dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ * [ENHANCEMENT] All: set `-server.grpc.num-workers=100` by default and mark feature as `advanced`. #7131 * [ENHANCEMENT] Distributor: invalid metric name error message gets cleaned up to not include non-ascii strings. #7146 * [ENHANCEMENT] Store-gateway: add `source`, `level`, and `out_or_order` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by block metadata. #7112 #7262 #7267 +* [ENHANCEMENT] Compactor: After updating bucket-index, compactor now also computes estaimated number of compaction jobs based on current bucket-index, and reports the result in `cortex_bucket_index_compaction_jobs` metric. * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451 * [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766 * [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 6c230e0f665..c912796cdce 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -43,6 +43,7 @@ type BlocksCleanerConfig struct { TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug". DeleteBlocksConcurrency int NoBlocksFileCleanupEnabled bool + CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs. } type BlocksCleaner struct { @@ -60,18 +61,20 @@ type BlocksCleaner struct { lastOwnedUsers []string // Metrics. 
- runsStarted prometheus.Counter - runsCompleted prometheus.Counter - runsFailed prometheus.Counter - runsLastSuccess prometheus.Gauge - blocksCleanedTotal prometheus.Counter - blocksFailedTotal prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - partialBlocksMarkedForDeletion prometheus.Counter - tenantBlocks *prometheus.GaugeVec - tenantMarkedBlocks *prometheus.GaugeVec - tenantPartialBlocks *prometheus.GaugeVec - tenantBucketIndexLastUpdate *prometheus.GaugeVec + runsStarted prometheus.Counter + runsCompleted prometheus.Counter + runsFailed prometheus.Counter + runsLastSuccess prometheus.Gauge + blocksCleanedTotal prometheus.Counter + blocksFailedTotal prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + partialBlocksMarkedForDeletion prometheus.Counter + tenantBlocks *prometheus.GaugeVec + tenantMarkedBlocks *prometheus.GaugeVec + tenantPartialBlocks *prometheus.GaugeVec + tenantBucketIndexLastUpdate *prometheus.GaugeVec + bucketIndexCompactionJobs *prometheus.GaugeVec + bucketIndexCompactionPlanningErrors prometheus.Counter } func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, ownUser func(userID string) (bool, error), cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { @@ -137,6 +140,15 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, own Name: "cortex_bucket_index_last_successful_update_timestamp_seconds", Help: "Timestamp of the last successful update of a tenant's bucket index.", }, []string{"user"}), + + bucketIndexCompactionJobs: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_bucket_index_compaction_jobs", + Help: "Number of compaction jobs based on latest version of bucket index.", + }, []string{"user", "type"}), + bucketIndexCompactionPlanningErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_bucket_index_compaction_jobs_errors_total", + Help: "Total number of failed executions of compaction job planning based on latest version of bucket index.", + }), } c.Service = services.NewTimerService(cfg.CleanupInterval, c.starting, c.ticker, c.stopping) @@ -231,6 +243,8 @@ func (c *BlocksCleaner) refreshOwnedUsers(ctx context.Context) ([]string, map[st c.tenantMarkedBlocks.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) + c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit)) + c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge)) } } c.lastOwnedUsers = allUsers @@ -337,6 +351,8 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID c.tenantBlocks.DeleteLabelValues(userID) c.tenantMarkedBlocks.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) + c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit)) + c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge)) if deletedBlocks > 0 { level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks) @@ -457,6 +473,19 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials))) c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime() + // Compute pending compaction jobs based on current index. 
+ splitJobs, mergeJobs, err := c.computeCompactionJobs(ctx, userID, userBucket, idx) + if err != nil { + // When compactor is shutting down, we get context cancellation. There's no reason to report that as error. + if !errors.Is(err, context.Canceled) { + level.Error(userLogger).Log("msg", "failed to compute compaction jobs from bucket index for user", "err", err) + c.bucketIndexCompactionPlanningErrors.Inc() + } + } else { + c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageSplit)).Set(float64(splitJobs)) + c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageMerge)).Set(float64(mergeJobs)) + } + return nil } @@ -638,3 +667,73 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u } return lastModified, err } + +func (c *BlocksCleaner) computeCompactionJobs(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) { + metas := convertBucketIndexToMetasForCompactionJobPlanning(idx) + + // We need to pass this metric to Filters, but we don't need to report this value from BlocksCleaner. + synced := newNoopGaugeVec() + + for _, f := range []block.MetadataFilter{ + // We don't include ShardAwareDeduplicateFilter, because thus filter relies on list of compaction sources, which are not present in the BucketIndex. + // We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction. + NewNoCompactionMarkFilter(userBucket, true), + } { + err := f.Filter(ctx, metas, synced) + if err != nil { + return 0, 0, err + } + } + + grouper := NewSplitAndMergeGrouper(userID, c.cfg.CompactionBlockRanges.ToMilliseconds(), uint32(c.cfgProvider.CompactorSplitAndMergeShards(userID)), uint32(c.cfgProvider.CompactorSplitGroups(userID)), log.NewNopLogger()) + jobs, err := grouper.Groups(metas) + if err != nil { + return 0, 0, err + } + + split := 0 + merge := 0 + for _, j := range jobs { + if j.UseSplitting() { + split++ + } else { + merge++ + } + } + + return split, merge, nil +} + +// Convert index into map of block Metas, but ignore blocks marked for deletion. +func convertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta { + deletedULIDs := idx.BlockDeletionMarks.GetULIDs() + deleted := make(map[ulid.ULID]bool, len(deletedULIDs)) + for _, id := range deletedULIDs { + deleted[id] = true + } + + metas := map[ulid.ULID]*block.Meta{} + for _, b := range idx.Blocks { + if deleted[b.ID] { + continue + } + metas[b.ID] = b.ThanosMeta() + if metas[b.ID].Thanos.Labels == nil { + metas[b.ID].Thanos.Labels = map[string]string{} + } + metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID // Needed for correct planning. 
+ } + return metas +} + +type noopGaugeVec struct { + g prometheus.Gauge +} + +func newNoopGaugeVec() *noopGaugeVec { + return &noopGaugeVec{g: prometheus.NewGauge(prometheus.GaugeOpts{})} +} + +func (n *noopGaugeVec) WithLabelValues(lvs ...string) prometheus.Gauge { + return n.g +} diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 55f59be39ca..d57c11078a2 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -203,10 +203,17 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 2 cortex_bucket_blocks_partials_count{user="user-2"} 0 + # HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index. + # TYPE cortex_bucket_index_compaction_jobs gauge + cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0 + cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0 + cortex_bucket_index_compaction_jobs{type="merge",user="user-2"} 0 + cortex_bucket_index_compaction_jobs{type="split",user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_blocks_partials_count", + "cortex_bucket_index_compaction_jobs", )) } @@ -371,10 +378,17 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 0 cortex_bucket_blocks_partials_count{user="user-2"} 0 + # HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index. + # TYPE cortex_bucket_index_compaction_jobs gauge + cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0 + cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0 + cortex_bucket_index_compaction_jobs{type="merge",user="user-2"} 0 + cortex_bucket_index_compaction_jobs{type="split",user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_blocks_partials_count", + "cortex_bucket_index_compaction_jobs", )) // Override the users scanner to reconfigure it to only return a subset of users. @@ -396,10 +410,15 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar # HELP cortex_bucket_blocks_partials_count Total number of partial blocks. # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 0 + # HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index. 
+ # TYPE cortex_bucket_index_compaction_jobs gauge + cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0 + cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_blocks_partials_count", + "cortex_bucket_index_compaction_jobs", )) } @@ -1011,6 +1030,59 @@ func TestStalePartialBlockLastModifiedTime(t *testing.T) { } } +func TestComputeCompactionJobs(t *testing.T) { + bucketClient, _ := mimir_testutil.PrepareFilesystemBucket(t) + bucketClient = block.BucketWithGlobalMarkers(bucketClient) + + cfg := BlocksCleanerConfig{ + DeletionDelay: time.Hour, + CleanupInterval: time.Minute, + CleanupConcurrency: 1, + DeleteBlocksConcurrency: 1, + CompactionBlockRanges: tsdb.DurationList{2 * time.Hour, 24 * time.Hour}, + } + + const user = "test" + + cfgProvider := newMockConfigProvider() + cfgProvider.splitGroups[user] = 0 // No grouping of jobs for split-compaction. All jobs will be in single split compaction. + cfgProvider.splitAndMergeShards[user] = 3 + + twoHoursMS := 2 * time.Hour.Milliseconds() + dayMS := 24 * time.Hour.Milliseconds() + + blockMarkedForNoCompact := ulid.MustNew(ulid.Now(), rand.Reader) + + index := bucketindex.Index{} + index.Blocks = bucketindex.Blocks{ + // Some 2h blocks that should be compacted together and split. + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS}, + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS}, + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS}, + + // Some merge jobs. + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"}, + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"}, + + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"}, + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"}, + + // This merge job is skipped, as block is marked for no-compaction. + &bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"}, + &bucketindex.Block{ID: blockMarkedForNoCompact, MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"}, + } + + userBucket := bucket.NewUserBucketClient(user, bucketClient, nil) + // Mark block for no-compaction. 
+ require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", prometheus.NewCounter(prometheus.CounterOpts{}))) + + cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil) + split, merge, err := cleaner.computeCompactionJobs(context.Background(), user, userBucket, &index) + require.NoError(t, err) + require.Equal(t, 1, split) + require.Equal(t, 2, merge) +} + type mockBucketFailure struct { objstore.Bucket diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 0e0f8bde9e5..a998aade0f3 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -493,6 +493,7 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error { TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency, NoBlocksFileCleanupEnabled: c.compactorCfg.NoBlocksFileCleanupEnabled, + CompactionBlockRanges: c.compactorCfg.BlockRanges, }, c.bucketClient, c.shardingStrategy.blocksCleanerOwnUser, c.cfgProvider, c.parentLogger, c.registerer) // Start blocks cleaner asynchronously, don't wait until initial cleanup is finished. From db1ac1dd8f60578ba9f8f02651f9684e88d052aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 5 Feb 2024 12:38:33 +0100 Subject: [PATCH 2/6] Add PR number. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 515f9b608dc..3f18c50cf7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,7 +58,7 @@ * [ENHANCEMENT] All: set `-server.grpc.num-workers=100` by default and mark feature as `advanced`. #7131 * [ENHANCEMENT] Distributor: invalid metric name error message gets cleaned up to not include non-ascii strings. #7146 * [ENHANCEMENT] Store-gateway: add `source`, `level`, and `out_or_order` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by block metadata. #7112 #7262 #7267 -* [ENHANCEMENT] Compactor: After updating bucket-index, compactor now also computes estaimated number of compaction jobs based on current bucket-index, and reports the result in `cortex_bucket_index_compaction_jobs` metric. +* [ENHANCEMENT] Compactor: After updating bucket-index, compactor now also computes estaimated number of compaction jobs based on current bucket-index, and reports the result in `cortex_bucket_index_compaction_jobs` metric. If computation of jobs fails, `cortex_bucket_index_compaction_jobs_errors_total` is updated instead. #7299 * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451 * [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766 * [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766 From 526d274721330c336ac7a58bc8f62045e164629d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 5 Feb 2024 13:48:53 +0100 Subject: [PATCH 3/6] Remove unused parameter name. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/compactor/blocks_cleaner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index c912796cdce..c08ce8abf06 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -734,6 +734,6 @@ func newNoopGaugeVec() *noopGaugeVec { return &noopGaugeVec{g: prometheus.NewGauge(prometheus.GaugeOpts{})} } -func (n *noopGaugeVec) WithLabelValues(lvs ...string) prometheus.Gauge { +func (n *noopGaugeVec) WithLabelValues(...string) prometheus.Gauge { return n.g } From f61fca3440e2e1a525b6b414819ea0eb1ae84c42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 5 Feb 2024 13:55:22 +0100 Subject: [PATCH 4/6] Make linter happy. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/compactor/blocks_cleaner.go | 2 +- pkg/compactor/blocks_cleaner_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index c08ce8abf06..f8a09b31926 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -731,7 +731,7 @@ type noopGaugeVec struct { } func newNoopGaugeVec() *noopGaugeVec { - return &noopGaugeVec{g: prometheus.NewGauge(prometheus.GaugeOpts{})} + return &noopGaugeVec{g: promauto.With(nil).NewGauge(prometheus.GaugeOpts{})} } func (n *noopGaugeVec) WithLabelValues(...string) prometheus.Gauge { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index d57c11078a2..f6c57d1abc0 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1074,7 +1075,7 @@ func TestComputeCompactionJobs(t *testing.T) { userBucket := bucket.NewUserBucketClient(user, bucketClient, nil) // Mark block for no-compaction. - require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", prometheus.NewCounter(prometheus.CounterOpts{}))) + require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil) split, merge, err := cleaner.computeCompactionJobs(context.Background(), user, userBucket, &index) From 0e10d679f40447a145aabb749c6d4cfafad13a37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 5 Feb 2024 15:05:57 +0100 Subject: [PATCH 5/6] Address review feedback. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- pkg/compactor/blocks_cleaner.go | 18 +++++++------ pkg/compactor/blocks_cleaner_test.go | 40 ++++++++++++++-------------- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f18c50cf7f..e5395013b6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,7 +58,7 @@ * [ENHANCEMENT] All: set `-server.grpc.num-workers=100` by default and mark feature as `advanced`. #7131 * [ENHANCEMENT] Distributor: invalid metric name error message gets cleaned up to not include non-ascii strings. #7146 * [ENHANCEMENT] Store-gateway: add `source`, `level`, and `out_or_order` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by block metadata. #7112 #7262 #7267 -* [ENHANCEMENT] Compactor: After updating bucket-index, compactor now also computes estaimated number of compaction jobs based on current bucket-index, and reports the result in `cortex_bucket_index_compaction_jobs` metric. If computation of jobs fails, `cortex_bucket_index_compaction_jobs_errors_total` is updated instead. #7299 +* [ENHANCEMENT] Compactor: After updating bucket-index, compactor now also computes estimated number of compaction jobs based on current bucket-index, and reports the result in `cortex_bucket_index_estimated_compaction_jobs` metric. If computation of jobs fails, `cortex_bucket_index_estimated_compaction_jobs_errors_total` is updated instead. #7299 * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451 * [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766 * [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index f8a09b31926..86d7c7fd477 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -142,12 +142,12 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, own }, []string{"user"}), bucketIndexCompactionJobs: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_bucket_index_compaction_jobs", - Help: "Number of compaction jobs based on latest version of bucket index.", + Name: "cortex_bucket_index_estimated_compaction_jobs", + Help: "Estimated number of compaction jobs based on latest version of bucket index.", }, []string{"user", "type"}), bucketIndexCompactionPlanningErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_bucket_index_compaction_jobs_errors_total", - Help: "Total number of failed executions of compaction job planning based on latest version of bucket index.", + Name: "cortex_bucket_index_estimated_compaction_jobs_errors_total", + Help: "Total number of failed executions of compaction job estimation based on latest version of bucket index.", }), } @@ -474,12 +474,14 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime() // Compute pending compaction jobs based on current index. 
- splitJobs, mergeJobs, err := c.computeCompactionJobs(ctx, userID, userBucket, idx) + splitJobs, mergeJobs, err := c.estimateCompactionJobsFrom(ctx, userID, userBucket, idx) if err != nil { // When compactor is shutting down, we get context cancellation. There's no reason to report that as error. if !errors.Is(err, context.Canceled) { level.Error(userLogger).Log("msg", "failed to compute compaction jobs from bucket index for user", "err", err) c.bucketIndexCompactionPlanningErrors.Inc() + c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit)) + c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge)) } } else { c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageSplit)).Set(float64(splitJobs)) @@ -668,14 +670,14 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u return lastModified, err } -func (c *BlocksCleaner) computeCompactionJobs(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) { +func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) { metas := convertBucketIndexToMetasForCompactionJobPlanning(idx) - // We need to pass this metric to Filters, but we don't need to report this value from BlocksCleaner. + // We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner. synced := newNoopGaugeVec() for _, f := range []block.MetadataFilter{ - // We don't include ShardAwareDeduplicateFilter, because thus filter relies on list of compaction sources, which are not present in the BucketIndex. + // We don't include ShardAwareDeduplicateFilter, because it relies on list of compaction sources, which are not present in the BucketIndex. // We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction. NewNoCompactionMarkFilter(userBucket, true), } { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index f6c57d1abc0..1fbf54883bc 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -204,17 +204,17 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 2 cortex_bucket_blocks_partials_count{user="user-2"} 0 - # HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index. - # TYPE cortex_bucket_index_compaction_jobs gauge - cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0 - cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0 - cortex_bucket_index_compaction_jobs{type="merge",user="user-2"} 0 - cortex_bucket_index_compaction_jobs{type="split",user="user-2"} 0 + # HELP cortex_bucket_index_estimated_compaction_jobs Number of compaction jobs based on latest version of bucket index. 
+ # TYPE cortex_bucket_index_estimated_compaction_jobs gauge + cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0 + cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0 + cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0 + cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_blocks_partials_count", - "cortex_bucket_index_compaction_jobs", + "cortex_bucket_index_estimated_compaction_jobs", )) } @@ -379,17 +379,17 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 0 cortex_bucket_blocks_partials_count{user="user-2"} 0 - # HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index. - # TYPE cortex_bucket_index_compaction_jobs gauge - cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0 - cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0 - cortex_bucket_index_compaction_jobs{type="merge",user="user-2"} 0 - cortex_bucket_index_compaction_jobs{type="split",user="user-2"} 0 + # HELP cortex_bucket_index_estimated_compaction_jobs Number of compaction jobs based on latest version of bucket index. + # TYPE cortex_bucket_index_estimated_compaction_jobs gauge + cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0 + cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0 + cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0 + cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_blocks_partials_count", - "cortex_bucket_index_compaction_jobs", + "cortex_bucket_index_estimated_compaction_jobs", )) // Override the users scanner to reconfigure it to only return a subset of users. @@ -411,15 +411,15 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar # HELP cortex_bucket_blocks_partials_count Total number of partial blocks. # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 0 - # HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index. - # TYPE cortex_bucket_index_compaction_jobs gauge - cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0 - cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0 + # HELP cortex_bucket_index_estimated_compaction_jobs Number of compaction jobs based on latest version of bucket index. 
+ # TYPE cortex_bucket_index_estimated_compaction_jobs gauge + cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0 + cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", "cortex_bucket_blocks_partials_count", - "cortex_bucket_index_compaction_jobs", + "cortex_bucket_index_estimated_compaction_jobs", )) } @@ -1078,7 +1078,7 @@ func TestComputeCompactionJobs(t *testing.T) { require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil) - split, merge, err := cleaner.computeCompactionJobs(context.Background(), user, userBucket, &index) + split, merge, err := cleaner.estimateCompactionJobsFrom(context.Background(), user, userBucket, &index) require.NoError(t, err) require.Equal(t, 1, split) require.Equal(t, 2, merge) From 1b2a2051bef92c50505ed2a46f7778fd0fb9286e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 5 Feb 2024 15:13:01 +0100 Subject: [PATCH 6/6] Fix tests. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/compactor/blocks_cleaner_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 1fbf54883bc..d48be225cf2 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -204,7 +204,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 2 cortex_bucket_blocks_partials_count{user="user-2"} 0 - # HELP cortex_bucket_index_estimated_compaction_jobs Number of compaction jobs based on latest version of bucket index. + # HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index. # TYPE cortex_bucket_index_estimated_compaction_jobs gauge cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0 cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0 @@ -379,7 +379,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar # TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 0 cortex_bucket_blocks_partials_count{user="user-2"} 0 - # HELP cortex_bucket_index_estimated_compaction_jobs Number of compaction jobs based on latest version of bucket index. + # HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index. # TYPE cortex_bucket_index_estimated_compaction_jobs gauge cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0 cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0 @@ -411,7 +411,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar # HELP cortex_bucket_blocks_partials_count Total number of partial blocks. 
# TYPE cortex_bucket_blocks_partials_count gauge cortex_bucket_blocks_partials_count{user="user-1"} 0 - # HELP cortex_bucket_index_estimated_compaction_jobs Number of compaction jobs based on latest version of bucket index. + # HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index. # TYPE cortex_bucket_index_estimated_compaction_jobs gauge cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0 cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
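
For reference, the per-tenant gauge lifecycle that this series converges on (set on successful estimation; delete on planning failure and on tenant removal) can be exercised in isolation with the upstream Prometheus client. The following is a minimal, self-contained sketch: the publish helper and the sample values are illustrative and not part of the patches, while the metric name, help string, and "split"/"merge" label values match the final state after PATCH 5/6.

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same name, help, and labels as the gauge registered in NewBlocksCleaner.
	jobs := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_bucket_index_estimated_compaction_jobs",
		Help: "Estimated number of compaction jobs based on latest version of bucket index.",
	}, []string{"user", "type"})

	// publish is a hypothetical helper mirroring cleanUser's behaviour after
	// PATCH 5: on success both per-user series are set; on failure they are
	// deleted so that a stale estimate is never reported.
	publish := func(user string, split, merge int, err error) {
		if err != nil {
			jobs.DeleteLabelValues(user, "split")
			jobs.DeleteLabelValues(user, "merge")
			return
		}
		jobs.WithLabelValues(user, "split").Set(float64(split))
		jobs.WithLabelValues(user, "merge").Set(float64(merge))
	}

	publish("user-1", 1, 2, nil)                           // series exist: split=1, merge=2
	publish("user-1", 0, 0, errors.New("planning failed")) // both series removed

	mfs, _ := reg.Gather()
	fmt.Println(len(mfs)) // 0: no stale estimate remains for user-1
}

Deleting the per-user series on failure, rather than leaving the previous value in place, means the gauge never reports an outdated estimate for a tenant whose planning currently fails; the cortex_bucket_index_estimated_compaction_jobs_errors_total counter signals the failure instead.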