Parallelize vertical compactions inside a single group #5936

Closed · wants to merge 22 commits
Changes from 1 commit
152 changes: 81 additions & 71 deletions pkg/compact/compact.go
@@ -542,26 +542,28 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g
 			if len(plan) == 0 {
 				continue
 			}
-			groupCompactions[g.key]++
-
-			toRemove := make(map[ulid.ULID]struct{}, len(plan))
-			metas := make([]*tsdb.BlockMeta, 0, len(plan))
-			for _, p := range plan {
-				metas = append(metas, &p.BlockMeta)
-				toRemove[p.BlockMeta.ULID] = struct{}{}
-			}
-			g.deleteFromGroup(toRemove)
+			for _, groupTask := range plan {
+				groupCompactions[g.key]++
+				toRemove := make(map[ulid.ULID]struct{}, len(groupTask))
+				metas := make([]*tsdb.BlockMeta, 0, len(groupTask))
+				for _, meta := range groupTask {
+					metas = append(metas, &meta.BlockMeta)
+					toRemove[meta.BlockMeta.ULID] = struct{}{}
+				}
+				g.deleteFromGroup(toRemove)
+				groupBlocks[g.key] += len(groupTask)

-			groupBlocks[g.key] += len(plan)
+				newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
+				if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
+					return errors.Wrapf(err, "append meta")
+				}
+			}

 			if len(g.metasByMinTime) == 0 {
 				continue
 			}

-			newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
-			if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
-				return errors.Wrapf(err, "append meta")
-			}
 			tmpGroups = append(tmpGroups, g)
 		}

@@ -727,7 +729,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
 type Planner interface {
 	// Plan returns a list of blocks that should be compacted into single one.
 	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
-	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
+	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error)
Contributor:
I am trying to understand the interface. Does the return value of []compactionTask mean those are tasks that we can safely run in parallel?

Contributor (Author):
Yes, that's the idea. I've added documentation on the type.
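For context, a minimal sketch of what that type and its documentation could look like, inferred from how this diff uses it (task elements are accessed as *metadata.Meta and a task is expanded as task... into areBlocksOverlapping); the PR's actual definition and wording may differ:

```go
// compactionTask is one set of blocks that the planner wants compacted
// into a single output block. Distinct tasks returned by Plan are meant
// to be independent of each other, so they can be compacted in parallel
// safely. (Sketch inferred from this diff; see the PR for the real definition.)
type compactionTask []*metadata.Meta
```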

 }

 // Compactor provides compaction against an underlying storage of time series data.
Expand Down Expand Up @@ -990,19 +992,19 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
overlappingBlocks = true
}

var toCompact []*metadata.Meta
var tasks []compactionTask
if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
toCompact, e = planner.Plan(ctx, cg.metasByMinTime)
tasks, e = planner.Plan(ctx, cg.metasByMinTime)
return e
}); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
}
if len(toCompact) == 0 {
if len(tasks) == 0 {
// Nothing to do.
return false, ulid.ULID{}, nil
}

level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact))
level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))

// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
// This is one potential source of how we could end up with duplicated chunks.
@@ -1014,53 +1016,55 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	g, errCtx := errgroup.WithContext(ctx)
 	g.SetLimit(cg.compactBlocksFetchConcurrency)

-	toCompactDirs := make([]string, 0, len(toCompact))
-	for _, m := range toCompact {
-		bdir := filepath.Join(dir, m.ULID.String())
-		for _, s := range m.Compaction.Sources {
-			if _, ok := uniqueSources[s]; ok {
-				return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
-			}
-			uniqueSources[s] = struct{}{}
-		}
-		func(ctx context.Context, meta *metadata.Meta) {
-			g.Go(func() error {
-				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
-					return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
-				}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
-					return retry(errors.Wrapf(err, "download block %s", meta.ULID))
-				}
-
-				// Ensure all input blocks are valid.
-				var stats block.HealthStats
-				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
-					stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
-					return e
-				}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
-					return errors.Wrapf(err, "gather index issues for block %s", bdir)
-				}
-
-				if err := stats.CriticalErr(); err != nil {
-					return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
-				}
-
-				if err := stats.OutOfOrderChunksErr(); err != nil {
-					return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
-				}
-
-				if err := stats.Issue347OutsideChunksErr(); err != nil {
-					return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
-				}
-
-				if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
-					return errors.Wrapf(err,
-						"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
-				}
-				return nil
-			})
-		}(errCtx, m)
-
-		toCompactDirs = append(toCompactDirs, bdir)
-	}
+	toCompactDirs := make([]string, 0, len(tasks))
+	for _, task := range tasks {
+		for _, m := range task {
+			bdir := filepath.Join(dir, m.ULID.String())
+			for _, s := range m.Compaction.Sources {
+				if _, ok := uniqueSources[s]; ok {
+					return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", tasks))
+				}
+				uniqueSources[s] = struct{}{}
+			}
+			func(ctx context.Context, meta *metadata.Meta) {
+				g.Go(func() error {
+					if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
+						return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
+					}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
+						return retry(errors.Wrapf(err, "download block %s", meta.ULID))
+					}
+
+					// Ensure all input blocks are valid.
+					var stats block.HealthStats
+					if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
+						stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+						return e
+					}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
+						return errors.Wrapf(err, "gather index issues for block %s", bdir)
+					}
+
+					if err := stats.CriticalErr(); err != nil {
+						return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
+					}
+
+					if err := stats.OutOfOrderChunksErr(); err != nil {
+						return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
+					}
+
+					if err := stats.Issue347OutsideChunksErr(); err != nil {
+						return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
+					}
+
+					if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
+						return errors.Wrapf(err,
+							"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
+					}
+					return nil
+				})
+			}(errCtx, m)
+
+			toCompactDirs = append(toCompactDirs, bdir)
+		}
+	}
 	sourceBlockStr := fmt.Sprintf("%v", toCompactDirs)

@@ -1080,10 +1084,12 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	if compID == (ulid.ULID{}) {
 		// Prometheus compactor found that the compacted block would have no samples.
 		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
-		for _, meta := range toCompact {
-			if meta.Stats.NumSamples == 0 {
-				if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
-					level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
+		for _, task := range tasks {
+			for _, meta := range task {
+				if meta.Stats.NumSamples == 0 {
+					if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
+						level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
+					}
+				}
 			}
 		}
@@ -1125,8 +1131,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	// Ensure the output block is not overlapping with anything else,
 	// unless vertical compaction is enabled.
 	if !cg.enableVerticalCompaction {
-		if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil {
-			return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+		for _, task := range tasks {
+			if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
+				return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+			}
 		}
 	}

@@ -1143,14 +1151,16 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	// Mark for deletion the blocks we just compacted from the group and bucket so they do not get included
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
-	for _, meta := range toCompact {
-		err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
-			return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
-		}, opentracing.Tags{"block.id": meta.ULID})
-		if err != nil {
-			return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
+	for _, task := range tasks {
+		for _, meta := range task {
+			err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
+				return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
+			}, opentracing.Tags{"block.id": meta.ULID})
+			if err != nil {
+				return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
+			}
+			cg.groupGarbageCollectedBlocks.Inc()
 		}
-		cg.groupGarbageCollectedBlocks.Inc()
 	}

 	level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr,
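The hunks above cover only the per-task bookkeeping (downloads, overlap checks, deletions); the code that actually invokes the compactor per task is outside this diff. As a rough, self-contained illustration of the idea in the PR title, here is a hedged sketch of running independent tasks concurrently with a bounded errgroup, mirroring how the block downloads above are bounded. The task type, the compactOne helper, and the limit of 2 are hypothetical stand-ins, not code from this PR:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// task stands in for one independently compactable set of blocks
// (the role played by compactionTask in the PR).
type task []string

// compactOne is a hypothetical stand-in for compacting one task's
// blocks into a single output block.
func compactOne(ctx context.Context, t task) error {
	fmt.Println("compacting", t)
	return nil
}

func main() {
	tasks := []task{{"blockA", "blockB"}, {"blockC", "blockD"}}

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(2) // bound concurrency, as the download loop bounds fetches
	for _, t := range tasks {
		t := t // capture the loop variable (needed before Go 1.22)
		g.Go(func() error {
			return compactOne(ctx, t)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("compaction failed:", err)
	}
}
```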