Parallelize vertical compactions
The current compaction implementation allocates one goroutine per compaction
stream, so overall compaction can only run as fast as the slowest stream.
As a result, as soon as one stream starts to fall behind, all other streams
can become affected. In addition, even with a high compact-concurrency setting,
CPU utilization can still be low because of the one-goroutine-per-stream limit.

The compaction algorithm also prioritizes vertical compactions over horizontal ones.
As soon as it detects any overlapping blocks, it will compact those blocks and reevaluate
the plan in a subsequent iteration.
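As an illustration, here is a minimal sketch of such an overlap check, assuming blocks carry half-open [MinTime, MaxTime) intervals and arrive sorted by MinTime; the blockRange type and hasOverlap helper are hypothetical and not the compactor's actual code:

```go
package example

import "math"

// blockRange is a hypothetical stand-in for a block meta's time range,
// treated as the half-open interval [MinTime, MaxTime).
type blockRange struct {
	MinTime, MaxTime int64
}

// hasOverlap reports whether any block in a MinTime-sorted slice starts
// before an earlier block ends, i.e. whether vertical compaction applies.
func hasOverlap(sorted []blockRange) bool {
	maxEnd := int64(math.MinInt64)
	for i, b := range sorted {
		if i > 0 && b.MinTime < maxEnd {
			return true
		}
		if b.MaxTime > maxEnd {
			maxEnd = b.MaxTime
		}
	}
	return false
}
```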

This commit enables parallel execution of vertical compactions within a single compaction
stream. It does so by changing the Planner interface so that it can return multiple
compaction tasks per group instead of a single one. It also adapts the algorithm for detecting
overlapping blocks so that it can identify multiple independent groups of overlapping blocks.
These groups are returned as distinct compaction tasks, which the compactor can execute in
separate goroutines.
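
The grouping idea can be sketched as follows; groupOverlapping is a hypothetical illustration of the approach described above (maximal runs of transitively overlapping blocks form disjoint sets), not the planner's actual implementation:

```go
package example

import (
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// groupOverlapping partitions a minTime-sorted slice of block metas into
// maximal runs of transitively overlapping blocks. Each run with at least
// two blocks is an independent vertical compaction task; runs share no
// blocks, so they can safely be compacted in parallel.
func groupOverlapping(metasByMinTime []*metadata.Meta) [][]*metadata.Meta {
	var tasks [][]*metadata.Meta
	var current []*metadata.Meta
	var currentMaxTime int64
	for _, m := range metasByMinTime {
		if len(current) > 0 && m.MinTime < currentMaxTime {
			// m overlaps the current run; extend it.
			current = append(current, m)
			if m.MaxTime > currentMaxTime {
				currentMaxTime = m.MaxTime
			}
			continue
		}
		// m starts a new run; keep the previous run only if it had overlaps.
		if len(current) > 1 {
			tasks = append(tasks, current)
		}
		current = []*metadata.Meta{m}
		currentMaxTime = m.MaxTime
	}
	if len(current) > 1 {
		tasks = append(tasks, current)
	}
	return tasks
}
```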

By modifying the planner interface, this commit also enables parallelizing
horizontal compactions in the future.
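
To make the shape of the interface change concrete, here is a sketch of the new Planner contract together with a hypothetical fan-out helper. The Plan signature matches the diff below; the compactionTask definition (a slice of block metas, as the diff iterates it) and the compactInParallel helper are assumptions for illustration, not the actual compactor code:

```go
package example

import (
	"context"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"golang.org/x/sync/errgroup"
)

// compactionTask is assumed to be one independent set of blocks that is
// compacted into a single output block.
type compactionTask []*metadata.Meta

// Planner returns multiple independent compaction tasks per group instead
// of a single merged plan.
type Planner interface {
	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error)
}

// compactInParallel is a hypothetical helper showing how planned tasks could
// be fanned out to goroutines, bounded by a concurrency limit, so that
// independent vertical compactions within one stream run concurrently.
func compactInParallel(ctx context.Context, tasks []compactionTask, concurrency int, compactOne func(context.Context, compactionTask) error) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency)
	for _, task := range tasks {
		task := task // capture the loop variable for the closure
		g.Go(func() error {
			return compactOne(gctx, task)
		})
	}
	return g.Wait()
}
```

The diff below already uses errgroup with SetLimit to bound block downloads; the same pattern would bound how many independent tasks run at once.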

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Dec 1, 2022
1 parent afdb30e commit 912172b
Showing 3 changed files with 276 additions and 203 deletions.
152 changes: 81 additions & 71 deletions pkg/compact/compact.go
@@ -542,26 +542,28 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g
if len(plan) == 0 {
continue
}
groupCompactions[g.key]++

toRemove := make(map[ulid.ULID]struct{}, len(plan))
metas := make([]*tsdb.BlockMeta, 0, len(plan))
for _, p := range plan {
metas = append(metas, &p.BlockMeta)
toRemove[p.BlockMeta.ULID] = struct{}{}
}
g.deleteFromGroup(toRemove)
for _, groupTask := range plan {
groupCompactions[g.key]++
toRemove := make(map[ulid.ULID]struct{}, len(groupTask))
metas := make([]*tsdb.BlockMeta, 0, len(groupTask))
for _, meta := range groupTask {
metas = append(metas, &meta.BlockMeta)
toRemove[meta.BlockMeta.ULID] = struct{}{}
}
g.deleteFromGroup(toRemove)
groupBlocks[g.key] += len(groupTask)

groupBlocks[g.key] += len(plan)
newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
return errors.Wrapf(err, "append meta")
}
}

if len(g.metasByMinTime) == 0 {
continue
}

newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
return errors.Wrapf(err, "append meta")
}
tmpGroups = append(tmpGroups, g)
}

@@ -727,7 +729,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
type Planner interface {
// Plan returns a list of blocks that should be compacted into single one.
// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error)
}

// Compactor provides compaction against an underlying storage of time series data.
@@ -990,19 +992,19 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
overlappingBlocks = true
}

var toCompact []*metadata.Meta
var tasks []compactionTask
if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
toCompact, e = planner.Plan(ctx, cg.metasByMinTime)
tasks, e = planner.Plan(ctx, cg.metasByMinTime)
return e
}); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
}
if len(toCompact) == 0 {
if len(tasks) == 0 {
// Nothing to do.
return false, ulid.ULID{}, nil
}

level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact))
level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))

// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
// This is one potential source of how we could end up with duplicated chunks.
@@ -1014,53 +1016,55 @@
g, errCtx := errgroup.WithContext(ctx)
g.SetLimit(cg.compactBlocksFetchConcurrency)

toCompactDirs := make([]string, 0, len(toCompact))
for _, m := range toCompact {
bdir := filepath.Join(dir, m.ULID.String())
for _, s := range m.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
}
uniqueSources[s] = struct{}{}
}
func(ctx context.Context, meta *metadata.Meta) {
g.Go(func() error {
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return retry(errors.Wrapf(err, "download block %s", meta.ULID))
toCompactDirs := make([]string, 0, len(tasks))
for _, task := range tasks {
for _, m := range task {
bdir := filepath.Join(dir, m.ULID.String())
for _, s := range m.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", tasks))
}
uniqueSources[s] = struct{}{}
}
func(ctx context.Context, meta *metadata.Meta) {
g.Go(func() error {
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return retry(errors.Wrapf(err, "download block %s", meta.ULID))
}

// Ensure all input blocks are valid.
var stats block.HealthStats
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return e
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}
// Ensure all input blocks are valid.
var stats block.HealthStats
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return e
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}

if err := stats.CriticalErr(); err != nil {
return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}
if err := stats.CriticalErr(); err != nil {
return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.OutOfOrderChunksErr(); err != nil {
return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}
if err := stats.OutOfOrderChunksErr(); err != nil {
return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
if err := stats.Issue347OutsideChunksErr(); err != nil {
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}

if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
return nil
})
}(errCtx, m)
if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
return nil
})
}(errCtx, m)

toCompactDirs = append(toCompactDirs, bdir)
toCompactDirs = append(toCompactDirs, bdir)
}
}
sourceBlockStr := fmt.Sprintf("%v", toCompactDirs)

@@ -1080,10 +1084,12 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
if compID == (ulid.ULID{}) {
// Prometheus compactor found that the compacted block would have no samples.
level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
for _, meta := range toCompact {
if meta.Stats.NumSamples == 0 {
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
for _, task := range tasks {
for _, meta := range task {
if meta.Stats.NumSamples == 0 {
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
}
}
}
}
@@ -1125,8 +1131,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// Ensure the output block is not overlapping with anything else,
// unless vertical compaction is enabled.
if !cg.enableVerticalCompaction {
if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
for _, task := range tasks {
if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}
}
}

@@ -1143,14 +1151,16 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// Mark for deletion the blocks we just compacted from the group and bucket so they do not get included
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, meta := range toCompact {
err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
for _, task := range tasks {
for _, meta := range task {
err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}
cg.groupGarbageCollectedBlocks.Inc()
}

level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr,