Skip to content

Commit

Permalink
Address review feedback.
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
  • Loading branch information
pstibrany committed Feb 5, 2024
1 parent f61fca3 commit 0e10d67
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 29 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* [ENHANCEMENT] All: set `-server.grpc.num-workers=100` by default and mark feature as `advanced`. #7131
* [ENHANCEMENT] Distributor: invalid metric name error message gets cleaned up to not include non-ascii strings. #7146
* [ENHANCEMENT] Store-gateway: add `source`, `level`, and `out_or_order` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by block metadata. #7112 #7262 #7267
* [ENHANCEMENT] Compactor: After updating bucket-index, compactor now also computes estaimated number of compaction jobs based on current bucket-index, and reports the result in `cortex_bucket_index_compaction_jobs` metric. If computation of jobs fails, `cortex_bucket_index_compaction_jobs_errors_total` is updated instead. #7299
* [ENHANCEMENT] Compactor: After updating bucket-index, compactor now also computes estimated number of compaction jobs based on current bucket-index, and reports the result in `cortex_bucket_index_estimated_compaction_jobs` metric. If computation of jobs fails, `cortex_bucket_index_estimated_compaction_jobs_errors_total` is updated instead. #7299
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
* [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766
* [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766
Expand Down
18 changes: 10 additions & 8 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, own
}, []string{"user"}),

bucketIndexCompactionJobs: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_index_compaction_jobs",
Help: "Number of compaction jobs based on latest version of bucket index.",
Name: "cortex_bucket_index_estimated_compaction_jobs",
Help: "Estimated number of compaction jobs based on latest version of bucket index.",
}, []string{"user", "type"}),
bucketIndexCompactionPlanningErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_bucket_index_compaction_jobs_errors_total",
Help: "Total number of failed executions of compaction job planning based on latest version of bucket index.",
Name: "cortex_bucket_index_estimated_compaction_jobs_errors_total",
Help: "Total number of failed executions of compaction job estimation based on latest version of bucket index.",
}),
}

Expand Down Expand Up @@ -474,12 +474,14 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()

// Compute pending compaction jobs based on current index.
splitJobs, mergeJobs, err := c.computeCompactionJobs(ctx, userID, userBucket, idx)
splitJobs, mergeJobs, err := c.estimateCompactionJobsFrom(ctx, userID, userBucket, idx)
if err != nil {
// When compactor is shutting down, we get context cancellation. There's no reason to report that as error.
if !errors.Is(err, context.Canceled) {
level.Error(userLogger).Log("msg", "failed to compute compaction jobs from bucket index for user", "err", err)
c.bucketIndexCompactionPlanningErrors.Inc()
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit))
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))
}
} else {
c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageSplit)).Set(float64(splitJobs))
Expand Down Expand Up @@ -668,14 +670,14 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u
return lastModified, err
}

func (c *BlocksCleaner) computeCompactionJobs(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) {
func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) {
metas := convertBucketIndexToMetasForCompactionJobPlanning(idx)

// We need to pass this metric to Filters, but we don't need to report this value from BlocksCleaner.
// We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner.
synced := newNoopGaugeVec()

for _, f := range []block.MetadataFilter{
// We don't include ShardAwareDeduplicateFilter, because thus filter relies on list of compaction sources, which are not present in the BucketIndex.
// We don't include ShardAwareDeduplicateFilter, because it relies on list of compaction sources, which are not present in the BucketIndex.
// We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction.
NewNoCompactionMarkFilter(userBucket, true),
} {
Expand Down
40 changes: 20 additions & 20 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,17 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 2
cortex_bucket_blocks_partials_count{user="user-2"} 0
# HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_compaction_jobs gauge
cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_compaction_jobs{type="split",user="user-2"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_compaction_jobs",
"cortex_bucket_index_estimated_compaction_jobs",
))
}

Expand Down Expand Up @@ -379,17 +379,17 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 0
cortex_bucket_blocks_partials_count{user="user-2"} 0
# HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_compaction_jobs gauge
cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_compaction_jobs{type="split",user="user-2"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_compaction_jobs",
"cortex_bucket_index_estimated_compaction_jobs",
))

// Override the users scanner to reconfigure it to only return a subset of users.
Expand All @@ -411,15 +411,15 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 0
# HELP cortex_bucket_index_compaction_jobs Number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_compaction_jobs gauge
cortex_bucket_index_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_compaction_jobs{type="split",user="user-1"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_compaction_jobs",
"cortex_bucket_index_estimated_compaction_jobs",
))
}

Expand Down Expand Up @@ -1078,7 +1078,7 @@ func TestComputeCompactionJobs(t *testing.T) {
require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))

cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil)
split, merge, err := cleaner.computeCompactionJobs(context.Background(), user, userBucket, &index)
split, merge, err := cleaner.estimateCompactionJobsFrom(context.Background(), user, userBucket, &index)
require.NoError(t, err)
require.Equal(t, 1, split)
require.Equal(t, 2, merge)
Expand Down

0 comments on commit 0e10d67

Please sign in to comment.