Skip to content

Commit

Permalink
compact: wire compact with hashes
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Aug 17, 2020
1 parent 935968f commit 9eb1c6f
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 13 deletions.
6 changes: 5 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func runCompact(
return errors.Wrap(err, "clean working downsample directory")
}

grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, conf.addHashes, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, comp, compactDir, bkt, conf.compactionConcurrency)
if err != nil {
Expand Down Expand Up @@ -449,6 +449,7 @@ func runCompact(
type compactConfig struct {
haltOnError bool
acceptMalformedIndex bool
addHashes bool
maxCompactionLevel int
http httpConfig
dataDir string
Expand Down Expand Up @@ -495,6 +496,9 @@ func (cc *compactConfig) registerFlag(cmd *kingpin.CmdClause) {
cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
Default("0d").SetValue(&cc.retentionOneHr)

cmd.Flag("add-hashes", "Add hashes to meta file after producing new blocks which permits avoiding downloading some files twice albeit at some performance cost.").
BoolVar(&cc.addHashes)

// TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266.
cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').BoolVar(&cc.wait)
Expand Down
3 changes: 3 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ Flags:
How long to retain samples of resolution 2 (1
hour) in bucket. Setting this to 0d will retain
samples of this resolution forever
--add-hashes Add hashes to meta file after producing new
blocks which permits avoiding downloading some
files twice albeit at some performance cost.
-w, --wait Do not exit after all compactions have been
processed and wait for new work.
--wait-interval=5m Wait interval between consecutive compaction
Expand Down
1 change: 1 addition & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestHashDownload(t *testing.T) {
testutil.Equals(t, 3, len(m.Thanos.Hashes), "expected there to be three hashes calculated")

err = Download(ctx, log.NewNopLogger(), bkt, m, path.Join(tmpDir, b1.String()), downloadDir)
testutil.Ok(t, err)
testutil.Equals(t, len(m.Thanos.Hashes), len(ignoredPathsLastCall))
}

Expand Down
26 changes: 16 additions & 10 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ type DefaultGrouper struct {
verticalCompactions *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
addHashes bool
}

// NewDefaultGrouper makes a new DefaultGrouper.
Expand All @@ -238,6 +239,7 @@ func NewDefaultGrouper(
bkt objstore.Bucket,
acceptMalformedIndex bool,
enableVerticalCompaction bool,
addHashes bool,
reg prometheus.Registerer,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
Expand Down Expand Up @@ -289,6 +291,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
m.Thanos.Downsample.Resolution,
g.acceptMalformedIndex,
g.enableVerticalCompaction,
g.addHashes,
g.compactions.WithLabelValues(groupKey),
g.compactionRunsStarted.WithLabelValues(groupKey),
g.compactionRunsCompleted.WithLabelValues(groupKey),
Expand Down Expand Up @@ -332,6 +335,7 @@ type Group struct {
verticalCompactions prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
addHashes bool
}

// NewGroup returns a new compaction group.
Expand All @@ -343,6 +347,7 @@ func NewGroup(
resolution int64,
acceptMalformedIndex bool,
enableVerticalCompaction bool,
addHashes bool,
compactions prometheus.Counter,
compactionRunsStarted prometheus.Counter,
compactionRunsCompleted prometheus.Counter,
Expand Down Expand Up @@ -370,6 +375,7 @@ func NewGroup(
verticalCompactions: verticalCompactions,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
addHashes: addHashes,
}
return g, nil
}
Expand Down Expand Up @@ -462,9 +468,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}
}()

if err := os.RemoveAll(subDir); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
}
if err := os.MkdirAll(subDir, 0777); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}
Expand Down Expand Up @@ -781,11 +784,19 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
m := metadata.Thanos{
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
}, nil)
}

if cg.addHashes {
if m.Hashes, err = metadata.GetHashDir(bdir, metadata.SHA256Kind, metadata.NominalIgnoredFiles...); err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to calculate hashes for block %s", bdir)
}
}

newMeta, err := metadata.InjectThanos(cg.logger, bdir, m, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}
Expand Down Expand Up @@ -936,11 +947,6 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
}()
}

// Clean up the compaction temporary directory at the beginning of every compaction loop.
if err := os.RemoveAll(c.compactDir); err != nil {
return errors.Wrap(err, "clean up the compaction temporary directory")
}

level.Info(c.logger).Log("msg", "start sync of metas")
if err := c.sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync")
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
testutil.Ok(t, sy.GarbageCollect(ctx))

// Only the level 3 block and the last source block in both resolutions should be left.
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks)
grouper := NewDefaultGrouper(nil, bkt, false, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks)
groups, err := grouper.Groups(sy.Metas())
testutil.Ok(t, err)

Expand Down Expand Up @@ -195,7 +195,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
testutil.Ok(t, err)

grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
grouper := NewDefaultGrouper(logger, bkt, false, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
bComp, err := NewBucketCompactor(logger, sy, grouper, comp, dir, bkt, 2)
testutil.Ok(t, err)

Expand Down

0 comments on commit 9eb1c6f

Please sign in to comment.