Skip to content

Commit

Permalink
compact: Added support for no-compact markers in planner.
Browse files Browse the repository at this point in the history
The planner algo was adapted to avoid unnecessary changes to
blocks caused by excluded blocks, so we can quickly switch to
different planning logic in next iteration.

Fixes: #1424

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Nov 4, 2020
1 parent cb727ab commit 7d8b92b
Show file tree
Hide file tree
Showing 13 changed files with 591 additions and 200 deletions.
4 changes: 3 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func runCompact(
component,
)
api := blocksAPI.NewBlocksAPI(logger, conf.label, flagsMap)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, bkt)
var sy *compact.Syncer
{
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
Expand All @@ -229,6 +230,7 @@ func runCompact(
block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels)},
)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
Expand Down Expand Up @@ -280,7 +282,7 @@ func runCompact(

grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewTSDBBasedPlanner(logger, levels), comp, compactDir, bkt, conf.compactionConcurrency)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewTSDBBasedPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket
return nil
}

deletionMark, err := json.Marshal(metadata.DeletionMark{
deletionMark, err := json.Marshal(metadata.DeletionMarker{
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestMarkForDeletion(t *testing.T) {
{
name: "block with deletion mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
deletionMark, err := json.Marshal(metadata.DeletionMark{
deletionMark, err := json.Marshal(metadata.DeletionMarker{
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
Expand Down
33 changes: 19 additions & 14 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ const (
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)
Expand Down Expand Up @@ -111,6 +114,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
[]string{MarkedForNoCompactionMeta},
)
m.modified = extprom.NewTxGaugeVec(
reg,
Expand Down Expand Up @@ -759,7 +763,7 @@ type IgnoreDeletionMarkFilter struct {
logger log.Logger
delay time.Duration
bkt objstore.InstrumentedBucketReader
deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
deletionMarkMap map[ulid.ULID]*metadata.DeletionMarker
}

// NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
Expand All @@ -772,29 +776,30 @@ func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBuc
}

// DeletionMarkBlocks returns block ids that were marked for deletion.
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark {
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMarker {
return f.deletionMarkMap
}

// Filter filters out blocks that are marked for deletion after a given delay.
// It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMarker)

for id := range metas {
deletionMark, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String())
if err == metadata.ErrorDeletionMarkNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalDeletionMark {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
if err != nil {
m := &metadata.DeletionMarker{}
if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil {
if errors.Cause(err) == metadata.ErrorMarkerNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalMarker {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
return err
}
f.deletionMarkMap[id] = deletionMark
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {

f.deletionMarkMap[id] = m
if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
delete(metas, id)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,13 +1084,13 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
delay: 48 * time.Hour,
}

shouldFetch := &metadata.DeletionMark{
shouldFetch := &metadata.DeletionMarker{
ID: ULID(1),
DeletionTime: now.Add(-15 * time.Hour).Unix(),
Version: 1,
}

shouldIgnore := &metadata.DeletionMark{
shouldIgnore := &metadata.DeletionMarker{
ID: ULID(2),
DeletionTime: now.Add(-60 * time.Hour).Unix(),
Version: 1,
Expand Down
76 changes: 0 additions & 76 deletions pkg/block/metadata/deletionmark.go

This file was deleted.

81 changes: 0 additions & 81 deletions pkg/block/metadata/deletionmark_test.go

This file was deleted.

Loading

0 comments on commit 7d8b92b

Please sign in to comment.