Compactor: export estimated number of compaction jobs based on bucket-index #7299

Merged 6 commits on Feb 5, 2024
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -58,6 +58,7 @@
* [ENHANCEMENT] All: set `-server.grpc.num-workers=100` by default and mark feature as `advanced`. #7131
* [ENHANCEMENT] Distributor: invalid metric name error message gets cleaned up to not include non-ascii strings. #7146
* [ENHANCEMENT] Store-gateway: add `source`, `level`, and `out_of_order` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by block metadata. #7112 #7262 #7267
* [ENHANCEMENT] Compactor: after updating the bucket index, the compactor now also computes the estimated number of compaction jobs based on the current bucket index, and reports the result in the `cortex_bucket_index_estimated_compaction_jobs` metric. If the estimation fails, `cortex_bucket_index_estimated_compaction_jobs_errors_total` is incremented instead. #7299
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
* [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766
* [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766
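
As a quick reference for the new changelog entry above, here is a minimal, self-contained sketch of the metric shapes it describes, using the Prometheus Go client that the diff below relies on. The registry, the example user label, and the values set are illustrative only; the metric name, help text, and the "split"/"merge" type values come from the diff.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	// Same shape as the gauge added in this PR: one series per (user, type),
	// where type is either "split" or "merge".
	jobs := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_bucket_index_estimated_compaction_jobs",
		Help: "Estimated number of compaction jobs based on latest version of bucket index.",
	}, []string{"user", "type"})

	// Illustrative values only.
	jobs.WithLabelValues("user-1", "split").Set(1)
	jobs.WithLabelValues("user-1", "merge").Set(2)

	// Read a single series back, as the updated tests do via testutil.
	fmt.Println(testutil.ToFloat64(jobs.WithLabelValues("user-1", "merge"))) // 2
}
```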
125 changes: 113 additions & 12 deletions pkg/compactor/blocks_cleaner.go
@@ -43,6 +43,7 @@ type BlocksCleanerConfig struct {
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
DeleteBlocksConcurrency int
NoBlocksFileCleanupEnabled bool
CompactionBlockRanges mimir_tsdb.DurationList // Used for estimating compaction jobs.
}

type BlocksCleaner struct {
@@ -60,18 +61,20 @@ type BlocksCleaner struct {
lastOwnedUsers []string

// Metrics.
runsStarted prometheus.Counter
runsCompleted prometheus.Counter
runsFailed prometheus.Counter
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
partialBlocksMarkedForDeletion prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantMarkedBlocks *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
runsStarted prometheus.Counter
runsCompleted prometheus.Counter
runsFailed prometheus.Counter
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
partialBlocksMarkedForDeletion prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantMarkedBlocks *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
bucketIndexCompactionJobs *prometheus.GaugeVec
bucketIndexCompactionPlanningErrors prometheus.Counter
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, ownUser func(userID string) (bool, error), cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
@@ -137,6 +140,15 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, own
Name: "cortex_bucket_index_last_successful_update_timestamp_seconds",
Help: "Timestamp of the last successful update of a tenant's bucket index.",
}, []string{"user"}),

bucketIndexCompactionJobs: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_index_estimated_compaction_jobs",
Help: "Estimated number of compaction jobs based on latest version of bucket index.",
}, []string{"user", "type"}),
bucketIndexCompactionPlanningErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_bucket_index_estimated_compaction_jobs_errors_total",
Help: "Total number of failed executions of compaction job estimation based on latest version of bucket index.",
}),
}

c.Service = services.NewTimerService(cfg.CleanupInterval, c.starting, c.ticker, c.stopping)
@@ -231,6 +243,8 @@ func (c *BlocksCleaner) refreshOwnedUsers(ctx context.Context) ([]string, map[st
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit))
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))
}
}
c.lastOwnedUsers = allUsers
@@ -337,6 +351,8 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit))
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))

if deletedBlocks > 0 {
level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks)
@@ -457,6 +473,21 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, userLogger
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()

// Compute pending compaction jobs based on current index.
splitJobs, mergeJobs, err := c.estimateCompactionJobsFrom(ctx, userID, userBucket, idx)
if err != nil {
// When compactor is shutting down, we get context cancellation. There's no reason to report that as error.
if !errors.Is(err, context.Canceled) {
level.Error(userLogger).Log("msg", "failed to compute compaction jobs from bucket index for user", "err", err)
c.bucketIndexCompactionPlanningErrors.Inc()
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageSplit))
c.bucketIndexCompactionJobs.DeleteLabelValues(userID, string(stageMerge))
}
} else {
c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageSplit)).Set(float64(splitJobs))
c.bucketIndexCompactionJobs.WithLabelValues(userID, string(stageMerge)).Set(float64(mergeJobs))
}

return nil
}

@@ -638,3 +669,73 @@ func stalePartialBlockLastModifiedTime(ctx context.Context, blockID ulid.ULID, u
}
return lastModified, err
}

func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID string, userBucket objstore.InstrumentedBucket, idx *bucketindex.Index) (int, int, error) {
metas := convertBucketIndexToMetasForCompactionJobPlanning(idx)

// We need to pass this metric to MetadataFilters, but we don't need to report this value from BlocksCleaner.
synced := newNoopGaugeVec()

for _, f := range []block.MetadataFilter{
// We don't include ShardAwareDeduplicateFilter, because it relies on list of compaction sources, which are not present in the BucketIndex.
// We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction.
NewNoCompactionMarkFilter(userBucket, true),
} {
err := f.Filter(ctx, metas, synced)
if err != nil {
return 0, 0, err
}
}

grouper := NewSplitAndMergeGrouper(userID, c.cfg.CompactionBlockRanges.ToMilliseconds(), uint32(c.cfgProvider.CompactorSplitAndMergeShards(userID)), uint32(c.cfgProvider.CompactorSplitGroups(userID)), log.NewNopLogger())
jobs, err := grouper.Groups(metas)
if err != nil {
return 0, 0, err
}

split := 0
merge := 0
for _, j := range jobs {
if j.UseSplitting() {
split++
} else {
merge++
}
}

return split, merge, nil
}

// Convert index into map of block Metas, but ignore blocks marked for deletion.
func convertBucketIndexToMetasForCompactionJobPlanning(idx *bucketindex.Index) map[ulid.ULID]*block.Meta {
deletedULIDs := idx.BlockDeletionMarks.GetULIDs()
deleted := make(map[ulid.ULID]bool, len(deletedULIDs))
for _, id := range deletedULIDs {
deleted[id] = true
}

metas := map[ulid.ULID]*block.Meta{}
for _, b := range idx.Blocks {
if deleted[b.ID] {
continue
}
metas[b.ID] = b.ThanosMeta()
if metas[b.ID].Thanos.Labels == nil {
metas[b.ID].Thanos.Labels = map[string]string{}
}
metas[b.ID].Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] = b.CompactorShardID // Needed for correct planning.
}
return metas
}

type noopGaugeVec struct {
g prometheus.Gauge
}

func newNoopGaugeVec() *noopGaugeVec {
return &noopGaugeVec{g: promauto.With(nil).NewGauge(prometheus.GaugeOpts{})}
}

func (n *noopGaugeVec) WithLabelValues(...string) prometheus.Gauge {
return n.g
}
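
The split-versus-merge accounting at the end of `estimateCompactionJobsFrom` depends only on each job's `UseSplitting()` result. Below is a stripped-down sketch of that classification, using a hypothetical stand-in type rather than the grouper's real job type:

```go
package main

import "fmt"

// splittable is a hypothetical stand-in for the grouper's job type; only the
// UseSplitting() method matters for the classification above.
type splittable interface {
	UseSplitting() bool
}

type fakeJob struct{ split bool }

func (j fakeJob) UseSplitting() bool { return j.split }

// countJobs mirrors the counting loop at the end of estimateCompactionJobsFrom.
func countJobs(jobs []splittable) (split, merge int) {
	for _, j := range jobs {
		if j.UseSplitting() {
			split++
		} else {
			merge++
		}
	}
	return split, merge
}

func main() {
	s, m := countJobs([]splittable{fakeJob{split: true}, fakeJob{}, fakeJob{}})
	fmt.Println(s, m) // 1 2
}
```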
73 changes: 73 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
@@ -22,6 +22,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -203,10 +204,17 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 2
cortex_bucket_blocks_partials_count{user="user-2"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_estimated_compaction_jobs",
))
}

@@ -371,10 +379,17 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 0
cortex_bucket_blocks_partials_count{user="user-2"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-2"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-2"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_estimated_compaction_jobs",
))

// Override the users scanner to reconfigure it to only return a subset of users.
@@ -396,10 +411,15 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 0
# HELP cortex_bucket_index_estimated_compaction_jobs Estimated number of compaction jobs based on latest version of bucket index.
# TYPE cortex_bucket_index_estimated_compaction_jobs gauge
cortex_bucket_index_estimated_compaction_jobs{type="merge",user="user-1"} 0
cortex_bucket_index_estimated_compaction_jobs{type="split",user="user-1"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_partials_count",
"cortex_bucket_index_estimated_compaction_jobs",
))
}

@@ -1011,6 +1031,59 @@ func TestStalePartialBlockLastModifiedTime(t *testing.T) {
}
}

func TestComputeCompactionJobs(t *testing.T) {
bucketClient, _ := mimir_testutil.PrepareFilesystemBucket(t)
bucketClient = block.BucketWithGlobalMarkers(bucketClient)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
DeleteBlocksConcurrency: 1,
CompactionBlockRanges: tsdb.DurationList{2 * time.Hour, 24 * time.Hour},
}

const user = "test"

cfgProvider := newMockConfigProvider()
cfgProvider.splitGroups[user] = 0 // No grouping of jobs for split-compaction. All jobs will be in single split compaction.
cfgProvider.splitAndMergeShards[user] = 3

twoHoursMS := 2 * time.Hour.Milliseconds()
dayMS := 24 * time.Hour.Milliseconds()

blockMarkedForNoCompact := ulid.MustNew(ulid.Now(), rand.Reader)

index := bucketindex.Index{}
index.Blocks = bucketindex.Blocks{
// Some 2h blocks that should be compacted together and split.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: 0, MaxTime: twoHoursMS},

// Some merge jobs.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "1_of_3"},

&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "2_of_3"},

// This merge job is skipped, as block is marked for no-compaction.
&bucketindex.Block{ID: ulid.MustNew(ulid.Now(), rand.Reader), MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
&bucketindex.Block{ID: blockMarkedForNoCompact, MinTime: dayMS, MaxTime: 2 * dayMS, CompactorShardID: "3_of_3"},
}

userBucket := bucket.NewUserBucketClient(user, bucketClient, nil)
// Mark block for no-compaction.
require.NoError(t, block.MarkForNoCompact(context.Background(), log.NewNopLogger(), userBucket, blockMarkedForNoCompact, block.CriticalNoCompactReason, "testing", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))

cleaner := NewBlocksCleaner(cfg, bucketClient, tsdb.AllUsers, cfgProvider, log.NewNopLogger(), nil)
split, merge, err := cleaner.estimateCompactionJobsFrom(context.Background(), user, userBucket, &index)
require.NoError(t, err)
require.Equal(t, 1, split)
require.Equal(t, 2, merge)
}

type mockBucketFailure struct {
objstore.Bucket

1 change: 1 addition & 0 deletions pkg/compactor/compactor.go
@@ -493,6 +493,7 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
NoBlocksFileCleanupEnabled: c.compactorCfg.NoBlocksFileCleanupEnabled,
CompactionBlockRanges: c.compactorCfg.BlockRanges,
}, c.bucketClient, c.shardingStrategy.blocksCleanerOwnUser, c.cfgProvider, c.parentLogger, c.registerer)

// Start blocks cleaner asynchronously, don't wait until initial cleanup is finished.
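
The new `CompactionBlockRanges` field simply reuses the compactor's existing block ranges, which `estimateCompactionJobsFrom` hands to the grouper via `ToMilliseconds()`. A small sketch of that conversion follows; the concrete durations mirror `TestComputeCompactionJobs` above, and the import path is an assumption based on the `tsdb` package referenced in the diff:

```go
package main

import (
	"fmt"
	"time"

	// Assumed import path for the package providing DurationList.
	"github.com/grafana/mimir/pkg/storage/tsdb"
)

func main() {
	// Example block ranges, matching the ones used in TestComputeCompactionJobs.
	ranges := tsdb.DurationList{2 * time.Hour, 24 * time.Hour}

	// The grouper consumes the ranges as milliseconds.
	fmt.Println(ranges.ToMilliseconds()) // [7200000 86400000]
}
```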