Introduce TSDB blocks compactor (#1942)
* Experimental compactor component

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Upgraded Thanos dependency

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added TSDB blocks compactor component

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Removed the quit chan from the compactor, since we can just use the context

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed linter

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Small changes after code review

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Split newCompactor() signature across multiple lines

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Hide TSDB compactor config from config file reference doc

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Refactoring after review

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Jan 21, 2020
1 parent 6774c62 commit a0da2c8
Showing 14 changed files with 1,877 additions and 102 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -31,6 +31,7 @@ instructions below to upgrade your Postgres.
* [FEATURE] Added readiness probe endpoint `/ready` to queriers. #1934
* [FEATURE] EXPERIMENTAL: Added `/series` API endpoint support with TSDB blocks storage. #1830
* [FEATURE] Added "multi" KV store that can interact with two other KV stores, primary one for all reads and writes, and secondary one, which only receives writes. Primary/secondary store can be modified in runtime via runtime-config mechanism (previously "overrides"). #1749
* [FEATURE] EXPERIMENTAL: Added TSDB blocks `compactor` component, which iterates over users blocks stored in the bucket and compact them according to the configured block ranges. #1942
* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled. #1978
* [ENHANCEMENT] Added `password` and `enable_tls` options to redis cache configuration. Enables usage of Microsoft Azure Cache for Redis service. #1923
* [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917
263 changes: 263 additions & 0 deletions pkg/compactor/compactor.go
@@ -0,0 +1,263 @@
package compactor

import (
	"context"
	"flag"
	"fmt"
	"path"
	"strings"
	"sync"
	"time"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/relabel"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/compact/downsample"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// Config holds the Compactor config.
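//
// For reference, an illustrative YAML sketch based on the struct tags below
// (values shown are the flag defaults registered in RegisterFlags; the exact
// block_ranges list syntax depends on how cortex_tsdb.DurationList
// unmarshals YAML):
//
//	block_ranges: [2h, 12h, 24h]
//	block_sync_concurrency: 20
//	consistency_delay: 30m
//	data_dir: ./data
//	compaction_interval: 1h
//	compaction_retries: 3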
type Config struct {
	BlockRanges          cortex_tsdb.DurationList `yaml:"block_ranges"`
	BlockSyncConcurrency int                      `yaml:"block_sync_concurrency"`
	ConsistencyDelay     time.Duration            `yaml:"consistency_delay"`
	DataDir              string                   `yaml:"data_dir"`
	CompactionInterval   time.Duration            `yaml:"compaction_interval"`
	CompactionRetries    int                      `yaml:"compaction_retries"`

	// No need to add options to customize the retry backoff,
	// given the defaults should be fine, but allow overriding
	// them in tests.
	retryMinBackoff time.Duration `yaml:"-"`
	retryMaxBackoff time.Duration `yaml:"-"`
}

// RegisterFlags registers the Compactor flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.BlockRanges = cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
	cfg.retryMinBackoff = 10 * time.Second
	cfg.retryMaxBackoff = time.Minute

	f.Var(&cfg.BlockRanges, "compactor.block-ranges", "Comma-separated list of compaction ranges expressed in the time duration format")
	f.DurationVar(&cfg.ConsistencyDelay, "compactor.consistency-delay", 30*time.Minute, fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval))
	f.IntVar(&cfg.BlockSyncConcurrency, "compactor.block-sync-concurrency", 20, "Number of goroutines to use when syncing block metadata from object storage")
	f.StringVar(&cfg.DataDir, "compactor.data-dir", "./data", "Data directory in which to cache blocks and process compactions")
	f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
	f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
}
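// An illustrative command line overriding some of the flags registered
// above (the flag names are real; the values and the target wiring are
// examples, not part of this change):
//
//	cortex -target=compactor \
//	  -compactor.block-ranges=2h,12h,24h \
//	  -compactor.data-dir=/data/compactor \
//	  -compactor.compaction-interval=30m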

// Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
type Compactor struct {
	compactorCfg Config
	storageCfg   cortex_tsdb.Config
	logger       log.Logger

	// Underlying compactor used to compact TSDB blocks.
	tsdbCompactor tsdb.Compactor

	// Client used to run operations on the bucket storing blocks.
	bucketClient objstore.Bucket

	// Wait group used to wait until the internal goroutine completes.
	runner sync.WaitGroup

	// Context used to run compaction and its cancel function to
	// safely interrupt it on shutdown.
	ctx       context.Context
	cancelCtx context.CancelFunc

	// Metrics.
	compactionRunsStarted   prometheus.Counter
	compactionRunsCompleted prometheus.Counter
	compactionRunsFailed    prometheus.Counter
}

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.Config, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
	ctx, cancelCtx := context.WithCancel(context.Background())

	bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg, "compactor", logger)
	if err != nil {
		cancelCtx()
		return nil, errors.Wrap(err, "failed to create the bucket client")
	}

	tsdbCompactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMilliseconds(), downsample.NewPool())
	if err != nil {
		cancelCtx()
		return nil, errors.Wrap(err, "failed to create TSDB compactor")
	}

	return newCompactor(ctx, cancelCtx, compactorCfg, storageCfg, bucketClient, tsdbCompactor, logger, registerer)
}

func newCompactor(
	ctx context.Context,
	cancelCtx context.CancelFunc,
	compactorCfg Config,
	storageCfg cortex_tsdb.Config,
	bucketClient objstore.Bucket,
	tsdbCompactor tsdb.Compactor,
	logger log.Logger,
	registerer prometheus.Registerer,
) (*Compactor, error) {
	c := &Compactor{
		compactorCfg:  compactorCfg,
		storageCfg:    storageCfg,
		logger:        logger,
		bucketClient:  bucketClient,
		tsdbCompactor: tsdbCompactor,
		ctx:           ctx,
		cancelCtx:     cancelCtx,
		compactionRunsStarted: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_runs_started_total",
			Help: "Total number of compaction runs started.",
		}),
		compactionRunsCompleted: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_runs_completed_total",
			Help: "Total number of compaction runs successfully completed.",
		}),
		compactionRunsFailed: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "cortex_compactor_runs_failed_total",
			Help: "Total number of compaction runs failed.",
		}),
	}

	// Register metrics.
	if registerer != nil {
		registerer.MustRegister(c.compactionRunsStarted, c.compactionRunsCompleted, c.compactionRunsFailed)
	}

	// Start the compactor loop.
	c.runner.Add(1)
	go c.run()

	return c, nil
}

// Shutdown stops the compactor and waits until done. This may take some
// time if there's an on-going compaction.
func (c *Compactor) Shutdown() {
	c.cancelCtx()
	c.runner.Wait()
}

func (c *Compactor) run() {
	defer c.runner.Done()

	// Run an initial compaction before starting the interval.
	c.compactUsersWithRetries(c.ctx)

	ticker := time.NewTicker(c.compactorCfg.CompactionInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			c.compactUsersWithRetries(c.ctx)
		case <-c.ctx.Done():
			return
		}
	}
}

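// compactUsersWithRetries runs a single compaction pass over all users,
// retrying the whole pass with the configured backoff until it succeeds
// or the retries are exhausted, and tracks the outcome in the runs metrics.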
func (c *Compactor) compactUsersWithRetries(ctx context.Context) {
	retries := util.NewBackoff(ctx, util.BackoffConfig{
		MinBackoff: c.compactorCfg.retryMinBackoff,
		MaxBackoff: c.compactorCfg.retryMaxBackoff,
		MaxRetries: c.compactorCfg.CompactionRetries,
	})

	c.compactionRunsStarted.Inc()

	for retries.Ongoing() {
		if success := c.compactUsers(ctx); success {
			c.compactionRunsCompleted.Inc()
			return
		}

		retries.Wait()
	}

	c.compactionRunsFailed.Inc()
}

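// compactUsers discovers the users in the bucket and compacts each user's
// blocks sequentially. Per-user failures are logged and skipped; it returns
// false only if the discovery fails or the context is canceled.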
func (c *Compactor) compactUsers(ctx context.Context) bool {
	level.Info(c.logger).Log("msg", "discovering users from bucket")
	users, err := c.discoverUsers(ctx)
	if err != nil {
		level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
		return false
	}
	level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users))

	for _, userID := range users {
		// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
		if ctx.Err() != nil {
			level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", ctx.Err())
			return false
		}

		level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)

		if err = c.compactUser(ctx, userID); err != nil {
			level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
			continue
		}

		level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID)
	}

	return true
}

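// compactUser compacts the blocks of a single user, scoping all bucket
// operations to the user's own prefix and delegating the actual compaction
// to the Thanos bucket compactor.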
func (c *Compactor) compactUser(ctx context.Context, userID string) error {
	bucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)

	syncer, err := compact.NewSyncer(
		c.logger,
		nil, // TODO(pracucci) we should pass the prometheus registerer, but we would need to inject the user label to each metric, otherwise we have clashing metrics
		bucket,
		c.compactorCfg.ConsistencyDelay,
		c.compactorCfg.BlockSyncConcurrency,
		false, // Do not accept malformed indexes
		true,  // Enable vertical compaction
		[]*relabel.Config{})
	if err != nil {
		return errors.Wrap(err, "failed to create syncer")
	}

	compactor, err := compact.NewBucketCompactor(
		c.logger,
		syncer,
		c.tsdbCompactor,
		path.Join(c.compactorCfg.DataDir, "compact"),
		bucket,
		// No compaction concurrency. Due to how Cortex works we don't
		// expect to have multiple block groups per tenant, so setting
		// a value higher than 1 would be useless.
		1,
	)
	if err != nil {
		return errors.Wrap(err, "failed to create bucket compactor")
	}

	return compactor.Compact(ctx)
}

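// discoverUsers lists the top-level entries in the bucket: since each
// user's blocks live under a per-user prefix, the entries (minus the
// trailing "/") are the user IDs.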
func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
	var users []string

	err := c.bucketClient.Iter(ctx, "", func(entry string) error {
		users = append(users, strings.TrimSuffix(entry, "/"))
		return nil
	})

	return users, err
}
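For context, here is a minimal sketch of how the component above could be wired up on its own. It only relies on NewCompactor() and Shutdown() as shown in this commit; the standalone main, the flag wiring, the signal handling, and the assumption that cortex_tsdb.Config also exposes a RegisterFlags method are illustrative, not part of this change:

package main

import (
	"flag"
	"os"
	"os/signal"
	"syscall"

	"github.com/cortexproject/cortex/pkg/compactor"
	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Register the flags to pick up their defaults, then parse the command line.
	var (
		compactorCfg compactor.Config
		storageCfg   cortex_tsdb.Config
	)
	fs := flag.NewFlagSet("compactor-demo", flag.ExitOnError)
	compactorCfg.RegisterFlags(fs)
	storageCfg.RegisterFlags(fs) // Assumed to exist, mirroring compactor.Config.
	_ = fs.Parse(os.Args[1:])

	// NewCompactor starts the compaction loop in a background goroutine.
	c, err := compactor.NewCompactor(compactorCfg, storageCfg, logger, prometheus.DefaultRegisterer)
	if err != nil {
		_ = logger.Log("msg", "failed to create compactor", "err", err)
		os.Exit(1)
	}

	// Block until a termination signal, then shut down gracefully
	// (Shutdown cancels the context and waits for any on-going compaction).
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	c.Shutdown()
}

In the real binary the component is expected to be wired through Cortex's module system rather than a standalone main, so treat this purely as a reading aid.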