From c632532fca55fc853b584dc377847cff9a5adfaa Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Mon, 11 Mar 2019 14:41:07 +0000
Subject: [PATCH 1/9] skip compaction for blocks with no samples

---
 pkg/compact/compact.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 5eea403d01..0e6484319c 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -779,6 +779,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	if err != nil {
 		return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
 	}
+	if compID == (ulid.ULID{}) {
+		// Prometheus compactor found that the compacted block would have no samples.
+		level.Info(cg.logger).Log("msg", "skipping compaction as compacted block would have no samples", "blocks", fmt.Sprintf("%v", plan))
+		return compID, nil
+	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

From 79b98eda7b7f4c115ba03e793148f5c698293903 Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Mon, 11 Mar 2019 16:48:44 +0000
Subject: [PATCH 2/9] update to actually delete the empty input blocks, and to
 correctly handle from bucket compactor

---
 pkg/compact/compact.go | 59 +++++++++++++++++++++++++++++-------------
 1 file changed, 41 insertions(+), 18 deletions(-)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 0e6484319c..9738f58963 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -34,6 +34,7 @@ const (
 )

 var blockTooFreshSentinelError = errors.New("Block too fresh")
+var emptyCompactedBlockSentinelError = errors.New("Compaction would have created empty block")

 // Syncer syncronizes block metas from a bucket into a local directory.
 // It sorts them into compaction groups based on equal label sets.
@@ -781,8 +782,18 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	}
 	if compID == (ulid.ULID{}) {
 		// Prometheus compactor found that the compacted block would have no samples.
-		level.Info(cg.logger).Log("msg", "skipping compaction as compacted block would have no samples", "blocks", fmt.Sprintf("%v", plan))
-		return compID, nil
+		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", plan))
+		for _, block := range plan {
+			meta, err := metadata.Read(block)
+			if err != nil {
+				level.Warn(cg.logger).Log("msg", "failed to read meta for block", "block", block)
+				continue
+			}
+			if meta.Stats.NumSamples == 0 {
+				cg.deleteBlock(block)
+			}
+		}
+		return compID, emptyCompactedBlockSentinelError
 	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
@@ -823,22 +834,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
 	for _, b := range plan {
-		id, err := ulid.Parse(filepath.Base(b))
-		if err != nil {
-			return compID, errors.Wrapf(err, "plan dir %s", b)
-		}
-
-		if err := os.RemoveAll(b); err != nil {
-			return compID, errors.Wrapf(err, "remove old block dir %s", id)
-		}
-
-		// Spawn a new context so we always delete a block in full on shutdown.
-		delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
-		level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id, "result_block", compID)
-		err = block.Delete(delCtx, cg.bkt, id)
-		cancel()
-		if err != nil {
-			return compID, retry(errors.Wrapf(err, "delete old block %s from bucket ", id))
+		if err := cg.deleteBlock(b); err != nil {
+			return compID, retry(errors.Wrapf(err, "delete old block from bucket"))
 		}
 		cg.groupGarbageCollectedBlocks.Inc()
 	}
@@ -846,6 +843,26 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	return compID, nil
 }

+func (cg *Group) deleteBlock(b string) error {
+	id, err := ulid.Parse(filepath.Base(b))
+	if err != nil {
+		return errors.Wrapf(err, "plan dir %s", b)
+	}
+
+	if err := os.RemoveAll(b); err != nil {
+		return errors.Wrapf(err, "remove old block dir %s", id)
+	}
+
+	// Spawn a new context so we always delete a block in full on shutdown.
+	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+	level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id)
+	if err := block.Delete(delCtx, cg.bkt, id); err != nil {
+		return errors.Wrapf(err, "delete block %s from bucket", id)
+	}
+	return nil
+}
+
 // BucketCompactor compacts blocks in a bucket.
 type BucketCompactor struct {
 	logger log.Logger
@@ -887,6 +904,8 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 			return errors.Wrap(err, "garbage")
 		}

+		level.Info(c.logger).Log("msg", "start of compaction")
+
 		groups, err := c.sy.Groups()
 		if err != nil {
 			return errors.Wrap(err, "build compaction groups")
@@ -903,6 +922,10 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 				continue
 			}

+			if err == emptyCompactedBlockSentinelError {
+				continue
+			}
+
 			if IsIssue347Error(err) {
 				if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
 					done = false

From eb2b5df439dfd661be9c97a3a2f8ae66dfe6c33f Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Mon, 11 Mar 2019 16:52:30 +0000
Subject: [PATCH 3/9] warn on error deleting empty block

---
 pkg/compact/compact.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 9738f58963..7018d96edb 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -790,7 +790,9 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 				continue
 			}
 			if meta.Stats.NumSamples == 0 {
-				cg.deleteBlock(block)
+				if err := cg.deleteBlock(block); err != nil {
+					level.Warn(cg.logger).Log("msg", "failed to delete empty block found during compaction", "block", block)
+				}
 			}
 		}
 		return compID, emptyCompactedBlockSentinelError

From 05bd7fd4f53d54aa89e2f15cca2b593b60e78c6a Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Mon, 11 Mar 2019 17:37:49 +0000
Subject: [PATCH 4/9] use ULID instead of error for emptyBlockSentinel

---
 pkg/compact/compact.go | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 7018d96edb..b2f4ad50ed 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -34,7 +34,7 @@ const (
 )

 var blockTooFreshSentinelError = errors.New("Block too fresh")
-var emptyCompactedBlockSentinelError = errors.New("Compaction would have created empty block")
+var emptyBlockSentinelULID = ulid.MustNew(123, nil)

 // Syncer syncronizes block metas from a bucket into a local directory.
 // It sorts them into compaction groups based on equal label sets.
@@ -795,7 +795,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 				}
 			}
 		}
-		return compID, emptyCompactedBlockSentinelError
+		return emptyBlockSentinelULID, nil
 	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
@@ -924,10 +924,6 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 				continue
 			}

-			if err == emptyCompactedBlockSentinelError {
-				continue
-			}
-
 			if IsIssue347Error(err) {
 				if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
 					done = false

From 7453588f26558127b130f4d1d450a8dd5551fde4 Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Mon, 11 Mar 2019 17:43:29 +0000
Subject: [PATCH 5/9] don't use a global variable

---
 pkg/compact/compact.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index b2f4ad50ed..a2f026d991 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -34,7 +34,6 @@ const (
 )

 var blockTooFreshSentinelError = errors.New("Block too fresh")
-var emptyBlockSentinelULID = ulid.MustNew(123, nil)

 // Syncer syncronizes block metas from a bucket into a local directory.
 // It sorts them into compaction groups based on equal label sets.
@@ -795,7 +794,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 				}
 			}
 		}
-		return emptyBlockSentinelULID, nil
+		// Return a dummy ULID since the bucket compactor takes this as a signal to continue
+		return ulid.New(123, nil)
 	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",

From b0ac1b7ff1deeb11fab4526ff553d234a700e495 Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Mon, 11 Mar 2019 17:44:21 +0000
Subject: [PATCH 6/9] full stop at end of comment

---
 pkg/compact/compact.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index a2f026d991..eb18182f4e 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -794,7 +794,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 				}
 			}
 		}
-		// Return a dummy ULID since the bucket compactor takes this as a signal to continue
+		// Return a dummy ULID since the bucket compactor takes this as a signal to continue.
 		return ulid.New(123, nil)
 	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",

From 385a9919d68fc0960abf64beb8e61c7574fbc031 Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Tue, 12 Mar 2019 11:27:37 +0000
Subject: [PATCH 7/9] use boolean to indicate whether there is more compaction
 work

---
 pkg/compact/compact.go | 76 ++++++++++++++++++++----------------------
 1 file changed, 37 insertions(+), 39 deletions(-)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index eb18182f4e..34f092eab7 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -513,23 +513,23 @@ func (cg *Group) Resolution() int64 {

 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
-func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (ulid.ULID, error) {
+func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, error) {
 	subDir := filepath.Join(dir, cg.Key())

 	if err := os.RemoveAll(subDir); err != nil {
-		return ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
+		return false, errors.Wrap(err, "clean compaction group dir")
 	}

 	if err := os.MkdirAll(subDir, 0777); err != nil {
-		return ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
+		return false, errors.Wrap(err, "create compaction group dir")
 	}

-	compID, err := cg.compact(ctx, subDir, comp)
+	done, err := cg.compact(ctx, subDir, comp)
 	if err != nil {
 		cg.compactionFailures.Inc()
 	}
 	cg.compactions.Inc()

-	return compID, err
+	return done, err
 }

 // Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -688,13 +688,13 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
 	return nil
 }

-func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (compID ulid.ULID, err error) {
+func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (isFullyCompacted bool, err error) {
 	cg.mtx.Lock()
 	defer cg.mtx.Unlock()

 	// Check for overlapped blocks.
 	if err := cg.areBlocksOverlapping(nil); err != nil {
-		return compID, halt(errors.Wrap(err, "pre compaction overlap check"))
+		return false, halt(errors.Wrap(err, "pre compaction overlap check"))
 	}

 	// Planning a compaction works purely based on the meta.json files in our future group's dir.
@@ -702,21 +702,21 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	for _, meta := range cg.blocks {
 		bdir := filepath.Join(dir, meta.ULID.String())
 		if err := os.MkdirAll(bdir, 0777); err != nil {
-			return compID, errors.Wrap(err, "create planning block dir")
+			return false, errors.Wrap(err, "create planning block dir")
 		}
 		if err := metadata.Write(cg.logger, bdir, meta); err != nil {
-			return compID, errors.Wrap(err, "write planning meta file")
+			return false, errors.Wrap(err, "write planning meta file")
 		}
 	}

 	// Plan against the written meta.json files.
 	plan, err := comp.Plan(dir)
 	if err != nil {
-		return compID, errors.Wrap(err, "plan compaction")
+		return false, errors.Wrap(err, "plan compaction")
 	}
 	if len(plan) == 0 {
 		// Nothing to do.
-		return compID, nil
+		return true, nil
 	}

 	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
@@ -729,45 +729,45 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	for _, pdir := range plan {
 		meta, err := metadata.Read(pdir)
 		if err != nil {
-			return compID, errors.Wrapf(err, "read meta from %s", pdir)
+			return false, errors.Wrapf(err, "read meta from %s", pdir)
 		}

 		if cg.Key() != GroupKey(*meta) {
-			return compID, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
+			return false, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
 		}

 		for _, s := range meta.Compaction.Sources {
 			if _, ok := uniqueSources[s]; ok {
-				return compID, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
+				return false, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
 			}
 			uniqueSources[s] = struct{}{}
 		}

 		id, err := ulid.Parse(filepath.Base(pdir))
 		if err != nil {
-			return compID, errors.Wrapf(err, "plan dir %s", pdir)
+			return false, errors.Wrapf(err, "plan dir %s", pdir)
 		}
 		if meta.ULID.Compare(id) != 0 {
-			return compID, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
+			return false, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
 		}

 		if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil {
-			return compID, retry(errors.Wrapf(err, "download block %s", id))
+			return false, retry(errors.Wrapf(err, "download block %s", id))
 		}

 		// Ensure all input blocks are valid.
 		stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
 		if err != nil {
-			return compID, errors.Wrapf(err, "gather index issues for block %s", pdir)
+			return false, errors.Wrapf(err, "gather index issues for block %s", pdir)
 		}

 		if err := stats.CriticalErr(); err != nil {
-			return compID, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
+			return false, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
 		}

 		if err := stats.Issue347OutsideChunksErr(); err != nil {
-			return compID, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
+			return false, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
 		}
 	}
 	level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
@@ -775,9 +775,9 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (

 	begin = time.Now()

-	compID, err = comp.Compact(dir, plan, nil)
+	compID, err := comp.Compact(dir, plan, nil)
 	if err != nil {
-		return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
+		return false, halt(errors.Wrapf(err, "compact blocks %v", plan))
 	}
 	if compID == (ulid.ULID{}) {
 		// Prometheus compactor found that the compacted block would have no samples.
@@ -794,8 +794,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 				}
 			}
 		}
-		// Return a dummy ULID since the bucket compactor takes this as a signal to continue.
-		return ulid.New(123, nil)
+		// Even though this block was empty, there may be more work to do
+		return false, nil
 	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
@@ -808,27 +808,27 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		Source: metadata.CompactorSource,
 	}, nil)
 	if err != nil {
-		return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir)
+		return false, errors.Wrapf(err, "failed to finalize the block %s", bdir)
 	}

 	if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
-		return compID, errors.Wrap(err, "remove tombstones")
+		return false, errors.Wrap(err, "remove tombstones")
 	}

 	// Ensure the output block is valid.
 	if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
-		return compID, halt(errors.Wrapf(err, "invalid result block %s", bdir))
+		return false, halt(errors.Wrapf(err, "invalid result block %s", bdir))
 	}

 	// Ensure the output block is not overlapping with anything else.
 	if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
-		return compID, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+		return false, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
 	}

 	begin = time.Now()

 	if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
-		return compID, retry(errors.Wrapf(err, "upload of %s failed", compID))
+		return false, retry(errors.Wrapf(err, "upload of %s failed", compID))
 	}
 	level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))
@@ -837,12 +837,12 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
 	for _, b := range plan {
 		if err := cg.deleteBlock(b); err != nil {
-			return compID, retry(errors.Wrapf(err, "delete old block from bucket"))
+			return false, retry(errors.Wrapf(err, "delete old block from bucket"))
 		}
 		cg.groupGarbageCollectedBlocks.Inc()
 	}

-	return compID, nil
+	return false, nil
 }

 func (cg *Group) deleteBlock(b string) error {
@@ -912,27 +912,25 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 		if err != nil {
 			return errors.Wrap(err, "build compaction groups")
 		}
-		done := true
+		finishedAllGroups := true
 		for _, g := range groups {
-			id, err := g.Compact(ctx, c.compactDir, c.comp)
+			finishedGroup, err := g.Compact(ctx, c.compactDir, c.comp)
 			if err == nil {
-				// If the returned ID has a zero value, the group had no blocks to be compacted.
-				// We keep going through the outer loop until no group has any work left.
-				if id != (ulid.ULID{}) {
-					done = false
+				if !finishedGroup {
+					finishedAllGroups = false
 				}
 				continue
 			}

 			if IsIssue347Error(err) {
 				if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
-					done = false
+					finishedAllGroups = false
 					continue
 				}
 			}
 			return errors.Wrap(err, "compaction")
 		}
-		if done {
+		if finishedAllGroups {
 			break
 		}
 	}

From a731d8508b9cdf10fa1da3c13ccf3baa7e585166 Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Tue, 12 Mar 2019 14:24:54 +0000
Subject: [PATCH 8/9] rename variables

---
 pkg/compact/compact.go          | 64 ++++++++++++++++-----------------
 pkg/compact/compact_e2e_test.go |  8 ++---
 2 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 34f092eab7..ca05adfb4a 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -513,23 +513,23 @@ func (cg *Group) Resolution() int64 {

 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
-func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, error) {
+func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) {
 	subDir := filepath.Join(dir, cg.Key())

 	if err := os.RemoveAll(subDir); err != nil {
-		return false, errors.Wrap(err, "clean compaction group dir")
+		return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
 	}

 	if err := os.MkdirAll(subDir, 0777); err != nil {
-		return false, errors.Wrap(err, "create compaction group dir")
+		return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
 	}

-	done, err := cg.compact(ctx, subDir, comp)
+	shouldRerun, compID, err := cg.compact(ctx, subDir, comp)
 	if err != nil {
 		cg.compactionFailures.Inc()
 	}
 	cg.compactions.Inc()

-	return done, err
+	return shouldRerun, compID, err
 }

 // Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -688,13 +688,13 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
 	return nil
 }

-func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (isFullyCompacted bool, err error) {
+func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) {
 	cg.mtx.Lock()
 	defer cg.mtx.Unlock()

 	// Check for overlapped blocks.
 	if err := cg.areBlocksOverlapping(nil); err != nil {
-		return false, halt(errors.Wrap(err, "pre compaction overlap check"))
+		return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
 	}

 	// Planning a compaction works purely based on the meta.json files in our future group's dir.
@@ -702,21 +702,21 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	for _, meta := range cg.blocks {
 		bdir := filepath.Join(dir, meta.ULID.String())
 		if err := os.MkdirAll(bdir, 0777); err != nil {
-			return false, errors.Wrap(err, "create planning block dir")
+			return false, ulid.ULID{}, errors.Wrap(err, "create planning block dir")
 		}
 		if err := metadata.Write(cg.logger, bdir, meta); err != nil {
-			return false, errors.Wrap(err, "write planning meta file")
+			return false, ulid.ULID{}, errors.Wrap(err, "write planning meta file")
 		}
 	}

 	// Plan against the written meta.json files.
 	plan, err := comp.Plan(dir)
 	if err != nil {
-		return false, errors.Wrap(err, "plan compaction")
+		return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
 	}
 	if len(plan) == 0 {
 		// Nothing to do.
-		return true, nil
+		return false, ulid.ULID{}, nil
 	}

 	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
@@ -729,45 +729,45 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	for _, pdir := range plan {
 		meta, err := metadata.Read(pdir)
 		if err != nil {
-			return false, errors.Wrapf(err, "read meta from %s", pdir)
+			return false, ulid.ULID{}, errors.Wrapf(err, "read meta from %s", pdir)
 		}

 		if cg.Key() != GroupKey(*meta) {
-			return false, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
 		}

 		for _, s := range meta.Compaction.Sources {
 			if _, ok := uniqueSources[s]; ok {
-				return false, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
+				return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
 			}
 			uniqueSources[s] = struct{}{}
 		}

 		id, err := ulid.Parse(filepath.Base(pdir))
 		if err != nil {
-			return false, errors.Wrapf(err, "plan dir %s", pdir)
+			return false, ulid.ULID{}, errors.Wrapf(err, "plan dir %s", pdir)
 		}
 		if meta.ULID.Compare(id) != 0 {
-			return false, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
+			return false, ulid.ULID{}, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
 		}

 		if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil {
-			return false, retry(errors.Wrapf(err, "download block %s", id))
+			return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id))
 		}

 		// Ensure all input blocks are valid.
 		stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
 		if err != nil {
-			return false, errors.Wrapf(err, "gather index issues for block %s", pdir)
+			return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", pdir)
 		}

 		if err := stats.CriticalErr(); err != nil {
-			return false, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
 		}

 		if err := stats.Issue347OutsideChunksErr(); err != nil {
-			return false, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
+			return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
 		}
 	}
 	level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
@@ -775,9 +775,9 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (

 	begin = time.Now()

-	compID, err := comp.Compact(dir, plan, nil)
+	compID, err = comp.Compact(dir, plan, nil)
 	if err != nil {
-		return false, halt(errors.Wrapf(err, "compact blocks %v", plan))
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", plan))
 	}
 	if compID == (ulid.ULID{}) {
 		// Prometheus compactor found that the compacted block would have no samples.
@@ -795,7 +795,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 			}
 		}
 		// Even though this block was empty, there may be more work to do
-		return false, nil
+		return true, ulid.ULID{}, nil
 	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
@@ -808,27 +808,27 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		Source: metadata.CompactorSource,
 	}, nil)
 	if err != nil {
-		return false, errors.Wrapf(err, "failed to finalize the block %s", bdir)
+		return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
 	}

 	if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
-		return false, errors.Wrap(err, "remove tombstones")
+		return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
 	}

 	// Ensure the output block is valid.
 	if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
-		return false, halt(errors.Wrapf(err, "invalid result block %s", bdir))
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
 	}

 	// Ensure the output block is not overlapping with anything else.
 	if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
-		return false, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
 	}

 	begin = time.Now()

 	if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
-		return false, retry(errors.Wrapf(err, "upload of %s failed", compID))
+		return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
 	}
 	level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))
@@ -837,12 +837,12 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
 	for _, b := range plan {
 		if err := cg.deleteBlock(b); err != nil {
-			return false, retry(errors.Wrapf(err, "delete old block from bucket"))
+			return false, ulid.ULID{}, retry(errors.Wrapf(err, "delete old block from bucket"))
 		}
 		cg.groupGarbageCollectedBlocks.Inc()
 	}

-	return false, nil
+	return true, compID, nil
 }

 func (cg *Group) deleteBlock(b string) error {
@@ -914,9 +914,9 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 		}
 		finishedAllGroups := true
 		for _, g := range groups {
-			finishedGroup, err := g.Compact(ctx, c.compactDir, c.comp)
+			shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
 			if err == nil {
-				if !finishedGroup {
+				if shouldRerunGroup {
 					finishedAllGroups = false
 				}
 				continue
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index a2073ad633..ba7142f796 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -253,18 +253,18 @@ func TestGroup_Compact_e2e(t *testing.T) {
 	comp, err := tsdb.NewLeveledCompactor(nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil)
 	testutil.Ok(t, err)

-	id, err := g.Compact(ctx, dir, comp)
+	shouldRerun, id, err := g.Compact(ctx, dir, comp)
 	testutil.Ok(t, err)
-	testutil.Assert(t, id == ulid.ULID{}, "group should be empty, but somehow compaction took place")
+	testutil.Assert(t, shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun")

 	// Add all metas that would be gathered by syncMetas.
 	for _, m := range metas {
 		testutil.Ok(t, g.Add(m))
 	}

-	id, err = g.Compact(ctx, dir, comp)
+	shouldRerun, id, err = g.Compact(ctx, dir, comp)
 	testutil.Ok(t, err)
-	testutil.Assert(t, id != ulid.ULID{}, "no compaction took place")
+	testutil.Assert(t, !shouldRerun, "there should be compactible data, but the compactor reported there was not")

 	resDir := filepath.Join(dir, id.String())
 	testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))

From aedea91a3402b317cfe185aa251123a335ea568d Mon Sep 17 00:00:00 2001
From: Martin Dickson
Date: Tue, 12 Mar 2019 14:44:16 +0000
Subject: [PATCH 9/9] fix test

---
 pkg/compact/compact_e2e_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index ba7142f796..6f2194c41f 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -255,7 +255,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
 	shouldRerun, id, err := g.Compact(ctx, dir, comp)
 	testutil.Ok(t, err)
-	testutil.Assert(t, shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun")
+	testutil.Assert(t, !shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun")

 	// Add all metas that would be gathered by syncMetas.
 	for _, m := range metas {
@@ -264,7 +264,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
 	shouldRerun, id, err = g.Compact(ctx, dir, comp)
 	testutil.Ok(t, err)
-	testutil.Assert(t, !shouldRerun, "there should be compactible data, but the compactor reported there was not")
+	testutil.Assert(t, shouldRerun, "there should be compactible data, but the compactor reported there was not")

 	resDir := filepath.Join(dir, id.String())
 	testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))
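
Taken together, the series replaces the "zero result ULID means nothing happened" convention with an explicit shouldRerun boolean: after patch 8, Group.Compact returns (shouldRerun, compID, err), and BucketCompactor.Compact keeps looping over all groups until none of them asks to be rerun, while the empty-result case deletes source blocks that contain no samples and still reports shouldRerun so the planner runs again. The following is a minimal, self-contained Go sketch of that driver loop only; the group type and the compactOnce/compactAll names are illustrative stand-ins invented for this summary, not the real Thanos API.

// sketch: mimics the final control flow, where each group reports whether its
// last pass did work that warrants another planning pass, and the driver loops
// until every group says no.
package main

import "fmt"

type group struct {
	pending int // stand-in for "plans the compactor can still produce"
}

// compactOnce mirrors the shape of Group.Compact after patch 8: it returns a
// shouldRerun flag rather than signalling via a zero-value result ULID.
func (g *group) compactOnce() (shouldRerun bool, err error) {
	if g.pending == 0 {
		// Nothing to plan: do not ask for another pass.
		return false, nil
	}
	// One unit of work done (in the real code this also covers the "result
	// block would be empty" path, which deletes its inputs and asks to rerun).
	g.pending--
	return true, nil
}

// compactAll mirrors the outer loop of BucketCompactor.Compact.
func compactAll(groups []*group) error {
	for {
		finishedAllGroups := true
		for _, g := range groups {
			shouldRerun, err := g.compactOnce()
			if err != nil {
				return err
			}
			if shouldRerun {
				finishedAllGroups = false
			}
		}
		if finishedAllGroups {
			return nil
		}
	}
}

func main() {
	groups := []*group{{pending: 2}, {pending: 0}, {pending: 1}}
	fmt.Println("compaction error:", compactAll(groups)) // prints <nil> once all groups settle
}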