From ffd3821a2254f814dc312611bfb3c6d28c1829b2 Mon Sep 17 00:00:00 2001 From: khyatisoneji Date: Fri, 14 Feb 2020 17:56:13 +0530 Subject: [PATCH] store, compact, bucket: schedule block deletion by adding deletion-mark.json file Signed-off-by: khyatisoneji --- CHANGELOG.md | 3 + cmd/thanos/bucket.go | 10 ++- cmd/thanos/compact.go | 43 ++++++++- cmd/thanos/store.go | 11 +++ docs/components/bucket.md | 24 +++-- docs/components/compact.md | 28 ++++-- docs/components/store.md | 5 +- .../201901-read-write-operations-bucket.md | 4 +- pkg/block/block.go | 30 +++++++ pkg/block/block_test.go | 55 ++++++++++++ pkg/block/fetcher.go | 84 +++++++++++++++-- pkg/block/fetcher_test.go | 89 ++++++++++++++++--- pkg/block/metadata/deletionmark.go | 76 ++++++++++++++++ pkg/block/metadata/deletionmark_test.go | 78 ++++++++++++++++ pkg/compact/blocks_cleaner.go | 58 ++++++++++++ pkg/compact/clean.go | 5 +- pkg/compact/clean_test.go | 10 ++- pkg/compact/compact.go | 64 ++++++++----- pkg/compact/compact_e2e_test.go | 23 ++++- pkg/compact/retention.go | 8 +- pkg/compact/retention_test.go | 19 +++- pkg/shipper/shipper.go | 1 + pkg/verifier/duplicated_compaction.go | 4 +- pkg/verifier/index_issue.go | 5 +- pkg/verifier/overlapped_blocks.go | 3 +- pkg/verifier/safe_delete.go | 41 ++++++--- pkg/verifier/verify.go | 66 +++++++++----- 27 files changed, 735 insertions(+), 112 deletions(-) create mode 100644 pkg/block/metadata/deletionmark.go create mode 100644 pkg/block/metadata/deletionmark_test.go create mode 100644 pkg/compact/blocks_cleaner.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 884c316a96d..20b1ac5c5d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,9 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Changed +- [#2136](https://github.com/thanos-io/thanos/pull/2136) store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage. +Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. + - [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more. ## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02 diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 57cb41b6d22..9ca8d3cb0d1 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -85,6 +85,12 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings() idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+ "If none is specified, all blocks will be verified. Repeated field").Strings() + deleteDelay := modelDuration(cmd.Flag("delete-delay", "Duration after which blocks marked for deletion would be deleted permanently from source bucket by compactor component. 
"+ + "If delete-delay is non zero, blocks will be marked for deletion and compactor component is required to delete blocks from source bucket. "+ + "If delete-delay is 0, blocks will be deleted straight away. Use this if you want to get rid of or move the block immediately. "+ + "Note that deleting blocks immediately can cause query failures, if store gateway still has the block loaded, "+ + "or compactor is ignoring the deletion because it's compacting the block at the same time."). + Default("0s")) m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { confContentYaml, err := objStoreConfig.Content() if err != nil { @@ -140,9 +146,9 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name } if *repair { - v = verifier.NewWithRepair(logger, bkt, backupBkt, fetcher, issues) + v = verifier.NewWithRepair(logger, reg, bkt, backupBkt, fetcher, time.Duration(*deleteDelay), issues) } else { - v = verifier.New(logger, bkt, fetcher, issues) + v = verifier.New(logger, reg, bkt, fetcher, time.Duration(*deleteDelay), issues) } var idMatcher func(ulid.ULID) bool = nil diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 7d54845e76b..03f15243307 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -126,6 +126,13 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). Default("1").Int() + deleteDelay := modelDuration(cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket. "+ + "If delete-delay is non zero, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+ + "If delete-delay is 0, blocks will be deleted straight away. "+ + "Note that deleting blocks immediately can cause query failures, if store gateway still has the block loaded, "+ + "or compactor is ignoring the deletion because it's compacting the block at the same time."). 
+ Default("48h")) + selectorRelabelConf := regSelectorRelabelFlags(cmd) m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { @@ -135,6 +142,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { *dataDir, objStoreConfig, time.Duration(*consistencyDelay), + time.Duration(*deleteDelay), *haltOnError, *acceptMalformedIndex, *wait, @@ -164,6 +172,7 @@ func runCompact( dataDir string, objStoreConfig *extflag.PathOrContent, consistencyDelay time.Duration, + deleteDelay time.Duration, haltOnError bool, acceptMalformedIndex bool, wait bool, @@ -194,6 +203,25 @@ func runCompact( Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total", Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.", }) + blocksCleaned := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compactor_blocks_cleaned_total", + Help: "Total number of blocks deleted in compactor.", + }) + blockCleanupFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compactor_block_cleanup_failures_total", + Help: "Failures encountered while deleting blocks in compactor.", + }) + blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compactor_blocks_marked_for_deletion_total", + Help: "Total number of blocks marked for deletion in compactor.", + }) + _ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "thanos_delete_delay_seconds", + Help: "Configured delete delay in seconds.", + }, func() float64 { + return deleteDelay.Seconds() + }) + downsampleMetrics := newDownsampleMetrics(reg) httpProbe := prober.NewHTTP() @@ -245,18 +273,22 @@ func runCompact( } }() + // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. + // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. 
+ ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second) duplicateBlocksFilter := block.NewDeduplicateFilter() prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, block.NewLabelShardedMetaFilter(relabelConfig).Filter, block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, + ignoreDeletionMarkFilter.Filter, duplicateBlocksFilter.Filter, ) if err != nil { return errors.Wrap(err, "create meta fetcher") } - sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, blockSyncConcurrency, acceptMalformedIndex, false) + sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, false) if err != nil { return errors.Wrap(err, "create syncer") } @@ -290,6 +322,7 @@ func runCompact( return errors.Wrap(err, "clean working downsample directory") } + blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures) compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency) if err != nil { cancel() @@ -331,11 +364,15 @@ func runCompact( level.Warn(logger).Log("msg", "downsampling was explicitly disabled") } - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution); err != nil { + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil { return errors.Wrap(err, fmt.Sprintf("retention failed")) } - compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts) + if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { + return errors.Wrap(err, "error cleaning blocks") + } + + compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion) return nil } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 90ecbce6046..7a4f88a038a 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -84,6 +84,13 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent."). Default("0s")) + ignoreDeletionMarksDelay := modelDuration(cmd.Flag("ignore-deletion-marks-delay", "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+ + "The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+ + "If delete-delay duration is provided to compactor or bucket verify component, it will upload deletion-mark.json file to mark after what duration the block should be deleted rather than deleting the block straight away. "+ + "If delete-delay is non-zero for compactor or bucket verify component, ignore-deletion-marks-delay should be set to (delete-delay)/2 so that blocks marked for deletion are filtered out while fetching blocks before being deleted from bucket. 
"+ + "Default is 24h, half of the default value for --delete-delay on compactor."). + Default("24h")) + m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error { if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", @@ -120,6 +127,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { *advertiseCompatibilityLabel, *enableIndexHeader, time.Duration(*consistencyDelay), + time.Duration(*ignoreDeletionMarksDelay), ) } } @@ -153,6 +161,7 @@ func runStore( advertiseCompatibilityLabel bool, enableIndexHeader bool, consistencyDelay time.Duration, + ignoreDeletionMarksDelay time.Duration, ) error { grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() @@ -225,11 +234,13 @@ func runStore( return errors.Wrap(err, "create index cache") } + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay) prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, block.NewLabelShardedMetaFilter(relabelConfig).Filter, block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, + ignoreDeletionMarkFilter.Filter, block.NewDeduplicateFilter().Filter, ) if err != nil { diff --git a/docs/components/bucket.md b/docs/components/bucket.md index 8c2149f40b9..fcb492b30fd 100644 --- a/docs/components/bucket.md +++ b/docs/components/bucket.md @@ -26,11 +26,12 @@ config: Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within `/cmd/thanos/bucket.go`. - ## Deployment + ## Flags -[embedmd]:# (flags/bucket.txt $) +[embedmd]: # "flags/bucket.txt $" + ```$ usage: thanos bucket [] [ ...] @@ -98,7 +99,8 @@ Example: $ thanos bucket web --objstore.config-file="..." ``` -[embedmd]:# (flags/bucket_web.txt) +[embedmd]: # "flags/bucket_web.txt" + ```txt usage: thanos bucket web [] @@ -170,7 +172,8 @@ Example: $ thanos bucket verify --objstore.config-file="..." ``` -[embedmd]:# (flags/bucket_verify.txt) +[embedmd]: # "flags/bucket_verify.txt" + ```txt usage: thanos bucket verify [] @@ -222,7 +225,11 @@ Flags: Block IDs to verify (and optionally repair) only. If none is specified, all blocks will be verified. Repeated field - + --delete-delay=0s Duration after which blocks marked for deletion would be deleted permanently from source bucket by compactor component. + If delete-delay is non zero, blocks will be marked for deletion and compactor component is required to delete blocks from source bucket. + If delete-delay is 0, blocks will be deleted straight away. Use this if you want to get rid of or move the block immediately. + Note that deleting blocks immediately can cause query failures, if store gateway still has the block + loaded, or compactor is ignoring the deletion because it's compacting the block at the same time. ``` ### ls @@ -235,7 +242,8 @@ Example: $ thanos bucket ls -o json --objstore.config-file="..." 
``` -[embedmd]:# (flags/bucket_ls.txt) +[embedmd]: # "flags/bucket_ls.txt" + ```txt usage: thanos bucket ls [] @@ -276,11 +284,13 @@ Flags: `bucket inspect` is used to inspect buckets in a detailed way using stdout in ASCII table format. Example: + ``` $ thanos bucket inspect -l environment=\"prod\" --objstore.config-file="..." ``` -[embedmd]:# (flags/bucket_inspect.txt) +[embedmd]: # "flags/bucket_inspect.txt" + ```txt usage: thanos bucket inspect [] diff --git a/docs/components/compact.md b/docs/components/compact.md index 4199daeed2f..bff5f29931b 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -11,8 +11,8 @@ It is generally not semantically concurrency safe and must be deployed as a sing It is also responsible for downsampling of data: -* creating 5m downsampling for blocks larger than **40 hours** (2d, 2w) -* creating 1h downsampling for blocks larger than **10 days** (2w). +- creating 5m downsampling for blocks larger than **40 hours** (2d, 2w) +- creating 1h downsampling for blocks larger than **10 days** (2w). Example: @@ -35,9 +35,9 @@ On-disk data is safe to delete between restarts and should be the first attempt Resolution - distance between data points on your graphs. E.g. -* raw - the same as scrape interval at the moment of data ingestion -* 5m - data point is every 5 minutes -* 1h - data point is every 1h +- raw - the same as scrape interval at the moment of data ingestion +- 5m - data point is every 5 minutes +- 1h - data point is every 1h Keep in mind, that the initial goal of downsampling is not saving disk space (Read further for elaboration on storage space consumption). The goal of downsampling is providing an opportunity to get fast results for range queries of big time intervals like months or years. In other words, if you set `--retention.resolution-raw` less then `--retention.resolution-5m` and `--retention.resolution-1h` - you might run into a problem of not being able to "zoom in" to your historical data. @@ -64,9 +64,18 @@ your Prometheus instances, so that the compactor will be able to group blocks by By _persistent_, we mean that one Prometheus instance must keep the same labels if it restarts, so that the compactor will keep compacting blocks from an instance even when a Prometheus instance goes down for some time. +## Block Deletion + +Depending on the Object Storage provider like S3, GCS, Ceph etc; we can divide the storages into strongly consistent or eventually consistent. +Since there are no consistency guarantees provided by some Object Storage providers, we have to make sure that we have a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. + +In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading +`deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. + ## Flags -[embedmd]:# (flags/compact.txt $) +[embedmd]: # "flags/compact.txt $" + ```$ usage: thanos compact [] @@ -146,5 +155,10 @@ Flags: selecting blocks. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config - + --delete-delay=48h Time before a block marked for deletion is deleted from bucket. + If delete-delay is non zero, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. 
+ If delete-delay is 0, blocks will be deleted straight away. + Use this if you want to get rid of or move the block immediately. + Note that deleting blocks immediately can cause query failures, if store gateway still has the block + loaded, or compactor is ignoring the deletion because it's compacting the block at the same time. ``` diff --git a/docs/components/store.md b/docs/components/store.md index f1e900212e1..c24d5288a6d 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -139,7 +139,10 @@ Flags: details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config --consistency-delay=30m Minimum age of all blocks before they are being read. - + --ignore-deletion-marks-delay=24h + Duration after which the blocks marked for deletion will be filtered out while fetching blocks. + The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. If delete-delay duration is provided to compactor or bucket verify component, it will upload deletion-mark.json file to mark after what duration the block should be deleted rather than deleting the block straight away. + If delete-delay is non-zero for compactor or bucket verify component, ignore-deletion-marks-delay should be set to (delete-delay)/2 so that blocks marked for deletion are filtered out while fetching blocks before being deleted from bucket. Default is 24h, half of the default value for --delete-delay on compactor. ``` ## Time based partitioning diff --git a/docs/proposals/201901-read-write-operations-bucket.md b/docs/proposals/201901-read-write-operations-bucket.md index 4658e095e76..caaeebe370c 100644 --- a/docs/proposals/201901-read-write-operations-bucket.md +++ b/docs/proposals/201901-read-write-operations-bucket.md @@ -179,7 +179,7 @@ We schedule deletions instead of doing them straight away for 3 reasons: * In further delete steps, starting with meta.json first ensures integrity mark being deleted first, so in case of deletion process being stopped, we can treat this block as partial block (rule 4th) and delete it gracefully. Along with this, we also store information about when the block was scheduled to be deleted so that it can be deleted at a later point in time. -To do so, we create a file `compactor-meta.json` where we store information about when the block was scheduled to be deleted. +To do so, we create a file `deletion-mark.json` where we store information about when the block was scheduled to be deleted. Storing the information in a file makes it resilient to failures that result in restarts. There might be exception for malformed blocks that blocks compaction or reader operations. Since we may need to unblock the system @@ -189,7 +189,7 @@ immediately the block can be forcibly removed meaning that query failures may oc This is to make sure we don't forcibly remove block which is still loaded on reader side. -We check the `compactor-meta.json` file to identify if the block has to be deleted. After 15 minutes of marking the block to be deleted, we are ok to delete the whole block directory. +We check the `deletion-mark.json` file to identify if the block has to be deleted. After 15 minutes of marking the block to be deleted, we are ok to delete the whole block directory. 
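To make the co-ordination concrete, below is a minimal sketch (the helper name is illustrative; the struct, constants and bucket interface are the ones added or used by this change) of how a block is scheduled for deletion by uploading `deletion-mark.json`:

```go
package example

import (
	"bytes"
	"context"
	"encoding/json"
	"path"
	"time"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// scheduleDeletion mirrors what MarkForDeletion does: instead of deleting the
// block, it uploads <block ULID>/deletion-mark.json with the current unix time.
func scheduleDeletion(ctx context.Context, bkt objstore.Bucket, id ulid.ULID) error {
	mark, err := json.Marshal(metadata.DeletionMark{
		ID:           id,                // block scheduled for deletion
		DeletionTime: time.Now().Unix(), // unix time the mark was written
		Version:      metadata.DeletionMarkVersion1,
	})
	if err != nil {
		return err
	}
	// Resulting object content, e.g.:
	// {"id":"01DTVP434PA9VFXSW2JKB3392D","deletion_time":1581592800,"version":1}
	return bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(mark))
}
```

Readers (store gateway, compactor, bucket verify) look for this file and only unload or delete the block once the configured delay has passed.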
## Risks diff --git a/pkg/block/block.go b/pkg/block/block.go index 51b8012974d..62433d0799d 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -6,6 +6,7 @@ package block import ( + "bytes" "context" "encoding/json" "fmt" @@ -14,6 +15,7 @@ import ( "path" "path/filepath" "strings" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -126,6 +128,34 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er return err } +// MarkForDeletion creates a file which stores information about when the block was marked for deletion. +func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error { + deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) + deletionMarkExists, err := objstore.Exists(ctx, bkt, deletionMarkFile) + if err != nil { + return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile) + } + if deletionMarkExists { + return errors.Errorf("file %s already exists in bucket", deletionMarkFile) + } + + deletionMark, err := json.Marshal(metadata.DeletionMark{ + ID: id, + DeletionTime: time.Now().Unix(), + Version: metadata.DeletionMarkVersion1, + }) + if err != nil { + return errors.Wrap(err, "json encode deletion mark") + } + + if err := bkt.Upload(ctx, deletionMarkFile, bytes.NewReader(deletionMark)); err != nil { + return errors.Wrapf(err, "upload file %s to bucket", deletionMarkFile) + } + + level.Info(logger).Log("msg", "block has been marked for deletion", "block", id) + return nil +} + // Delete removes directory that is meant to be block directory. // NOTE: Always prefer this method for deleting blocks. // * We have to delete block's files in the certain order (meta.json first) diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index e6bdfc8ce13..6e2193b5d7a 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -4,7 +4,10 @@ package block import ( + "bytes" "context" + "encoding/json" + "fmt" "io/ioutil" "os" "path" @@ -15,6 +18,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore/inmem" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" @@ -218,3 +222,54 @@ func TestDelete(t *testing.T) { testutil.Equals(t, 2, len(bkt.Objects())) } } + +func TestMarkForDeletion(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-block-mark-for-delete") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt := inmem.NewBucket() + { + blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithoutDeletionMark.String()))) + + testutil.Ok(t, MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithoutDeletionMark)) + exists, err := bkt.Exists(ctx, path.Join(blockWithoutDeletionMark.String(), metadata.DeletionMarkFilename)) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) + } + { + blockWithDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + 
{{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithDeletionMark.String()))) + + deletionMark, err := json.Marshal(metadata.DeletionMark{ + ID: blockWithDeletionMark, + DeletionTime: time.Now().Unix(), + Version: metadata.DeletionMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark))) + + err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithDeletionMark) + testutil.NotOk(t, err) + testutil.Equals(t, fmt.Sprintf("file %s already exists in bucket", path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename)), err.Error()) + } +} diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index ae4f3b454b1..2ab188bd0fe 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -53,6 +53,10 @@ const ( timeExcludedMeta = "time-excluded" tooFreshMeta = "too-fresh" duplicateMeta = "duplicate" + + // Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted, + // but don't have a replacement block yet. + markedForDeletionMeta = "marked-for-deletion" ) func newSyncMetrics(reg prometheus.Registerer) *syncMetrics { @@ -88,6 +92,7 @@ func newSyncMetrics(reg prometheus.Registerer) *syncMetrics { []string{labelExcludedMeta}, []string{timeExcludedMeta}, []string{duplicateMeta}, + []string{markedForDeletionMeta}, ) return &m } @@ -100,9 +105,10 @@ type GaugeLabeled interface { WithLabelValues(lvs ...string) prometheus.Gauge } -type MetaFetcherFilter func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) +type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) error // MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. +// Not go-routine safe. type MetaFetcher struct { logger log.Logger concurrency int @@ -345,7 +351,9 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. for _, f := range s.filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. - f(metas, s.metrics.synced, incompleteView) + if err := f(ctx, metas, s.metrics.synced, incompleteView); err != nil { + return nil, nil, errors.Wrap(err, "filter metas") + } } s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) @@ -362,6 +370,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. var _ MetaFetcherFilter = (&TimePartitionMetaFilter{}).Filter // TimePartitionMetaFilter is a MetaFetcher filter that filters out blocks that are outside of specified time range. +// Not go-routine safe. type TimePartitionMetaFilter struct { minTime, maxTime model.TimeOrDurationValue } @@ -372,7 +381,7 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim } // Filter filters out blocks that are outside of specified time range. 
-func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { for id, m := range metas { if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() { continue @@ -380,11 +389,13 @@ func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, syn synced.WithLabelValues(timeExcludedMeta).Inc() delete(metas, id) } + return nil } var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter // LabelShardedMetaFilter represents struct that allows sharding. +// Not go-routine safe. type LabelShardedMetaFilter struct { relabelConfig []*relabel.Config } @@ -398,7 +409,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet const blockIDLabel = "__block_id" // Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels. -func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { var lbls labels.Labels for id, m := range metas { lbls = lbls[:0] @@ -412,9 +423,11 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync delete(metas, id) } } + return nil } // DeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data. +// Not go-routine safe. type DeduplicateFilter struct { duplicateIDs []ulid.ULID } @@ -426,7 +439,7 @@ func NewDeduplicateFilter() *DeduplicateFilter { // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. -func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { var wg sync.WaitGroup metasByResolution := make(map[int64][]*metadata.Meta) @@ -448,6 +461,8 @@ func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced Ga } wg.Wait() + + return nil } func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) { @@ -476,8 +491,7 @@ func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadat } } -// DuplicateIDs returns slice of block ids -// that are filtered out by DeduplicateFilter. +// DuplicateIDs returns slice of block ids that are filtered out by DeduplicateFilter. func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID { return f.duplicateIDs } @@ -527,6 +541,7 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool { } // ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay. +// Not go-routine safe. type ConsistencyDelayMetaFilter struct { logger log.Logger consistencyDelay time.Duration @@ -551,7 +566,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura } // Filter filters out blocks that filters blocks that have are created before a specified consistency delay. 
-func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { +func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { for id, meta := range metas { // TODO(khyatisoneji): Remove the checks about Thanos Source // by implementing delete delay to fetch metas. @@ -566,4 +581,57 @@ func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, delete(metas, id) } } + + return nil +} + +// IgnoreDeletionMarkFilter is a filter that filters out the blocks that are marked for deletion after a given delay. +// The delay duration is to make sure that the replacement block can be fetched before we filter out the old block. +// Delay is not considered when computing DeletionMarkBlocks map. +// Not go-routine safe. +type IgnoreDeletionMarkFilter struct { + logger log.Logger + delay time.Duration + bkt objstore.BucketReader + deletionMarkMap map[ulid.ULID]*metadata.DeletionMark +} + +// NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter. +func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.BucketReader, delay time.Duration) *IgnoreDeletionMarkFilter { + return &IgnoreDeletionMarkFilter{ + logger: logger, + bkt: bkt, + delay: delay, + } +} + +// DeletionMarkBlocks returns block ids that were marked for deletion. +func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark { + return f.deletionMarkMap +} + +// Filter filters out blocks that are marked for deletion after a given delay. +// It also returns the blocks that can be deleted since they were uploaded delay duration before current time. +func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { + f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark) + + for id := range metas { + deletionMark, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String()) + if err == metadata.ErrorDeletionMarkNotFound { + continue + } + if errors.Cause(err) == metadata.ErrorUnmarshalDeletionMark { + level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err) + continue + } + if err != nil { + return err + } + f.deletionMarkMap[id] = deletionMark + if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() { + synced.WithLabelValues(markedForDeletionMeta).Inc() + delete(metas, id) + } + } + return nil } diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index d2c4c9b251c..3c181886dd4 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -57,11 +57,12 @@ func TestMetaFetcher_Fetch(t *testing.T) { var ulidToDelete ulid.ULID r := prometheus.NewRegistry() - f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) { + f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { if _, ok := metas[ulidToDelete]; ok { synced.WithLabelValues("filtered").Inc() delete(metas, ulidToDelete) } + return nil }) testutil.Ok(t, err) @@ -280,6 +281,9 @@ func TestMetaFetcher_Fetch(t *testing.T) { } func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { + ctx, cancel := 
context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + relabelContentYaml := ` - action: drop regex: "A" @@ -334,14 +338,16 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { } synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - f.Filter(input, synced, false) - + testutil.Ok(t, f.Filter(ctx, input, synced, false)) testutil.Equals(t, 3.0, promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) testutil.Equals(t, expected, input) } func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + relabelContentYamlFmt := ` - action: hashmod source_labels: ["%s"] @@ -429,8 +435,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { deleted := len(input) - len(expected) synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - f.Filter(input, synced, false) - + testutil.Ok(t, f.Filter(ctx, input, synced, false)) testutil.Equals(t, expected, input) testutil.Equals(t, float64(deleted), promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) @@ -440,6 +445,9 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { } func TestTimePartitionMetaFilter_Filter(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + mint := time.Unix(0, 1*time.Millisecond.Nanoseconds()) maxt := time.Unix(0, 10*time.Millisecond.Nanoseconds()) f := NewTimePartitionMetaFilter(model.TimeOrDurationValue{Time: &mint}, model.TimeOrDurationValue{Time: &maxt}) @@ -490,8 +498,7 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) { } synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - f.Filter(input, synced, false) - + testutil.Ok(t, f.Filter(ctx, input, synced, false)) testutil.Equals(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(timeExcludedMeta))) testutil.Equals(t, expected, input) @@ -503,6 +510,9 @@ type sourcesAndResolution struct { } func TestDeduplicateFilter_Filter(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + for _, tcase := range []struct { name string input map[ulid.ULID]*sourcesAndResolution @@ -838,8 +848,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, } } - f.Filter(metas, synced, false) - + testutil.Ok(t, f.Filter(ctx, metas, synced, false)) compareSliceWithMapKeys(t, metas, tcase.expected) testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(synced.WithLabelValues(duplicateMeta))) }); !ok { @@ -890,6 +899,9 @@ func (u *ulidBuilder) ULID(t time.Time) ulid.ULID { } func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + u := &ulidBuilder{} now := time.Now() @@ -944,8 +956,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - f.Filter(input, synced, false) - + testutil.Ok(t, f.Filter(ctx, input, synced, false)) testutil.Equals(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) @@ -970,9 +981,61 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg) testutil.Equals(t, 
map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - f.Filter(input, synced, false) - + testutil.Ok(t, f.Filter(ctx, input, synced, false)) testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) } + +func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + now := time.Now() + f := &IgnoreDeletionMarkFilter{ + logger: log.NewNopLogger(), + bkt: bkt, + delay: 48 * time.Hour, + } + + shouldFetch := &metadata.DeletionMark{ + ID: ULID(1), + DeletionTime: now.Add(-15 * time.Hour).Unix(), + Version: 1, + } + + shouldIgnore := &metadata.DeletionMark{ + ID: ULID(2), + DeletionTime: now.Add(-60 * time.Hour).Unix(), + Version: 1, + } + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&shouldFetch)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldFetch.ID.String(), metadata.DeletionMarkFilename), &buf)) + + testutil.Ok(t, json.NewEncoder(&buf).Encode(&shouldIgnore)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnore.ID.String(), metadata.DeletionMarkFilename), &buf)) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(ULID(3).String(), metadata.DeletionMarkFilename), bytes.NewBufferString("not a valid deletion-mark.json"))) + + input := map[ulid.ULID]*metadata.Meta{ + ULID(1): {}, + ULID(2): {}, + ULID(3): {}, + ULID(4): {}, + } + + expected := map[ulid.ULID]*metadata.Meta{ + ULID(1): {}, + ULID(3): {}, + ULID(4): {}, + } + + synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + testutil.Ok(t, f.Filter(ctx, input, synced, false)) + testutil.Equals(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(markedForDeletionMeta))) + testutil.Equals(t, expected, input) + }) +} diff --git a/pkg/block/metadata/deletionmark.go b/pkg/block/metadata/deletionmark.go new file mode 100644 index 00000000000..bda97e92c6c --- /dev/null +++ b/pkg/block/metadata/deletionmark.go @@ -0,0 +1,76 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "context" + "encoding/json" + "io/ioutil" + "path" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" +) + +const ( + // DeletionMarkFilename is the known json filename to store details about when block is marked for deletion. + DeletionMarkFilename = "deletion-mark.json" + + // DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos. + DeletionMarkVersion1 = 1 +) + +// ErrorDeletionMarkNotFound is the error when deletion-mark.json file is not found. +var ErrorDeletionMarkNotFound = errors.New("deletion-mark.json not found") + +// ErrorUnmarshalDeletionMark is the error when unmarshalling deletion-mark.json file. +// This error can occur because deletion-mark.json has been partially uploaded to block storage +// or the deletion-mark.json file is not a valid json file. +var ErrorUnmarshalDeletionMark = errors.New("unmarshal deletion-mark.json") + +// DeletionMark stores block id and when block was marked for deletion. +type DeletionMark struct { + // ID of the tsdb block. 
+ ID ulid.ULID `json:"id"` + + // DeletionTime is a unix timestamp of when the block was marked to be deleted. + DeletionTime int64 `json:"deletion_time"` + + // Version of the file. + Version int `json:"version"` +} + +// ReadDeletionMark reads the given deletion mark file from /deletion-mark.json in bucket. +func ReadDeletionMark(ctx context.Context, bkt objstore.BucketReader, logger log.Logger, dir string) (*DeletionMark, error) { + deletionMarkFile := path.Join(dir, DeletionMarkFilename) + + r, err := bkt.Get(ctx, deletionMarkFile) + if err != nil { + if bkt.IsObjNotFoundErr(err) { + return nil, ErrorDeletionMarkNotFound + } + return nil, errors.Wrapf(err, "get file: %s", deletionMarkFile) + } + + defer runutil.CloseWithLogOnErr(logger, r, "close bkt deletion-mark reader") + + metaContent, err := ioutil.ReadAll(r) + if err != nil { + return nil, errors.Wrapf(err, "read file: %s", deletionMarkFile) + } + + deletionMark := DeletionMark{} + if err := json.Unmarshal(metaContent, &deletionMark); err != nil { + return nil, errors.Wrapf(ErrorUnmarshalDeletionMark, "file: %s; err: %v", deletionMarkFile, err.Error()) + } + + if deletionMark.Version != DeletionMarkVersion1 { + return nil, errors.Errorf("unexpected deletion-mark file version %d", deletionMark.Version) + } + + return &deletionMark, nil +} diff --git a/pkg/block/metadata/deletionmark_test.go b/pkg/block/metadata/deletionmark_test.go new file mode 100644 index 00000000000..73f632e9af4 --- /dev/null +++ b/pkg/block/metadata/deletionmark_test.go @@ -0,0 +1,78 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "os" + "path" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore/inmem" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestReadDeletionMark(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-read-deletion-mark") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt := inmem.NewBucket() + { + blockWithoutDeletionMark := ulid.MustNew(uint64(1), nil) + _, err := ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithoutDeletionMark.String())) + + testutil.NotOk(t, err) + testutil.Equals(t, ErrorDeletionMarkNotFound, err) + } + { + blockWithPartialDeletionMark := ulid.MustNew(uint64(2), nil) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithPartialDeletionMark.String(), DeletionMarkFilename), bytes.NewBufferString("not a valid deletion-mark.json"))) + _, err = ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithPartialDeletionMark.String())) + + testutil.NotOk(t, err) + testutil.Equals(t, ErrorUnmarshalDeletionMark, errors.Cause(err)) + } + { + blockWithDifferentVersionDeletionMark := ulid.MustNew(uint64(3), nil) + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&DeletionMark{ + ID: blockWithDifferentVersionDeletionMark, + DeletionTime: time.Now().Unix(), + Version: 2, + })) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithDifferentVersionDeletionMark.String(), DeletionMarkFilename), &buf)) + _, err = ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithDifferentVersionDeletionMark.String())) + + testutil.NotOk(t, err) + testutil.Equals(t, "unexpected deletion-mark file version 2", err.Error()) + } + { + 
blockWithValidDeletionMark := ulid.MustNew(uint64(3), nil) + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&DeletionMark{ + ID: blockWithValidDeletionMark, + DeletionTime: time.Now().Unix(), + Version: 1, + })) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithValidDeletionMark.String(), DeletionMarkFilename), &buf)) + _, err = ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithValidDeletionMark.String())) + + testutil.Ok(t, err) + } +} diff --git a/pkg/compact/blocks_cleaner.go b/pkg/compact/blocks_cleaner.go new file mode 100644 index 00000000000..820346a1bf9 --- /dev/null +++ b/pkg/compact/blocks_cleaner.go @@ -0,0 +1,58 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compact + +import ( + "context" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore" +) + +// BlocksCleaner is a struct that deletes blocks from bucket which are marked for deletion. +type BlocksCleaner struct { + logger log.Logger + ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter + bkt objstore.Bucket + deleteDelay time.Duration + blocksCleaned prometheus.Counter + blockCleanupFailures prometheus.Counter +} + +// NewBlocksCleaner creates a new BlocksCleaner. +func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, deleteDelay time.Duration, blocksCleaned prometheus.Counter, blockCleanupFailures prometheus.Counter) *BlocksCleaner { + return &BlocksCleaner{ + logger: logger, + ignoreDeletionMarkFilter: ignoreDeletionMarkFilter, + bkt: bkt, + deleteDelay: deleteDelay, + blocksCleaned: blocksCleaned, + blockCleanupFailures: blockCleanupFailures, + } +} + +// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to delete the blocks that are marked for deletion. 
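+// A block is only removed once its deletion-mark.json is older than the configured delete-delay; newer marks are left for a later cleanup cycle.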
+func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error { + level.Info(s.logger).Log("msg", "started cleaning of blocks marked for deletion") + + deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks() + for _, deletionMark := range deletionMarkMap { + if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > s.deleteDelay.Seconds() { + if err := block.Delete(ctx, s.logger, s.bkt, deletionMark.ID); err != nil { + s.blockCleanupFailures.Inc() + return errors.Wrap(err, "delete block") + } + s.blocksCleaned.Inc() + level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID) + } + } + + level.Info(s.logger).Log("msg", "cleaning of blocks marked for deletion done") + return nil +} diff --git a/pkg/compact/clean.go b/pkg/compact/clean.go index af681a940e4..14fdfc4d7cb 100644 --- a/pkg/compact/clean.go +++ b/pkg/compact/clean.go @@ -21,7 +21,7 @@ const ( PartialUploadThresholdAge = 2 * 24 * time.Hour ) -func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter) { +func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter, blocksMarkedForDeletion prometheus.Counter) { level.Info(logger).Log("msg", "started cleaning of aborted partial uploads") _, partial, err := fetcher.Fetch(ctx) if err != nil { @@ -41,10 +41,11 @@ func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger } deleteAttempts.Inc() - if err := block.Delete(ctx, logger, bkt, id); err != nil { + if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err) return } + blocksMarkedForDeletion.Inc() level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge) } level.Info(logger).Log("msg", "cleaning of aborted partial uploads done") diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index 90b136be47e..74cfbaaf5ab 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -59,12 +59,18 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID2.String(), "chunks", "000001"), &fakeChunk)) deleteAttempts := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, deleteAttempts) + blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, deleteAttempts, blocksMarkedForDeletion) testutil.Equals(t, 1.0, promtest.ToFloat64(deleteAttempts)) exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001")) testutil.Ok(t, err) - testutil.Equals(t, false, exists) + testutil.Equals(t, true, exists) + + exists, err = bkt.Exists(ctx, path.Join(shouldDeleteID.String(), metadata.DeletionMarkFilename)) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) + testutil.Equals(t, 1.0, promtest.ToFloat64(blocksMarkedForDeletion)) exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001")) testutil.Ok(t, err) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index df9f8492abe..7e6040d37b3 100644 --- a/pkg/compact/compact.go +++ 
b/pkg/compact/compact.go @@ -51,6 +51,7 @@ type Syncer struct { acceptMalformedIndex bool enableVerticalCompaction bool duplicateBlocksFilter *block.DeduplicateFilter + ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter } type syncerMetrics struct { @@ -63,9 +64,10 @@ type syncerMetrics struct { compactionRunsCompleted *prometheus.CounterVec compactionFailures *prometheus.CounterVec verticalCompactions *prometheus.CounterVec + blocksMarkedForDeletion prometheus.Counter } -func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { +func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter) *syncerMetrics { var m syncerMetrics m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -106,25 +108,28 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}) + m.blocksMarkedForDeletion = blocksMarkedForDeletion + return &m } // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion prometheus.Counter, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ - logger: logger, - reg: reg, - bkt: bkt, - fetcher: fetcher, - blocks: map[ulid.ULID]*metadata.Meta{}, - metrics: newSyncerMetrics(reg), - duplicateBlocksFilter: duplicateBlocksFilter, - blockSyncConcurrency: blockSyncConcurrency, - acceptMalformedIndex: acceptMalformedIndex, + logger: logger, + reg: reg, + bkt: bkt, + fetcher: fetcher, + blocks: map[ulid.ULID]*metadata.Meta{}, + metrics: newSyncerMetrics(reg, blocksMarkedForDeletion), + duplicateBlocksFilter: duplicateBlocksFilter, + ignoreDeletionMarkFilter: ignoreDeletionMarkFilter, + blockSyncConcurrency: blockSyncConcurrency, + acceptMalformedIndex: acceptMalformedIndex, // The syncer offers an option to enable vertical compaction, even if it's // not currently used by Thanos, because the compactor is also used by Cortex // which needs vertical compaction. @@ -195,6 +200,7 @@ func (s *Syncer) Groups() (res []*Group, err error) { s.metrics.compactionFailures.WithLabelValues(groupKey), s.metrics.verticalCompactions.WithLabelValues(groupKey), s.metrics.garbageCollectedBlocks, + s.metrics.blocksMarkedForDeletion, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -221,8 +227,19 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { begin := time.Now() - garbageIds := s.duplicateBlocksFilter.DuplicateIDs() - for _, id := range garbageIds { + duplicateIDs := s.duplicateBlocksFilter.DuplicateIDs() + deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks() + + // GarbageIDs contains the duplicateIDs, since these blocks can be replaced with other blocks. 
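+		// (Duplicates are older blocks whose sources are fully covered by a newer compacted block, as detected by DeduplicateFilter.)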
+ // We also remove ids present in deletionMarkMap since these blocks are already marked for deletion. + garbageIDs := []ulid.ULID{} + for _, id := range duplicateIDs { + if _, exists := deletionMarkMap[id]; !exists { + garbageIDs = append(garbageIDs, id) + } + } + + for _, id := range garbageIDs { if ctx.Err() != nil { return ctx.Err() } @@ -230,14 +247,15 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { // Spawn a new context so we always delete a block in full on shutdown. delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - level.Info(s.logger).Log("msg", "deleting outdated block", "block", id) + level.Info(s.logger).Log("msg", "marking outdated block for deletion", "block", id) - err := block.Delete(delCtx, s.logger, s.bkt, id) + err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id) cancel() if err != nil { s.metrics.garbageCollectionFailures.Inc() return retry(errors.Wrapf(err, "delete block %s from bucket", id)) } + s.metrics.blocksMarkedForDeletion.Inc() // Immediately update our in-memory state so no further call to SyncMetas is needed // after running garbage collection. @@ -266,6 +284,7 @@ type Group struct { compactionFailures prometheus.Counter verticalCompactions prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter + blocksMarkedForDeletion prometheus.Counter } // newGroup returns a new compaction group. @@ -282,6 +301,7 @@ func newGroup( compactionFailures prometheus.Counter, verticalCompactions prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, + blocksMarkedForDeletion prometheus.Counter, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() @@ -300,6 +320,7 @@ func newGroup( compactionFailures: compactionFailures, verticalCompactions: verticalCompactions, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, + blocksMarkedForDeletion: blocksMarkedForDeletion, } return g, nil } @@ -495,7 +516,7 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, excludeDirs ...str } // RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error. -func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, issue347Err error) error { +func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error { ie, ok := errors.Cause(issue347Err).(Issue347Error) if !ok { return errors.Errorf("Given error is not an issue347 error: %v", issue347Err) @@ -546,10 +567,10 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, defer cancel() // TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this). - if err := block.Delete(delCtx, logger, bkt, ie.id); err != nil { + if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id); err != nil { return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id) } - + blocksMarkedForDeletion.Inc() return nil } @@ -747,10 +768,11 @@ func (cg *Group) deleteBlock(b string) error { // Spawn a new context so we always delete a block in full on shutdown. 
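// Deriving delCtx from context.Background with its own timeout means an in-flight mark upload is not cut short when the surrounding compaction run is cancelled.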
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id) - if err := block.Delete(delCtx, cg.logger, cg.bkt, id); err != nil { + level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id) + if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id); err != nil { return errors.Wrapf(err, "delete block %s from bucket", id) } + cg.blocksMarkedForDeletion.Inc() return nil } @@ -824,7 +846,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { } if IsIssue347Error(err) { - if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil { + if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, err); err == nil { mtx.Lock() finishedAllGroups = false mtx.Unlock() diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 43ce2974341..5520a9a8487 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" @@ -94,7 +95,9 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { ) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, 1, false, false) + blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, 1, false, false) testutil.Ok(t, err) // Do one initial synchronization with the bucket. @@ -103,7 +106,16 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { var rem []ulid.ULID err = bkt.Iter(ctx, "", func(n string) error { - rem = append(rem, ulid.MustParse(n[:len(n)-1])) + id := ulid.MustParse(n[:len(n)-1]) + deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) + + exists, err := bkt.Exists(ctx, deletionMarkFile) + if err != nil { + return err + } + if !exists { + rem = append(rem, id) + } return nil }) testutil.Ok(t, err) @@ -164,13 +176,16 @@ func TestGroup_Compact_e2e(t *testing.T) { reg := prometheus.NewRegistry() + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, 48*time.Hour) duplicateBlocksFilter := block.NewDeduplicateFilter() metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, + ignoreDeletionMarkFilter.Filter, duplicateBlocksFilter.Filter, ) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, 5, false, false) + blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, 5, false, false) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil) @@ -182,6 +197,7 @@ func TestGroup_Compact_e2e(t *testing.T) { // Compaction on empty should not fail. 
testutil.Ok(t, bComp.Compact(ctx)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) + testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) testutil.Equals(t, 0, MetricCount(sy.metrics.compactions)) testutil.Equals(t, 0, MetricCount(sy.metrics.compactionRunsStarted)) @@ -270,6 +286,7 @@ func TestGroup_Compact_e2e(t *testing.T) { testutil.Ok(t, bComp.Compact(ctx)) testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) + testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) testutil.Equals(t, 4, MetricCount(sy.metrics.compactions)) testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[0].Thanos)))) diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 47f61f0cfbf..eb07507d9d7 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -10,13 +10,14 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/objstore" ) // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime. // A value of 0 disables the retention for its resolution. -func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher block.MetadataFetcher, retentionByResolution map[ResolutionLevel]time.Duration) error { +func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher block.MetadataFetcher, retentionByResolution map[ResolutionLevel]time.Duration, blocksMarkedForDeletion prometheus.Counter) error { level.Info(logger).Log("msg", "start optional retention") metas, _, err := fetcher.Fetch(ctx) if err != nil { @@ -31,10 +32,11 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk maxTime := time.Unix(m.MaxTime/1000, 0) if time.Now().After(maxTime.Add(retentionDuration)) { - level.Info(logger).Log("msg", "applying retention: deleting block", "id", id, "maxTime", maxTime.String()) - if err := block.Delete(ctx, logger, bkt, id); err != nil { + level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String()) + if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { return errors.Wrap(err, "delete block") } + blocksMarkedForDeletion.Inc() } } diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 371d3c4152a..5c9813c1172 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -7,12 +7,16 @@ import ( "bytes" "context" "encoding/json" + "path/filepath" "strings" "testing" "time" "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -244,17 +248,28 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil) testutil.Ok(t, err) - if err := 
compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, tt.retentionByResolution); (err != nil) != tt.wantErr { + blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, tt.retentionByResolution, blocksMarkedForDeletion); (err != nil) != tt.wantErr { t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr) } got := []string{} + gotMarkedBlocksCount := 0.0 testutil.Ok(t, bkt.Iter(context.TODO(), "", func(name string) error { - got = append(got, name) + exists, err := bkt.Exists(ctx, filepath.Join(name, metadata.DeletionMarkFilename)) + if err != nil { + return err + } + if !exists { + got = append(got, name) + return nil + } + gotMarkedBlocksCount += 1.0 return nil })) testutil.Equals(t, got, tt.want) + testutil.Equals(t, gotMarkedBlocksCount, promtest.ToFloat64(blocksMarkedForDeletion)) }) } } diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 3055bec8177..a865ac5c71d 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -342,6 +342,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { } // sync uploads the block if not exists in remote storage. +// TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error. func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { level.Info(s.logger).Log("msg", "upload new block", "id", meta.ULID) diff --git a/pkg/verifier/duplicated_compaction.go b/pkg/verifier/duplicated_compaction.go index c7cd94b4a79..029aa93caff 100644 --- a/pkg/verifier/duplicated_compaction.go +++ b/pkg/verifier/duplicated_compaction.go @@ -27,7 +27,7 @@ const DuplicatedCompactionIssueID = "duplicated_compaction" // until sync-delay passes. // The expected print of this are same overlapped blocks with exactly the same sources, time ranges and stats. // If repair is enabled, all but one duplicates are safely deleted. 
-func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher) error { +func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error { if idMatcher != nil { return errors.Errorf("id matching is not supported by issue %s verifier", DuplicatedCompactionIssueID) } @@ -84,7 +84,7 @@ func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objst } for i, id := range toKill { - if err := BackupAndDelete(ctx, logger, bkt, backupBkt, id); err != nil { + if err := BackupAndDelete(ctx, logger, bkt, backupBkt, id, deleteDelay, metrics.blocksMarkedForDeletion); err != nil { return err } level.Info(logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1, "issue", DuplicatedCompactionIssueID) diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 828be214449..0982ec8ba8f 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -10,6 +10,7 @@ import ( "os" "path" "path/filepath" + "time" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -28,7 +29,7 @@ const IndexIssueID = "index_issue" // If the replacement was created successfully it is uploaded to the bucket and the input // block is deleted. // NOTE: This also verifies all indexes against chunks mismatches and duplicates. -func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher) error { +func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error { level.Info(logger).Log("msg", "started verifying issue", "with-repair", repair, "issue", IndexIssueID) metas, _, err := fetcher.Fetch(ctx) @@ -115,7 +116,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac } level.Info(logger).Log("msg", "safe deleting broken block", "id", id, "issue", IndexIssueID) - if err := BackupAndDeleteDownloaded(ctx, logger, filepath.Join(tmpdir, id.String()), bkt, backupBkt, id); err != nil { + if err := BackupAndDeleteDownloaded(ctx, logger, filepath.Join(tmpdir, id.String()), bkt, backupBkt, id, deleteDelay, metrics.blocksMarkedForDeletion); err != nil { return errors.Wrapf(err, "safe deleting old block %s failed", id) } level.Info(logger).Log("msg", "all good, continuing", "id", id, "issue", IndexIssueID) diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go index 61e291ac783..3fcce7a1518 100644 --- a/pkg/verifier/overlapped_blocks.go +++ b/pkg/verifier/overlapped_blocks.go @@ -6,6 +6,7 @@ package verifier import ( "context" "sort" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -21,7 +22,7 @@ const OverlappedBlocksIssueID = "overlapped_blocks" // OverlappedBlocksIssue checks bucket for blocks with overlapped time ranges. // No repair is available for this issue. 
-func OverlappedBlocksIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, _ objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher) error {
+func OverlappedBlocksIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, _ objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error {
 	if idMatcher != nil {
 		return errors.Errorf("id matching is not supported by issue %s verifier", OverlappedBlocksIssueID)
 	}
diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go
index b6c882cbc98..b124317e290 100644
--- a/pkg/verifier/safe_delete.go
+++ b/pkg/verifier/safe_delete.go
@@ -9,11 +9,13 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/objstore"
 )
@@ -32,10 +34,12 @@ func TSDBBlockExistsInBucket(ctx context.Context, bkt objstore.Bucket, id ulid.U
 }
 
 // BackupAndDelete moves a TSDB block to a backup bucket and on success removes
-// it from the source bucket. It returns error if block dir already exists in
+// it from the source bucket. If deleteDelay is zero, the block is deleted from the
+// source bucket immediately; otherwise it is only marked for deletion.
+// It returns an error if the block dir already exists in
 // the backup bucket (blocks should be immutable) or if any of the operations
 // fail.
-func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objstore.Bucket, id ulid.ULID) error {
+func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objstore.Bucket, id ulid.ULID, deleteDelay time.Duration, blocksMarkedForDeletion prometheus.Counter) error {
 	// Does this TSDB block exist in backupBkt already?
 	found, err := TSDBBlockExistsInBucket(ctx, backupBkt, id)
 	if err != nil {
@@ -68,20 +72,30 @@ func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objs
 	}
 
 	// Block uploaded, so we are ok to remove from src bucket.
-	level.Info(logger).Log("msg", "Deleting block", "id", id.String())
-	if err := block.Delete(ctx, logger, bkt, id); err != nil {
-		return errors.Wrap(err, "delete from source")
+	if deleteDelay.Seconds() == 0 {
+		level.Info(logger).Log("msg", "Deleting block", "id", id.String())
+		if err := block.Delete(ctx, logger, bkt, id); err != nil {
+			return errors.Wrap(err, "delete from source")
+		}
+		return nil
+	}
+
+	level.Info(logger).Log("msg", "Marking block as deleted", "id", id.String())
+	if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil {
+		return errors.Wrap(err, "marking delete from source")
 	}
+	blocksMarkedForDeletion.Inc()
 
 	return nil
 }
 
 // BackupAndDeleteDownloaded works much like BackupAndDelete in that it will
-// move a TSDB block from a bucket to a backup bucket. The bdir parameter
+// move a TSDB block from a bucket to a backup bucket. If deleteDelay is zero, the block
+// is deleted from the source bucket immediately; otherwise it is only marked for deletion. The bdir parameter
 // points to the location on disk where the TSDB block was previously
 // downloaded allowing this function to avoid downloading the TSDB block from
 // the source bucket again. An error is returned if any operation fails.
-func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir string, bkt, backupBkt objstore.Bucket, id ulid.ULID) error { +func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir string, bkt, backupBkt objstore.Bucket, id ulid.ULID, deleteDelay time.Duration, blocksMarkedForDeletion prometheus.Counter) error { // Does this TSDB block exist in backupBkt already? found, err := TSDBBlockExistsInBucket(ctx, backupBkt, id) if err != nil { @@ -97,11 +110,19 @@ func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir stri } // Block uploaded, so we are ok to remove from src bucket. - level.Info(logger).Log("msg", "Deleting block", "id", id.String()) - if err := block.Delete(ctx, logger, bkt, id); err != nil { - return errors.Wrap(err, "delete from source") + if deleteDelay.Seconds() == 0 { + level.Info(logger).Log("msg", "Deleting block", "id", id.String()) + if err := block.Delete(ctx, logger, bkt, id); err != nil { + return errors.Wrap(err, "delete from source") + } + return nil } + level.Info(logger).Log("msg", "Marking block as deleted", "id", id.String()) + if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { + return errors.Wrap(err, "marking delete from source") + } + blocksMarkedForDeletion.Inc() return nil } diff --git a/pkg/verifier/verify.go b/pkg/verifier/verify.go index 06f54f97d36..77cd646b1bd 100644 --- a/pkg/verifier/verify.go +++ b/pkg/verifier/verify.go @@ -5,6 +5,7 @@ package verifier import ( "context" + "time" "github.com/thanos-io/thanos/pkg/block" @@ -12,43 +13,66 @@ import ( "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/thanos/pkg/objstore" ) +type verifierMetrics struct { + blocksMarkedForDeletion prometheus.Counter +} + +func newVerifierMetrics(reg prometheus.Registerer) *verifierMetrics { + var m verifierMetrics + + m.blocksMarkedForDeletion = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_verify_blocks_marked_for_deletion_total", + Help: "Total number of blocks marked for deletion by verify.", + }) + + return &m +} + // Issue is an function that does verification and repair only if repair arg is true. // It should log affected blocks using warn level logs. It should be safe for issue to run on healthy bucket. -type Issue func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher) error +type Issue func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool, fetcher *block.MetaFetcher, deleteDelay time.Duration, metrics *verifierMetrics) error // Verifier runs given issues to verify if bucket is healthy. type Verifier struct { - logger log.Logger - bkt objstore.Bucket - backupBkt objstore.Bucket - issues []Issue - repair bool - fetcher *block.MetaFetcher + logger log.Logger + bkt objstore.Bucket + backupBkt objstore.Bucket + issues []Issue + repair bool + fetcher *block.MetaFetcher + deleteDelay time.Duration + metrics *verifierMetrics } // New returns verifier that only logs affected blocks. 
-func New(logger log.Logger, bkt objstore.Bucket, fetcher *block.MetaFetcher, issues []Issue) *Verifier { +func New(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deleteDelay time.Duration, issues []Issue) *Verifier { return &Verifier{ - logger: logger, - bkt: bkt, - issues: issues, - fetcher: fetcher, - repair: false, + logger: logger, + bkt: bkt, + issues: issues, + fetcher: fetcher, + repair: false, + deleteDelay: deleteDelay, + metrics: newVerifierMetrics(reg), } } // NewWithRepair returns verifier that logs affected blocks and attempts to repair them. -func NewWithRepair(logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, fetcher *block.MetaFetcher, issues []Issue) *Verifier { +func NewWithRepair(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, backupBkt objstore.Bucket, fetcher *block.MetaFetcher, deleteDelay time.Duration, issues []Issue) *Verifier { return &Verifier{ - logger: logger, - bkt: bkt, - backupBkt: backupBkt, - issues: issues, - fetcher: fetcher, - repair: true, + logger: logger, + bkt: bkt, + backupBkt: backupBkt, + issues: issues, + fetcher: fetcher, + repair: true, + deleteDelay: deleteDelay, + metrics: newVerifierMetrics(reg), } } @@ -67,7 +91,7 @@ func (v *Verifier) Verify(ctx context.Context, idMatcher func(ulid.ULID) bool) e // TODO(blotka): Wrap bucket with BucketWithMetrics and print metrics after each issue (e.g how many blocks where touched). // TODO(bplotka): Implement disk "bucket" to allow this verify to work on local disk space as well. for _, issueFn := range v.issues { - err := issueFn(ctx, v.logger, v.bkt, v.backupBkt, v.repair, idMatcher, v.fetcher) + err := issueFn(ctx, v.logger, v.bkt, v.backupBkt, v.repair, idMatcher, v.fetcher, v.deleteDelay, v.metrics) if err != nil { return errors.Wrap(err, "verify") }
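
The following is an illustrative sketch, not part of the patch above, of how a bucket reader can act on the new deletion-mark.json scheme: check whether a block carries a marker and whether the configured delete-delay has elapsed before treating it as deletable. This mirrors the decision that the new IgnoreDeletionMarkFilter and the compactor-side cleanup have to make. The readyForDeletion helper and the deletionMark struct are hypothetical; the struct is assumed to mirror the DeletionMark type added in pkg/block/metadata/deletionmark.go (block ULID, unix timestamp of when the block was marked, format version), so field names may differ from the actual implementation.

// Sketch only: reader-side handling of <block-ULID>/deletion-mark.json.
// deletionMark and readyForDeletion are hypothetical names; the JSON layout
// is assumed to match the DeletionMark type introduced by this change.
package example

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"path"
	"time"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// deletionMark mirrors the small JSON object uploaded by block.MarkForDeletion:
// the block ID, the unix time (in seconds) it was marked, and a format version.
type deletionMark struct {
	ID           ulid.ULID `json:"id"`
	DeletionTime int64     `json:"deletion_time"`
	Version      int       `json:"version"`
}

// readyForDeletion reports whether the block is marked for deletion and the
// given delete-delay has already passed since the marker was written.
// A missing marker simply yields (false, nil).
func readyForDeletion(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, deleteDelay time.Duration) (bool, error) {
	markerPath := path.Join(id.String(), "deletion-mark.json")

	exists, err := bkt.Exists(ctx, markerPath)
	if err != nil || !exists {
		return false, err
	}

	rc, err := bkt.Get(ctx, markerPath)
	if err != nil {
		return false, errors.Wrap(err, "get deletion mark")
	}
	defer rc.Close()

	raw, err := ioutil.ReadAll(rc)
	if err != nil {
		return false, errors.Wrap(err, "read deletion mark")
	}

	var mark deletionMark
	if err := json.Unmarshal(raw, &mark); err != nil {
		return false, errors.Wrapf(err, "unmarshal %s", markerPath)
	}

	return time.Since(time.Unix(mark.DeletionTime, 0)) > deleteDelay, nil
}

With the 48*time.Hour delay used in the tests above, a marked block is only considered deletable two days after the marker was written; with a zero delay the check degenerates to "is the marker present".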