Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into no-null-bucket-blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobcolvin-8451 committed Mar 19, 2020
2 parents caf4f95 + a87ed1b commit fd59e8e
Show file tree
Hide file tree
Showing 66 changed files with 5,175 additions and 289 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ website/docs-pre-processed/

tmp/bin
examples/tmp/

# Ignore the MacOS Trash (DS-Store)
.DS_Store
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2238](https://github.com/thanos-io/thanos/pull/2238) Ruler: Fixed Issue #2204 bug in alert queue signalling filled up queue and alerts were dropped
- [#2231](https://github.com/thanos-io/thanos/pull/2231) Bucket Web - Sort chunks by thanos.downsample.resolution for better grouping
- [#2254](https://github.com/thanos-io/thanos/pull/2254) Bucket: Fix metrics registered multiple times in bucket replicate

### Added

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.

### Changed

- [#2136](https://github.com/thanos-io/thanos/pull/2136) store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.

- [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.

## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02

Expand Down
2 changes: 2 additions & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Full list of triage persons is displayed below:
| Ben Ye | `@yeya24` | [@yeya24](https://github.com/yeya24) |
| Martin Chodur | `@FUSAKLA` | [@fusakla](https://github.com/fusakla) |
| Michael Dai | `@jojohappy` | [@jojohappy](https://github.com/jojohappy) |
| Kemal Akkoyun | `@kakkoyun` | [@kakkoyun](https://github.com/kakkoyun) |
| Xiang Dai | `@daixiang0` | [@daixiang0](https://github.com/daixiang0) |

Please reach any of the maintainer on slack or email if you want to help as well.

Expand Down
26 changes: 24 additions & 2 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
registerBucketInspect(m, cmd, name, objStoreConfig)
registerBucketWeb(m, cmd, name, objStoreConfig)
registerBucketReplicate(m, cmd, name, objStoreConfig)
registerBucketDownsample(m, cmd, name, objStoreConfig)
}

func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {
Expand All @@ -84,6 +85,12 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings()
idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
"If none is specified, all blocks will be verified. Repeated field").Strings()
deleteDelay := modelDuration(cmd.Flag("delete-delay", "Duration after which blocks marked for deletion would be deleted permanently from source bucket by compactor component. "+
"If delete-delay is non zero, blocks will be marked for deletion and compactor component is required to delete blocks from source bucket. "+
"If delete-delay is 0, blocks will be deleted straight away. Use this if you want to get rid of or move the block immediately. "+
"Note that deleting blocks immediately can cause query failures, if store gateway still has the block loaded, "+
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("0s"))
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -139,9 +146,9 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
}

if *repair {
v = verifier.NewWithRepair(logger, bkt, backupBkt, fetcher, issues)
v = verifier.NewWithRepair(logger, reg, bkt, backupBkt, fetcher, time.Duration(*deleteDelay), issues)
} else {
v = verifier.New(logger, bkt, fetcher, issues)
v = verifier.New(logger, reg, bkt, fetcher, time.Duration(*deleteDelay), issues)
}

var idMatcher func(ulid.ULID) bool = nil
Expand Down Expand Up @@ -424,6 +431,21 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na

}

func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {

comp := component.Downsample
cmd := root.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

httpAddr, httpGracePeriod := regHTTPFlags(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()

m[name+" "+comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp)
}
}

// refresh metadata from remote storage periodically and update UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error {
confContentYaml, err := objStoreConfig.Content()
Expand Down
53 changes: 48 additions & 5 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,13 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
retention5m := modelDuration(cmd.Flag("retention.resolution-5m", "How long to retain samples of resolution 1 (5 minutes) in bucket. Setting this to 0d will retain samples of this resolution forever").Default("0d"))
retention1h := modelDuration(cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").Default("0d"))

// TODO(kakkoyun): https://github.com/thanos-io/thanos/issues/2266.
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs. Only works when --wait flag specified.").
Default("5m").Duration()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Hidden().Default("false").Bool()

Expand All @@ -122,6 +126,13 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

deleteDelay := modelDuration(cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket. "+
"If delete-delay is non zero, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+
"If delete-delay is 0, blocks will be deleted straight away. "+
"Note that deleting blocks immediately can cause query failures, if store gateway still has the block loaded, "+
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h"))

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand All @@ -131,6 +142,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*dataDir,
objStoreConfig,
time.Duration(*consistencyDelay),
time.Duration(*deleteDelay),
*haltOnError,
*acceptMalformedIndex,
*wait,
Expand All @@ -146,6 +158,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*blockSyncConcurrency,
*compactionConcurrency,
selectorRelabelConf,
*waitInterval,
)
}
}
Expand All @@ -159,6 +172,7 @@ func runCompact(
dataDir string,
objStoreConfig *extflag.PathOrContent,
consistencyDelay time.Duration,
deleteDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
Expand All @@ -170,6 +184,7 @@ func runCompact(
blockSyncConcurrency int,
concurrency int,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand All @@ -188,6 +203,25 @@ func runCompact(
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
blocksCleaned := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_cleaned_total",
Help: "Total number of blocks deleted in compactor.",
})
blockCleanupFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_block_cleanup_failures_total",
Help: "Failures encountered while deleting blocks in compactor.",
})
blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in compactor.",
})
_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_delete_delay_seconds",
Help: "Configured delete delay in seconds.",
}, func() float64 {
return deleteDelay.Seconds()
})

downsampleMetrics := newDownsampleMetrics(reg)

httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -239,18 +273,22 @@ func runCompact(
}
}()

// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
duplicateBlocksFilter.Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, blockSyncConcurrency, acceptMalformedIndex, false)
sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -284,6 +322,7 @@ func runCompact(
return errors.Wrap(err, "clean working downsample directory")
}

blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency)
if err != nil {
cancel()
Expand Down Expand Up @@ -325,11 +364,15 @@ func runCompact(
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
return nil
}

Expand All @@ -348,7 +391,7 @@ func runCompact(
}

// --wait=true is specified.
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
return runutil.Repeat(waitInterval, ctx.Done(), func() error {
err := compactMainFn()
if err == nil {
iterations.Inc()
Expand Down Expand Up @@ -454,7 +497,7 @@ func generateIndexCacheFile(
cachePath := filepath.Join(bdir, block.IndexCacheFilename)
cache := path.Join(meta.ULID.String(), block.IndexCacheFilename)

ok, err := objstore.Exists(ctx, bkt, cache)
ok, err := bkt.Exists(ctx, cache)
if ok {
return nil
}
Expand Down
20 changes: 1 addition & 19 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -31,25 +30,8 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerDownsample(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Downsample
cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

httpAddr, httpGracePeriod := regHTTPFlags(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp)
}
}

type DownsampleMetrics struct {
downsamples *prometheus.CounterVec
downsampleFailures *prometheus.CounterVec
Expand All @@ -70,7 +52,7 @@ func newDownsampleMetrics(reg *prometheus.Registry) *DownsampleMetrics {
return m
}

func runDownsample(
func RunDownsample(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
Expand Down
7 changes: 4 additions & 3 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func main() {
registerRule(cmds, app)
registerCompact(cmds, app)
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app)
registerReceive(cmds, app)
registerChecks(cmds, app, "check")

Expand Down Expand Up @@ -183,7 +182,8 @@ func main() {
reloadCh := make(chan struct{}, 1)

if err := cmds[cmd](&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {
level.Error(logger).Log("err", errors.Wrapf(err, "%s command failed", cmd))
// Use %+v for github.com/pkg/errors error to print with stack.
level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "preparing %s command failed", cmd)))
os.Exit(1)
}

Expand All @@ -208,7 +208,8 @@ func main() {
}

if err := g.Run(); err != nil {
level.Error(logger).Log("msg", "running command failed", "err", err)
// Use %+v for github.com/pkg/errors error to print with stack.
level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
os.Exit(1)
}
level.Info(logger).Log("msg", "exiting")
Expand Down
11 changes: 11 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent.").
Default("0s"))

ignoreDeletionMarksDelay := modelDuration(cmd.Flag("ignore-deletion-marks-delay", "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"If delete-delay duration is provided to compactor or bucket verify component, it will upload deletion-mark.json file to mark after what duration the block should be deleted rather than deleting the block straight away. "+
"If delete-delay is non-zero for compactor or bucket verify component, ignore-deletion-marks-delay should be set to (delete-delay)/2 so that blocks marked for deletion are filtered out while fetching blocks before being deleted from bucket. "+
"Default is 24h, half of the default value for --delete-delay on compactor.").
Default("24h"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -120,6 +127,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*advertiseCompatibilityLabel,
*enableIndexHeader,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
)
}
}
Expand Down Expand Up @@ -153,6 +161,7 @@ func runStore(
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -225,11 +234,13 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay)
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
block.NewDeduplicateFilter().Filter,
)
if err != nil {
Expand Down
Loading

0 comments on commit fd59e8e

Please sign in to comment.