Commit ef5a32c

store, compact, bucket: schedule block deletion by adding deletion-mark.json file

Signed-off-by: khyatisoneji <khyatisoneji5@gmail.com>
khyatisoneji committed Mar 16, 2020
1 parent d23b6a3 commit ef5a32c
Showing 26 changed files with 732 additions and 112 deletions.
10 changes: 8 additions & 2 deletions cmd/thanos/bucket.go
@@ -84,6 +84,12 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings()
idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
"If none is specified, all blocks will be verified. Repeated field").Strings()
deleteDelay := modelDuration(cmd.Flag("delete-delay", "Duration after which blocks marked for deletion will be deleted permanently from the source bucket by the compactor component. "+
"If delete-delay is non-zero, blocks will be marked for deletion and the compactor component is required to delete blocks from the source bucket. "+
"If delete-delay is 0, blocks will be deleted straight away. Use this if you want to get rid of or move the block immediately. "+
"Note that deleting blocks immediately can cause query failures, if the store gateway still has the block loaded, "+
"or the compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("0s"))
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
@@ -139,9 +145,9 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
}

if *repair {
- v = verifier.NewWithRepair(logger, bkt, backupBkt, fetcher, issues)
+ v = verifier.NewWithRepair(logger, reg, bkt, backupBkt, fetcher, time.Duration(*deleteDelay), issues)
} else {
- v = verifier.New(logger, bkt, fetcher, issues)
+ v = verifier.New(logger, reg, bkt, fetcher, time.Duration(*deleteDelay), issues)
}

var idMatcher func(ulid.ULID) bool = nil
43 changes: 40 additions & 3 deletions cmd/thanos/compact.go
@@ -122,6 +122,13 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

deleteDelay := modelDuration(cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from the bucket. "+
"If delete-delay is non-zero, blocks will be marked for deletion and the compactor component will delete blocks marked for deletion from the bucket. "+
"If delete-delay is 0, blocks will be deleted straight away. "+
"Note that deleting blocks immediately can cause query failures, if the store gateway still has the block loaded, "+
"or the compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h"))

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
@@ -131,6 +138,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*dataDir,
objStoreConfig,
time.Duration(*consistencyDelay),
time.Duration(*deleteDelay),
*haltOnError,
*acceptMalformedIndex,
*wait,
@@ -159,6 +167,7 @@ func runCompact(
dataDir string,
objStoreConfig *extflag.PathOrContent,
consistencyDelay time.Duration,
deleteDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
@@ -188,6 +197,25 @@ func runCompact(
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
blocksCleaned := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_cleaned_total",
Help: "Total number of blocks deleted in compactor.",
})
blockCleanupFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_block_cleanup_failures_total",
Help: "Failures encountered while deleting blocks in compactor.",
})
blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in compactor.",
})
_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_delete_delay_seconds",
Help: "Configured delete delay in seconds.",
}, func() float64 {
return deleteDelay.Seconds()
})

downsampleMetrics := newDownsampleMetrics(reg)

httpProbe := prober.NewHTTP()
@@ -239,18 +267,22 @@ func runCompact(
}
}()

// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
duplicateBlocksFilter.Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

- sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, blockSyncConcurrency, acceptMalformedIndex, false)
+ sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
@@ -284,6 +316,7 @@ func runCompact(
return errors.Wrap(err, "clean working downsample directory")
}

blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency)
if err != nil {
cancel()
@@ -325,11 +358,15 @@ func runCompact(
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
}

- if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution); err != nil {
+ if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "retention failed")
}

- compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts)
+ if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
+ return errors.Wrap(err, "error cleaning blocks")
+ }
+
+ compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
return nil
}

11 changes: 11 additions & 0 deletions cmd/thanos/store.go
@@ -84,6 +84,13 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to a safe value (e.g. 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s"))

ignoreDeletionMarksDelay := modelDuration(cmd.Flag("ignore-deletion-marks-delay", "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures the store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"If the delete-delay duration is provided to the compactor or bucket verify component, it will upload a deletion-mark.json file recording when the block should be deleted, rather than deleting the block straight away. "+
"If delete-delay is non-zero for the compactor or bucket verify component, ignore-deletion-marks-delay should be set to delete-delay/2 so that blocks marked for deletion are filtered out while fetching blocks before they are deleted from the bucket. "+
"Default is 24h, half of the default value for --delete-delay on the compactor.").
Default("24h"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
@@ -120,6 +127,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*advertiseCompatibilityLabel,
*enableIndexHeader,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
)
}
}
@@ -153,6 +161,7 @@ func runStore(
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
@@ -225,11 +234,13 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay)
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
block.NewDeduplicateFilter().Filter,
)
if err != nil {
24 changes: 17 additions & 7 deletions docs/components/bucket.md
@@ -26,11 +26,12 @@ config:
Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets
by adding a new command within `/cmd/thanos/bucket.go`.


## Deployment

## Flags

- [embedmd]:# (flags/bucket.txt $)
+ [embedmd]: # "flags/bucket.txt $"

```$
usage: thanos bucket [<flags>] <command> [<args> ...]
@@ -95,7 +96,8 @@ Example:
$ thanos bucket web --objstore.config-file="..."
```

- [embedmd]:# (flags/bucket_web.txt)
+ [embedmd]: # "flags/bucket_web.txt"

```txt
usage: thanos bucket web [<flags>]
@@ -167,7 +169,8 @@ Example:
$ thanos bucket verify --objstore.config-file="..."
```

- [embedmd]:# (flags/bucket_verify.txt)
+ [embedmd]: # "flags/bucket_verify.txt"

```txt
usage: thanos bucket verify [<flags>]
@@ -219,7 +222,11 @@ Flags:
Block IDs to verify (and optionally repair) only. If
none is specified, all blocks will be verified.
Repeated field
--delete-delay=0s Duration after which blocks marked for deletion will be deleted permanently from the source bucket by the compactor component.
If delete-delay is non-zero, blocks will be marked for deletion and the compactor component is required to delete blocks from the source bucket.
If delete-delay is 0, blocks will be deleted straight away. Use this if you want to get rid of or move the block immediately.
Note that deleting blocks immediately can cause query failures, if the store gateway still has the block
loaded, or the compactor is ignoring the deletion because it's compacting the block at the same time.
```
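
With a non-zero delay, repairs follow the safer mark-then-delete flow; for example (flag values here are illustrative, not recommendations):

```
$ thanos bucket verify --repair --delete-delay=48h --objstore.config-file="..."
```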

### ls
@@ -232,7 +239,8 @@ Example:
$ thanos bucket ls -o json --objstore.config-file="..."
```

- [embedmd]:# (flags/bucket_ls.txt)
+ [embedmd]: # "flags/bucket_ls.txt"

```txt
usage: thanos bucket ls [<flags>]
@@ -273,11 +281,13 @@ Flags:
`bucket inspect` is used to inspect buckets in a detailed way using stdout in ASCII table format.

Example:

```
$ thanos bucket inspect -l environment=\"prod\" --objstore.config-file="..."
```

- [embedmd]:# (flags/bucket_inspect.txt)
+ [embedmd]: # "flags/bucket_inspect.txt"

```txt
usage: thanos bucket inspect [<flags>]
28 changes: 21 additions & 7 deletions docs/components/compact.md
@@ -11,8 +11,8 @@ It is generally not semantically concurrency safe and must be deployed as a sing

It is also responsible for downsampling of data:

- * creating 5m downsampling for blocks larger than **40 hours** (2d, 2w)
- * creating 1h downsampling for blocks larger than **10 days** (2w).
+ - creating 5m downsampling for blocks larger than **40 hours** (2d, 2w)
+ - creating 1h downsampling for blocks larger than **10 days** (2w).

Example:

@@ -35,9 +35,9 @@ On-disk data is safe to delete between restarts and should be the first attempt
Resolution - distance between data points on your graphs. E.g.
* raw - the same as scrape interval at the moment of data ingestion
* 5m - data point is every 5 minutes
* 1h - data point is every 1h
- raw - the same as scrape interval at the moment of data ingestion
- 5m - data point is every 5 minutes
- 1h - data point is every 1h
Keep in mind that the initial goal of downsampling is not saving disk space (read further for elaboration on storage space consumption). The goal of downsampling is to provide an opportunity to get fast results for range queries over big time intervals like months or years. In other words, if you set `--retention.resolution-raw` less than `--retention.resolution-5m` and `--retention.resolution-1h`, you might run into a problem of not being able to "zoom in" to your historical data.
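
For example, a deployment that keeps raw data for 30 days, 5m downsampled data for 120 days, and 1h downsampled data for a year might run (values are illustrative, not recommendations):

```
$ thanos compact --retention.resolution-raw=30d --retention.resolution-5m=120d --retention.resolution-1h=1y --objstore.config-file="..."
```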

@@ -64,9 +64,18 @@ your Prometheus instances, so that the compactor will be able to group blocks by
By _persistent_, we mean that one Prometheus instance must keep the same labels if it restarts, so that the compactor will keep
compacting blocks from an instance even when a Prometheus instance goes down for some time.

## Block Deletion

Object storage providers such as S3, GCS, and Ceph can be divided into strongly consistent and eventually consistent ones.
Since some object storage providers offer no consistency guarantees, we have to make sure that we have a consistent, lock-free way of dealing with object storage irrespective of the provider chosen.

To achieve this coordination, blocks are not deleted directly. Instead, a block is marked for deletion by uploading
a `deletion-mark.json` file into the directory of the block chosen to be deleted. This file contains the Unix timestamp of when the block was marked for deletion.
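
A marked block's `deletion-mark.json` then looks roughly like the following; the fields mirror the `metadata.DeletionMark` struct used in this commit (block ID, version, and Unix deletion time), though the exact JSON keys shown here are illustrative:

```json
{
  "id": "01D78XZ44G0000000000000000",
  "version": 1,
  "deletion_time": 1584346800
}
```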

## Flags

- [embedmd]:# (flags/compact.txt $)
+ [embedmd]: # "flags/compact.txt $"

```$
usage: thanos compact [<flags>]
@@ -144,5 +153,10 @@ Flags:
selecting blocks. It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--delete-delay=48h Time before a block marked for deletion is deleted from the bucket.
If delete-delay is non-zero, blocks will be marked for deletion and the compactor component will delete blocks marked for deletion from the bucket.
If delete-delay is 0, blocks will be deleted straight away.
Use this if you want to get rid of or move the block immediately.
Note that deleting blocks immediately can cause query failures, if the store gateway still has the block
loaded, or the compactor is ignoring the deletion because it's compacting the block at the same time.
```
5 changes: 4 additions & 1 deletion docs/components/store.md
@@ -139,7 +139,10 @@ Flags:
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--consistency-delay=30m Minimum age of all blocks before they are being read.

--ignore-deletion-marks-delay=24h
Duration after which the blocks marked for deletion will be filtered out while fetching blocks.
The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures the store can still serve blocks that are meant to be deleted but do not have a replacement yet. If the delete-delay duration is provided to the compactor or bucket verify component, it will upload a deletion-mark.json file recording when the block should be deleted, rather than deleting the block straight away.
If delete-delay is non-zero for the compactor or bucket verify component, ignore-deletion-marks-delay should be set to delete-delay/2 so that blocks marked for deletion are filtered out while fetching blocks before they are deleted from the bucket. Default is 24h, half of the default value for --delete-delay on the compactor.
```
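
As a rough example of the intended pairing (flag values are illustrative): with the compactor's default `--delete-delay=48h`, the store gateway keeps serving marked blocks for half that window:

```
$ thanos compact --delete-delay=48h --objstore.config-file="..."
$ thanos store --ignore-deletion-marks-delay=24h --objstore.config-file="..."
```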

## Time based partitioning
4 changes: 2 additions & 2 deletions docs/proposals/201901-read-write-operations-bucket.md
@@ -179,7 +179,7 @@ We schedule deletions instead of doing them straight away for 3 reasons:
* In further delete steps, starting with meta.json first ensures the integrity mark is deleted first, so if the deletion process is stopped, we can treat this block as a partial block (rule 4) and delete it gracefully.

Along with this, we also store information about when the block was scheduled to be deleted so that it can be deleted at a later point in time.
- To do so, we create a file `compactor-meta.json` where we store information about when the block was scheduled to be deleted.
+ To do so, we create a file `deletion-mark.json` where we store information about when the block was scheduled to be deleted.
Storing the information in a file makes it resilient to failures that result in restarts.
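
For illustration, a block scheduled for deletion would then look roughly like this in the bucket (layout abbreviated; `deletion-mark.json` records when the deletion was scheduled):

```
01D78XZ44G0000000000000000/
├── chunks/
│   └── 000001
├── index
├── meta.json
└── deletion-mark.json
```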

There might be exceptions for malformed blocks that block compaction or reader operations. Since we may need to unblock the system
@@ -189,7 +189,7 @@ immediately the block can be forcibly removed meaning that query failures may oc
This is to make sure we don't forcibly remove a block which is still loaded on the reader side.

- We check the `compactor-meta.json` file to identify if the block has to be deleted. After 15 minutes of marking the block to be deleted, we are ok to delete the whole block directory.
+ We check the `deletion-mark.json` file to identify if the block has to be deleted. After 15 minutes of marking the block to be deleted, we are ok to delete the whole block directory.

## Risks

30 changes: 30 additions & 0 deletions pkg/block/block.go
@@ -6,6 +6,7 @@
package block

import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -14,6 +15,7 @@ import (
"path"
"path/filepath"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -126,6 +128,34 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er
return err
}

// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := objstore.Exists(ctx, bkt, deletionMarkFile)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile)
}
if deletionMarkExists {
return errors.Errorf("file %s already exists in bucket", deletionMarkFile)
}

deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
})
if err != nil {
return errors.Wrap(err, "json encode deletion mark")
}

if err := bkt.Upload(ctx, deletionMarkFile, bytes.NewReader(deletionMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", deletionMarkFile)
}

level.Info(logger).Log("msg", "block has been marked for deletion", "block", id)
return nil
}

// Delete removes directory that is meant to be block directory.
// NOTE: Always prefer this method for deleting blocks.
// * We have to delete block's files in the certain order (meta.json first)
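
For reference, a minimal sketch of how a caller might use the new `MarkForDeletion` helper; the wrapper function, its name, and the error wrapping are illustrative, not part of this commit:

```go
package example

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// markBlockForDeletion schedules a block for deletion instead of deleting it
// outright: block.MarkForDeletion uploads <block ULID>/deletion-mark.json and
// the compactor removes the block once delete-delay has passed.
func markBlockForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, rawID string) error {
	id, err := ulid.Parse(rawID)
	if err != nil {
		return errors.Wrapf(err, "parse block ID %s", rawID)
	}
	// Fails if the block already has a deletion mark.
	return block.MarkForDeletion(ctx, logger, bkt, id)
}
```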
