Introduce an internal option to enable vertical compaction #1917

Merged

cmd/thanos/compact.go: 1 addition & 1 deletion

@@ -226,7 +226,7 @@ func runCompact(
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
- blockSyncConcurrency, acceptMalformedIndex, relabelConfig)
+ blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig)
if err != nil {
return errors.Wrap(err, "create syncer")
}
pkg/compact/compact.go: 47 additions & 17 deletions

@@ -41,17 +41,18 @@ var blockTooFreshSentinelError = errors.New("Block too fresh")
// Syncer syncronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
- logger log.Logger
- reg prometheus.Registerer
- bkt objstore.Bucket
- consistencyDelay time.Duration
- mtx sync.Mutex
- blocks map[ulid.ULID]*metadata.Meta
- blocksMtx sync.Mutex
- blockSyncConcurrency int
- metrics *syncerMetrics
- acceptMalformedIndex bool
- relabelConfig []*relabel.Config
+ logger log.Logger
+ reg prometheus.Registerer
+ bkt objstore.Bucket
+ consistencyDelay time.Duration
+ mtx sync.Mutex
+ blocks map[ulid.ULID]*metadata.Meta
+ blocksMtx sync.Mutex
+ blockSyncConcurrency int
+ metrics *syncerMetrics
+ acceptMalformedIndex bool
+ enableVerticalCompaction bool
+ relabelConfig []*relabel.Config
}

type syncerMetrics struct {
@@ -66,6 +67,7 @@ type syncerMetrics struct {
compactionRunsStarted *prometheus.CounterVec
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
+ verticalCompactions *prometheus.CounterVec
}

func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
@@ -119,6 +121,10 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
Name: "thanos_compact_group_compactions_failures_total",
Help: "Total number of failed group compactions.",
}, []string{"group"})
+ m.verticalCompactions = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Name: "thanos_compact_group_vertical_compactions_total",
+ Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
+ }, []string{"group"})

if reg != nil {
reg.MustRegister(
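
The new counter is registered per compaction group alongside the existing compaction counters and, as the later hunks show, it is incremented only when a compaction run merged overlapping blocks. When embedding the package or writing a test, its value can be read back with the client_golang test helper; a minimal sketch, assuming a *testing.T named t, the syncerMetrics value m from above, and a purely illustrative group key (promtest aliases github.com/prometheus/client_golang/prometheus/testutil):

    // Illustrative check: the per-group counter should advance by exactly one
    // after a compaction that merged overlapping blocks.
    groupKey := "0@{}" // example group key, not taken from this PR
    before := promtest.ToFloat64(m.verticalCompactions.WithLabelValues(groupKey))
    // ... run a compaction over a group that contains overlapping blocks ...
    after := promtest.ToFloat64(m.verticalCompactions.WithLabelValues(groupKey))
    if after != before+1 {
        t.Fatalf("expected exactly one vertical compaction, got %v", after-before)
    }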
@@ -133,14 +139,15 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
m.compactionRunsStarted,
m.compactionRunsCompleted,
m.compactionFailures,
+ m.verticalCompactions,
)
}
return &m
}

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
- func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) {
+ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool, relabelConfig []*relabel.Config) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -154,6 +161,10 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
relabelConfig: relabelConfig,
+ // The syncer offers an option to enable vertical compaction, even if it's
+ // not currently used by Thanos, because the compactor is also used by Cortex
+ // which needs vertical compaction.
+ enableVerticalCompaction: enableVerticalCompaction,
}, nil
}
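
As the comment above notes, Thanos itself never turns the option on: cmd/thanos/compact.go (first hunk above) passes false, so the compactor's behaviour is unchanged, and only code that constructs the Syncer directly, such as Cortex, can opt in. A minimal sketch of such a caller, with the consistency delay, concurrency, and relabel config values purely illustrative:

    sy, err := compact.NewSyncer(logger, reg, bkt,
        30*time.Minute, // consistencyDelay (illustrative)
        20,             // blockSyncConcurrency (illustrative)
        false,          // acceptMalformedIndex
        true,           // enableVerticalCompaction
        nil,            // relabelConfig
    )
    if err != nil {
        return errors.Wrap(err, "create syncer")
    }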

@@ -366,10 +377,12 @@ func (c *Syncer) Groups() (res []*Group, err error) {
labels.FromMap(m.Thanos.Labels),
m.Thanos.Downsample.Resolution,
c.acceptMalformedIndex,
+ c.enableVerticalCompaction,
c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)),
+ c.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)),
c.metrics.garbageCollectedBlocks,
)
if err != nil {
@@ -514,10 +527,12 @@ type Group struct {
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
acceptMalformedIndex bool
+ enableVerticalCompaction bool
compactions prometheus.Counter
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionFailures prometheus.Counter
+ verticalCompactions prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
}

@@ -528,10 +543,12 @@ func newGroup(
lset labels.Labels,
resolution int64,
acceptMalformedIndex bool,
+ enableVerticalCompaction bool,
compactions prometheus.Counter,
compactionRunsStarted prometheus.Counter,
compactionRunsCompleted prometheus.Counter,
compactionFailures prometheus.Counter,
+ verticalCompactions prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
) (*Group, error) {
if logger == nil {
@@ -544,10 +561,12 @@ func newGroup(
resolution: resolution,
blocks: map[ulid.ULID]*metadata.Meta{},
acceptMalformedIndex: acceptMalformedIndex,
+ enableVerticalCompaction: enableVerticalCompaction,
compactions: compactions,
compactionRunsStarted: compactionRunsStarted,
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
+ verticalCompactions: verticalCompactions,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
}
return g, nil
@@ -807,8 +826,13 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
defer cg.mtx.Unlock()

// Check for overlapped blocks.
+ overlappingBlocks := false
if err := cg.areBlocksOverlapping(nil); err != nil {
- return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
+ if !cg.enableVerticalCompaction {
+ return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
+ }
+
+ overlappingBlocks = true
}

// Planning a compaction works purely based on the meta.json files in our future group's dir.
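
Previously any overlap reported by areBlocksOverlapping halted the whole compactor. With the option enabled, the error is not returned; the overlap is only recorded in overlappingBlocks and planning continues, so the overlapping blocks can be merged vertically into a single block. Conceptually, two blocks overlap when their time ranges intersect; a simplified sketch of that predicate (the real check walks every block's meta.json time range, this helper is only illustrative):

    // Two blocks whose millisecond time ranges are [aMinT, aMaxT) and [bMinT, bMaxT)
    // overlap when each range starts before the other one ends.
    func overlaps(aMinT, aMaxT, bMinT, bMaxT int64) bool {
        return aMinT < bMaxT && bMinT < aMaxT
    }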
@@ -917,8 +941,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return true, ulid.ULID{}, nil
}
cg.compactions.Inc()
+ if overlappingBlocks {
+ cg.verticalCompactions.Inc()
+ }
level.Debug(cg.logger).Log("msg", "compacted blocks",
- "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
+ "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks)

bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)
@@ -942,9 +969,12 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

- // Ensure the output block is not overlapping with anything else.
- if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
- return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+ // Ensure the output block is not overlapping with anything else,
+ // unless vertical compaction is enabled.
+ if !cg.enableVerticalCompaction {
+ if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
+ return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+ }
}

if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil {
pkg/compact/compact_e2e_test.go: 4 additions & 4 deletions

@@ -37,7 +37,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
defer cancel()

relabelConfig := make([]*relabel.Config, 0)
- sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
+ sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
@@ -140,7 +140,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
- sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
+ sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

@@ -209,7 +209,7 @@ func TestGroup_Compact_e2e(t *testing.T) {

reg := prometheus.NewRegistry()

- sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil)
+ sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, false, nil)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
@@ -515,7 +515,7 @@ func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

- sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
+ sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
testutil.Ok(t, err)

var ids []ulid.ULID
pkg/compact/compact_test.go: 1 addition & 1 deletion

@@ -79,7 +79,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {

bkt := inmem.NewBucket()
relabelConfig := make([]*relabel.Config, 0)
- sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig)
+ sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, false, relabelConfig)
testutil.Ok(t, err)

// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.