Skip to content

Commit

Permalink
compact: Made MarkForDeletion less strict; Added more debuggability to…
Browse files Browse the repository at this point in the history
… block deletion logic, made meta sync explicit.

Also:

* Changed order: Now BestEffortCleanAbortedPartialUploads is before DeleteMarkedBlocks.
* Increment markedForDeletion counter only when we actually uploaded the deletion mark.
* Fixed logging issues.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Apr 6, 2020
1 parent c05daee commit 2b9fa84
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 130 deletions.
88 changes: 57 additions & 31 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -113,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified.").
Default("5m").Duration()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "DEPRECATED flag. Will be removed in next release. If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Hidden().Default("false").Bool()

disableDownsampling := cmd.Flag("downsampling.disable", "Disables downsampling. This is not recommended "+
Expand Down Expand Up @@ -290,29 +291,48 @@ func runCompact(
}()

// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2)
duplicateBlocksFilter := block.NewDeduplicateFilter()

baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
compactFetcher := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})

enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, compactFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader)
var sy *compact.Syncer
{
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
cf := baseMetaFetcher.NewMetaFetcher(
extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)},
)
cf.UpdateOnChange(compactorView.Set)
sy, err = compact.NewSyncer(
logger,
reg,
bkt,
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
blocksMarkedForDeletion,
blockSyncConcurrency,
acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
}
}

levels, err := compactions.levels(maxCompactionLevel)
Expand Down Expand Up @@ -371,39 +391,52 @@ func runCompact(
// We run two passes of this to ensure that the 1h downsampling is generated
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
level.Info(logger).Log("msg", "downsampling was explicitly disabled")
}
// TODO(bwplotka): Find a way to avoid syncing if no op was done

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, compactFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, compactFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
return nil
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// Generate index file.
// Generate index files.
// TODO(bwplotka): Remove this in next release.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, compactFetcher, indexCacheDir); err != nil {
if err := sy.SyncMetas(ctx); err != nil {
return err
}
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, sy.Metas(), indexCacheDir); err != nil {
return err
}
}
Expand Down Expand Up @@ -451,16 +484,14 @@ func runCompact(
r := route.New()

ins := extpromhttp.NewInstrumentationMiddleware(reg)
compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader)
compactorView.Register(r, ins)
compactFetcher.UpdateOnChange(compactorView.Set)

global := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/global"), prefixHeader)
global.Register(r, ins)

// Separate fetcher for global view.
// TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well.
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil)
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil, "component", "globalBucketUI")
f.UpdateOnChange(global.Set)

srv.Handle("/", r)
Expand Down Expand Up @@ -494,7 +525,7 @@ func runCompact(
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string) error {
genIndex := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
Expand All @@ -515,11 +546,6 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom

level.Info(logger).Log("msg", "start index cache processing")

metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "fetch metas")
}

for _, meta := range metas {
// New version of compactor pushes index cache along with data block.
// Skip uncompacted blocks.
Expand Down
21 changes: 11 additions & 10 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,20 @@ func RunDownsample(
statusProber.Ready()

level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
metas, _, err := metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
metas, _, err = metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand Down Expand Up @@ -144,7 +150,7 @@ func downsampleBucket(
logger log.Logger,
metrics *DownsampleMetrics,
bkt objstore.Bucket,
fetcher block.MetadataFetcher,
metas map[ulid.ULID]*metadata.Meta,
dir string,
) error {
if err := os.RemoveAll(dir); err != nil {
Expand All @@ -160,11 +166,6 @@ func downsampleBucket(
}
}()

metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "downsampling meta fetch")
}

// mapping from a hash over all source IDs to blocks. We don't need to downsample a block
// if a downsampled version with the same hash already exists.
sources5m := map[ulid.ULID]struct{}{}
Expand Down
8 changes: 6 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metas, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)
Expand Down Expand Up @@ -119,7 +121,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))

_, err = os.Stat(dir)
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -129,14 +130,15 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er
}

// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error {
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile)
}
if deletionMarkExists {
return errors.Errorf("file %s already exists in bucket", deletionMarkFile)
level.Warn(logger).Log("msg", "requested to mark for deletion, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", deletionMarkFile))
return nil
}

deletionMark, err := json.Marshal(metadata.DeletionMark{
Expand All @@ -151,7 +153,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket
if err := bkt.Upload(ctx, deletionMarkFile, bytes.NewBuffer(deletionMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", deletionMarkFile)
}

markedForDeletion.Inc()
level.Info(logger).Log("msg", "block has been marked for deletion", "block", id)
return nil
}
Expand All @@ -168,6 +170,7 @@ func Delete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid
if err != nil {
return errors.Wrapf(err, "stat %s", metaFile)
}

if ok {
if err := bkt.Delete(ctx, metaFile); err != nil {
return errors.Wrapf(err, "delete %s", metaFile)
Expand Down
82 changes: 45 additions & 37 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
Expand All @@ -17,6 +16,9 @@ import (

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -232,44 +234,50 @@ func TestMarkForDeletion(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := objstore.NewInMemBucket()
{
blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithoutDeletionMark.String())))
for _, tcase := range []struct {
name string
preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)

testutil.Ok(t, MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithoutDeletionMark))
exists, err := bkt.Exists(ctx, path.Join(blockWithoutDeletionMark.String(), metadata.DeletionMarkFilename))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
}
{
blockWithDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithDeletionMark.String())))
blocksMarked int
}{
{
name: "block marked for deletion",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksMarked: 1,
},
{
name: "block with deletion mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
},
blocksMarked: 0,
},
} {
t.Run(tcase.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)

deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: blockWithDeletionMark,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
tcase.preUpload(t, id, bkt)

err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithDeletionMark)
testutil.NotOk(t, err)
testutil.Equals(t, fmt.Sprintf("file %s already exists in bucket", path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename)), err.Error())
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
}
}
Loading

0 comments on commit 2b9fa84

Please sign in to comment.