Skip to content

Commit

Permalink
Introduce an internal option to enable vertical compaction
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Dec 20, 2019
1 parent b32108b commit c3770d9
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func runCompact(
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex, relabelConfig)
blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
49 changes: 31 additions & 18 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,18 @@ var blockTooFreshSentinelError = errors.New("Block too fresh")
// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
consistencyDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
blocksMtx sync.Mutex
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
relabelConfig []*relabel.Config
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
consistencyDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
blocksMtx sync.Mutex
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
enableVerticalCompaction bool
relabelConfig []*relabel.Config
}

type syncerMetrics struct {
Expand Down Expand Up @@ -140,7 +141,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay to be considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool, relabelConfig []*relabel.Config) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -154,6 +155,10 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
relabelConfig: relabelConfig,
// The syncer offers an option to enable vertical compaction, even if it's
// not currently used by Thanos, because the compactor is also used by Cortex
// which needs vertical compaction.
enableVerticalCompaction: enableVerticalCompaction,
}, nil
}

Expand Down Expand Up @@ -366,6 +371,7 @@ func (c *Syncer) Groups() (res []*Group, err error) {
labels.FromMap(m.Thanos.Labels),
m.Thanos.Downsample.Resolution,
c.acceptMalformedIndex,
c.enableVerticalCompaction,
c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)),
Expand Down Expand Up @@ -514,6 +520,7 @@ type Group struct {
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
acceptMalformedIndex bool
enableVerticalCompaction bool
compactions prometheus.Counter
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
Expand All @@ -528,6 +535,7 @@ func newGroup(
lset labels.Labels,
resolution int64,
acceptMalformedIndex bool,
enableVerticalCompaction bool,
compactions prometheus.Counter,
compactionRunsStarted prometheus.Counter,
compactionRunsCompleted prometheus.Counter,
Expand Down Expand Up @@ -806,9 +814,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
cg.mtx.Lock()
defer cg.mtx.Unlock()

// Check for overlapped blocks.
if err := cg.areBlocksOverlapping(nil); err != nil {
return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
// Check for overlapping blocks, unless vertical compaction has been enabled.
if !cg.enableVerticalCompaction {
if err := cg.areBlocksOverlapping(nil); err != nil {
return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
}
}

// Planning a compaction works purely based on the meta.json files in our future group's dir.
Expand Down Expand Up @@ -942,9 +952,12 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

// Ensure the output block is not overlapping with anything else.
if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
// Ensure the output block is not overlapping with anything else,
// unless vertical compaction is enabled.
if !cg.enableVerticalCompaction {
if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}
}

if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
defer cancel()

relabelConfig := make([]*relabel.Config, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestGroup_Compact_e2e(t *testing.T) {

reg := prometheus.NewRegistry()

sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil)
sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, false, nil)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
testutil.Ok(t, err)

var ids []ulid.ULID
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {

bkt := inmem.NewBucket()
relabelConfig := make([]*relabel.Config, 0)
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, false, relabelConfig)
testutil.Ok(t, err)

// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.
Expand Down

0 comments on commit c3770d9

Please sign in to comment.