diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 5eea403d01..ca05adfb4a 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -513,23 +513,23 @@ func (cg *Group) Resolution() int64 {
 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
-func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (ulid.ULID, error) {
+func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) {
 	subDir := filepath.Join(dir, cg.Key())
 
 	if err := os.RemoveAll(subDir); err != nil {
-		return ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
+		return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
 	}
 
 	if err := os.MkdirAll(subDir, 0777); err != nil {
-		return ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
+		return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
 	}
 
-	compID, err := cg.compact(ctx, subDir, comp)
+	shouldRerun, compID, err := cg.compact(ctx, subDir, comp)
 	if err != nil {
 		cg.compactionFailures.Inc()
 	}
 	cg.compactions.Inc()
 
-	return compID, err
+	return shouldRerun, compID, err
 }
 
 // Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -688,13 +688,13 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
 	return nil
 }
 
-func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (compID ulid.ULID, err error) {
+func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) {
 	cg.mtx.Lock()
 	defer cg.mtx.Unlock()
 
 	// Check for overlapped blocks.
 	if err := cg.areBlocksOverlapping(nil); err != nil {
-		return compID, halt(errors.Wrap(err, "pre compaction overlap check"))
+		return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
 	}
 
 	// Planning a compaction works purely based on the meta.json files in our future group's dir.
@@ -702,21 +702,21 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	for _, meta := range cg.blocks {
 		bdir := filepath.Join(dir, meta.ULID.String())
 		if err := os.MkdirAll(bdir, 0777); err != nil {
-			return compID, errors.Wrap(err, "create planning block dir")
+			return false, ulid.ULID{}, errors.Wrap(err, "create planning block dir")
 		}
 		if err := metadata.Write(cg.logger, bdir, meta); err != nil {
-			return compID, errors.Wrap(err, "write planning meta file")
+			return false, ulid.ULID{}, errors.Wrap(err, "write planning meta file")
 		}
 	}
 
 	// Plan against the written meta.json files.
 	plan, err := comp.Plan(dir)
 	if err != nil {
-		return compID, errors.Wrap(err, "plan compaction")
+		return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
 	}
 	if len(plan) == 0 {
 		// Nothing to do.
-		return compID, nil
+		return false, ulid.ULID{}, nil
 	}
 
 	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
@@ -729,45 +729,45 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	for _, pdir := range plan {
 		meta, err := metadata.Read(pdir)
 		if err != nil {
-			return compID, errors.Wrapf(err, "read meta from %s", pdir)
+			return false, ulid.ULID{}, errors.Wrapf(err, "read meta from %s", pdir)
 		}
 
 		if cg.Key() != GroupKey(*meta) {
-			return compID, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
 		}
 
 		for _, s := range meta.Compaction.Sources {
 			if _, ok := uniqueSources[s]; ok {
-				return compID, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
+				return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
 			}
 			uniqueSources[s] = struct{}{}
 		}
 
 		id, err := ulid.Parse(filepath.Base(pdir))
 		if err != nil {
-			return compID, errors.Wrapf(err, "plan dir %s", pdir)
+			return false, ulid.ULID{}, errors.Wrapf(err, "plan dir %s", pdir)
 		}
 
 		if meta.ULID.Compare(id) != 0 {
-			return compID, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
+			return false, ulid.ULID{}, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
 		}
 
 		if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil {
-			return compID, retry(errors.Wrapf(err, "download block %s", id))
+			return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id))
 		}
 
 		// Ensure all input blocks are valid.
 		stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
 		if err != nil {
-			return compID, errors.Wrapf(err, "gather index issues for block %s", pdir)
+			return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", pdir)
 		}
 
 		if err := stats.CriticalErr(); err != nil {
-			return compID, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
 		}
 
 		if err := stats.Issue347OutsideChunksErr(); err != nil {
-			return compID, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
+			return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
 		}
 	}
 	level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
@@ -777,7 +777,25 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 
 	compID, err = comp.Compact(dir, plan, nil)
 	if err != nil {
-		return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", plan))
+	}
+	if compID == (ulid.ULID{}) {
+		// Prometheus compactor found that the compacted block would have no samples.
+		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", plan))
+		for _, block := range plan {
+			meta, err := metadata.Read(block)
+			if err != nil {
+				level.Warn(cg.logger).Log("msg", "failed to read meta for block", "block", block)
+				continue
+			}
+			if meta.Stats.NumSamples == 0 {
+				if err := cg.deleteBlock(block); err != nil {
+					level.Warn(cg.logger).Log("msg", "failed to delete empty block found during compaction", "block", block)
+				}
+			}
+		}
+		// Even though this block was empty, there may be more work to do
+		return true, ulid.ULID{}, nil
 	}
 	level.Debug(cg.logger).Log("msg", "compacted blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
@@ -790,27 +808,27 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		Source: metadata.CompactorSource,
 	}, nil)
 	if err != nil {
-		return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir)
+		return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
 	}
 
 	if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
-		return compID, errors.Wrap(err, "remove tombstones")
+		return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
 	}
 
 	// Ensure the output block is valid.
 	if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
-		return compID, halt(errors.Wrapf(err, "invalid result block %s", bdir))
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
 	}
 
 	// Ensure the output block is not overlapping with anything else.
 	if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
-		return compID, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
 	}
 
 	begin = time.Now()
 
 	if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
-		return compID, retry(errors.Wrapf(err, "upload of %s failed", compID))
+		return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
 	}
 	level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))
 
@@ -818,27 +836,33 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
 	for _, b := range plan {
-		id, err := ulid.Parse(filepath.Base(b))
-		if err != nil {
-			return compID, errors.Wrapf(err, "plan dir %s", b)
+		if err := cg.deleteBlock(b); err != nil {
+			return false, ulid.ULID{}, retry(errors.Wrapf(err, "delete old block from bucket"))
 		}
+		cg.groupGarbageCollectedBlocks.Inc()
+	}
 
-		if err := os.RemoveAll(b); err != nil {
-			return compID, errors.Wrapf(err, "remove old block dir %s", id)
-		}
+	return true, compID, nil
+}
 
-		// Spawn a new context so we always delete a block in full on shutdown.
-		delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
-		level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id, "result_block", compID)
-		err = block.Delete(delCtx, cg.bkt, id)
-		cancel()
-		if err != nil {
-			return compID, retry(errors.Wrapf(err, "delete old block %s from bucket ", id))
-		}
-		cg.groupGarbageCollectedBlocks.Inc()
+func (cg *Group) deleteBlock(b string) error {
+	id, err := ulid.Parse(filepath.Base(b))
+	if err != nil {
+		return errors.Wrapf(err, "plan dir %s", b)
+	}
+
+	if err := os.RemoveAll(b); err != nil {
+		return errors.Wrapf(err, "remove old block dir %s", id)
 	}
 
-	return compID, nil
+	// Spawn a new context so we always delete a block in full on shutdown.
+	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+	level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id)
+	if err := block.Delete(delCtx, cg.bkt, id); err != nil {
+		return errors.Wrapf(err, "delete block %s from bucket", id)
+	}
+	return nil
 }
 
 // BucketCompactor compacts blocks in a bucket.
@@ -882,31 +906,31 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 			return errors.Wrap(err, "garbage")
 		}
 
+		level.Info(c.logger).Log("msg", "start of compaction")
+
 		groups, err := c.sy.Groups()
 		if err != nil {
 			return errors.Wrap(err, "build compaction groups")
 		}
-		done := true
+		finishedAllGroups := true
 		for _, g := range groups {
-			id, err := g.Compact(ctx, c.compactDir, c.comp)
+			shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
 			if err == nil {
-				// If the returned ID has a zero value, the group had no blocks to be compacted.
-				// We keep going through the outer loop until no group has any work left.
-				if id != (ulid.ULID{}) {
-					done = false
+				if shouldRerunGroup {
+					finishedAllGroups = false
 				}
 				continue
 			}
 
 			if IsIssue347Error(err) {
 				if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
-					done = false
+					finishedAllGroups = false
 					continue
 				}
 			}
 			return errors.Wrap(err, "compaction")
 		}
-		if done {
+		if finishedAllGroups {
 			break
 		}
 	}
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index a2073ad633..6f2194c41f 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -253,18 +253,18 @@ func TestGroup_Compact_e2e(t *testing.T) {
 	comp, err := tsdb.NewLeveledCompactor(nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil)
 	testutil.Ok(t, err)
 
-	id, err := g.Compact(ctx, dir, comp)
+	shouldRerun, id, err := g.Compact(ctx, dir, comp)
 	testutil.Ok(t, err)
-	testutil.Assert(t, id == ulid.ULID{}, "group should be empty, but somehow compaction took place")
+	testutil.Assert(t, !shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun")
 
 	// Add all metas that would be gathered by syncMetas.
 	for _, m := range metas {
 		testutil.Ok(t, g.Add(m))
 	}
 
-	id, err = g.Compact(ctx, dir, comp)
+	shouldRerun, id, err = g.Compact(ctx, dir, comp)
 	testutil.Ok(t, err)
-	testutil.Assert(t, id != ulid.ULID{}, "no compaction took place")
+	testutil.Assert(t, shouldRerun, "there should be compactible data, but the compactor reported there was not")
 
 	resDir := filepath.Join(dir, id.String())
 	testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))
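
Note for reviewers: the core contract change is that `Group.Compact` no longer signals pending work through a non-zero result ULID. As the new comment in `compact` notes, the Prometheus compactor returns a zero ULID when the compacted block would contain no samples; under the old `id != (ulid.ULID{})` check in `BucketCompactor.Compact`, such a group looked finished and its empty source blocks were never revisited. The explicit `shouldRerun` flag separates "more work may remain" from "a block was produced". Below is a minimal, self-contained sketch of the new caller contract; `compactable`, `fakeGroup`, and `runUntilDone` are illustrative names invented for this example, not code from this patch:

```go
package main

import (
	"context"
	"fmt"

	"github.com/oklog/ulid"
)

// compactable captures the new Group.Compact contract: a flag saying
// whether the group should be compacted again, the ULID of the block
// that was produced (the zero ULID if compaction yielded no block),
// and an error.
type compactable interface {
	Compact(ctx context.Context) (shouldRerun bool, compID ulid.ULID, err error)
}

// runUntilDone drives a single group the way BucketCompactor.Compact
// drives all groups after this change: keep looping while a pass reports
// that more work may remain, independent of whether that pass produced
// a block.
func runUntilDone(ctx context.Context, g compactable) error {
	for {
		shouldRerun, compID, err := g.Compact(ctx)
		if err != nil {
			return err
		}
		if compID != (ulid.ULID{}) {
			fmt.Println("uploaded compacted block", compID)
		}
		if !shouldRerun {
			// Under the old contract a zero compID meant "done", so a group
			// whose blocks compacted away to nothing was never revisited.
			return nil
		}
	}
}

// fakeGroup simulates a group whose first pass deletes empty source
// blocks without producing a block, and whose second pass finds nothing
// left to do.
type fakeGroup struct{ passes int }

func (f *fakeGroup) Compact(ctx context.Context) (bool, ulid.ULID, error) {
	f.passes++
	return f.passes < 2, ulid.ULID{}, nil
}

func main() {
	if err := runUntilDone(context.Background(), &fakeGroup{}); err != nil {
		fmt.Println("compaction failed:", err)
	}
}
```

Returning the flag alongside the ID also lets the zero ULID remain a legitimate result value instead of doubling as a "nothing happened" sentinel.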