diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index dbff235696a..fd3872239c7 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -273,7 +273,7 @@ func runCompact(
 		return errors.Wrap(err, "clean working downsample directory")
 	}
 
-	grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
+	grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, conf.addHashes, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
 	blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
 	compactor, err := compact.NewBucketCompactor(logger, sy, grouper, comp, compactDir, bkt, conf.compactionConcurrency)
 	if err != nil {
@@ -449,6 +449,7 @@ func runCompact(
 type compactConfig struct {
 	haltOnError          bool
 	acceptMalformedIndex bool
+	addHashes            bool
 	maxCompactionLevel   int
 	http                 httpConfig
 	dataDir              string
@@ -495,6 +496,9 @@ func (cc *compactConfig) registerFlag(cmd *kingpin.CmdClause) {
 	cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
 		Default("0d").SetValue(&cc.retentionOneHr)
 
+	cmd.Flag("add-hashes", "Add hashes to meta file after producing new blocks which permits avoiding downloading some files twice albeit at some performance cost.").
+		BoolVar(&cc.addHashes)
+
 	// TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266.
 	cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
 		Short('w').BoolVar(&cc.wait)
diff --git a/docs/components/compact.md b/docs/components/compact.md
index ee924bfdecd..699af4fa20b 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -130,6 +130,9 @@ Flags:
                                  How long to retain samples of resolution 2 (1
                                  hour) in bucket. Setting this to 0d will retain
                                  samples of this resolution forever
+      --add-hashes               Add hashes to meta file after producing new
+                                 blocks which permits avoiding downloading some
+                                 files twice albeit at some performance cost.
   -w, --wait                     Do not exit after all compactions have been
                                  processed and wait for new work.
       --wait-interval=5m         Wait interval between consecutive compaction
diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go
index 8398cbfb7f5..e79c5aca8ca 100644
--- a/pkg/block/block_test.go
+++ b/pkg/block/block_test.go
@@ -106,6 +106,7 @@ func TestHashDownload(t *testing.T) {
 
 	testutil.Equals(t, 3, len(m.Thanos.Hashes), "expected there to be three hashes calculated")
 	err = Download(ctx, log.NewNopLogger(), bkt, m, path.Join(tmpDir, b1.String()), downloadDir)
+	testutil.Ok(t, err)
 	testutil.Equals(t, len(m.Thanos.Hashes), len(ignoredPathsLastCall))
 }
 
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 9a62a41f12b..a294f33dd9a 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -230,6 +230,7 @@ type DefaultGrouper struct {
 	verticalCompactions     *prometheus.CounterVec
 	garbageCollectedBlocks  prometheus.Counter
 	blocksMarkedForDeletion prometheus.Counter
+	addHashes               bool
 }
 
 // NewDefaultGrouper makes a new DefaultGrouper.
@@ -238,6 +239,7 @@ func NewDefaultGrouper(
 	bkt objstore.Bucket,
 	acceptMalformedIndex bool,
 	enableVerticalCompaction bool,
+	addHashes bool,
 	reg prometheus.Registerer,
 	blocksMarkedForDeletion prometheus.Counter,
 	garbageCollectedBlocks prometheus.Counter,
@@ -289,6 +291,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
 				m.Thanos.Downsample.Resolution,
 				g.acceptMalformedIndex,
 				g.enableVerticalCompaction,
+				g.addHashes,
 				g.compactions.WithLabelValues(groupKey),
 				g.compactionRunsStarted.WithLabelValues(groupKey),
 				g.compactionRunsCompleted.WithLabelValues(groupKey),
@@ -332,6 +335,7 @@ type Group struct {
 	verticalCompactions         prometheus.Counter
 	groupGarbageCollectedBlocks prometheus.Counter
 	blocksMarkedForDeletion     prometheus.Counter
+	addHashes                   bool
 }
 
 // NewGroup returns a new compaction group.
@@ -343,6 +347,7 @@ func NewGroup(
 	resolution int64,
 	acceptMalformedIndex bool,
 	enableVerticalCompaction bool,
+	addHashes bool,
 	compactions prometheus.Counter,
 	compactionRunsStarted prometheus.Counter,
 	compactionRunsCompleted prometheus.Counter,
@@ -370,6 +375,7 @@ func NewGroup(
 		verticalCompactions:         verticalCompactions,
 		groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
 		blocksMarkedForDeletion:     blocksMarkedForDeletion,
+		addHashes:                   addHashes,
 	}
 	return g, nil
 }
@@ -462,9 +468,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		}
 	}()
 
-	if err := os.RemoveAll(subDir); err != nil {
-		return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
-	}
 	if err := os.MkdirAll(subDir, 0777); err != nil {
 		return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
 	}
@@ -781,11 +784,19 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 
 	bdir := filepath.Join(dir, compID.String())
 	index := filepath.Join(bdir, block.IndexFilename)
-	newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
+	m := metadata.Thanos{
 		Labels:     cg.labels.Map(),
 		Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
 		Source:     metadata.CompactorSource,
-	}, nil)
+	}
+
+	if cg.addHashes {
+		if m.Hashes, err = metadata.GetHashDir(bdir, metadata.SHA256Kind, metadata.NominalIgnoredFiles...); err != nil {
+			return false, ulid.ULID{}, errors.Wrapf(err, "failed to calculate hashes for block %s", bdir)
+		}
+	}
+
+	newMeta, err := metadata.InjectThanos(cg.logger, bdir, m, nil)
 	if err != nil {
 		return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
 	}
@@ -936,11 +947,6 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 		}()
 	}
 
-	// Clean up the compaction temporary directory at the beginning of every compaction loop.
-	if err := os.RemoveAll(c.compactDir); err != nil {
-		return errors.Wrap(err, "clean up the compaction temporary directory")
-	}
-
 	level.Info(c.logger).Log("msg", "start sync of metas")
 	if err := c.sy.SyncMetas(ctx); err != nil {
 		return errors.Wrap(err, "sync")
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 1f7254165e9..4add0255cfc 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -133,7 +133,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 	testutil.Ok(t, sy.GarbageCollect(ctx))
 
 	// Only the level 3 block, the last source block in both resolutions should be left.
-	grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks)
+	grouper := NewDefaultGrouper(nil, bkt, false, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks)
 	groups, err := grouper.Groups(sy.Metas())
 	testutil.Ok(t, err)
 
@@ -195,7 +195,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
 	comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
 	testutil.Ok(t, err)
 
-	grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
+	grouper := NewDefaultGrouper(logger, bkt, false, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
 	bComp, err := NewBucketCompactor(logger, sy, grouper, comp, dir, bkt, 2)
 	testutil.Ok(t, err)
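
For context on the new path: after the compactor produces a block, it asks metadata.GetHashDir for a per-file digest of the block directory and stores the result in the block's Thanos metadata before meta.json is finalized. Later downloads can then compare those digests against files already on disk and skip re-fetching matches, which lines up with the removal of the os.RemoveAll calls above. The sketch below is a minimal, standard-library illustration of that hashing step, assuming SHA-256 per file; hashBlockDir, its ignored-file handling, and the example directory name are hypothetical stand-ins, not the actual metadata.GetHashDir implementation.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// hashBlockDir walks a block directory and returns a map of relative file
// path -> hex-encoded SHA-256 digest, skipping any file names listed in
// ignored (e.g. meta.json itself). Illustration only; the compactor uses
// metadata.GetHashDir for this.
func hashBlockDir(dir string, ignored ...string) (map[string]string, error) {
	skip := map[string]bool{}
	for _, name := range ignored {
		skip[name] = true
	}

	hashes := map[string]string{}
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || skip[info.Name()] {
			return nil
		}

		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()

		// Stream the file through the hash to avoid loading it into memory.
		h := sha256.New()
		if _, err := io.Copy(h, f); err != nil {
			return err
		}

		rel, err := filepath.Rel(dir, path)
		if err != nil {
			return err
		}
		hashes[rel] = hex.EncodeToString(h.Sum(nil))
		return nil
	})
	return hashes, err
}

func main() {
	// Example: hash every file in a freshly compacted block directory,
	// ignoring the block's own meta.json. The directory name is made up.
	hashes, err := hashBlockDir("./01EXAMPLEBLOCKULID", "meta.json")
	if err != nil {
		panic(err)
	}
	for file, sum := range hashes {
		fmt.Printf("%s\t%s\n", file, sum)
	}
}
```

In the compactor itself this extra pass only runs when the new flag is set (for example, thanos compact --wait --add-hashes alongside the usual object-store flags), which is the "some performance cost" the flag description warns about.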