Skip to content

Commit

Permalink
block: precalculate hashes if enabled and use them during compaction
Browse files Browse the repository at this point in the history
Added the possibility to ignore certain directories in
objstore.{Download,DownloadDir}. Do not download files which have the
same hash as in remote object storage. Wire up `--hash-func` so that
writers could specify what hash function to use when uploading. There is
no performance impact if no hash function has been explicitly specified.
Clean up the removal of files logic in Thanos Compact to ensure we do
not remove something that exists on disk already.

Tested manually + new tests cover all of this more or less.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Nov 22, 2020
1 parent 1c2c5bd commit 85c07fa
Show file tree
Hide file tree
Showing 42 changed files with 553 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
### Added

- [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
- [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/other writers: added `--add-hashes`. If enabled, writers calculate SHA256 hashes of each file in a block before uploading them. If those hashes exist in the `meta.json` file then Compact does a smart thing and does not download the files if they already exist on disk. This also means that the data directory passed to Thanos Compact is only *cleared once at boot*. So, if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to make an iteration properly then the last downloaded files are not wiped from the disk. The directories that were created the last time are only wiped again after a successful iteration.

### Fixed

Expand Down
35 changes: 27 additions & 8 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func runCompact(
component component.Component,
conf compactConfig,
flagsMap map[string]string,
) error {
) (rerr error) {
deleteDelay := time.Duration(conf.deleteDelay)
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_halted",
Expand Down Expand Up @@ -266,11 +266,16 @@ func runCompact(
}

ctx, cancel := context.WithCancel(context.Background())

defer func() {
if rerr != nil {
cancel()
}
}()
// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool())
if err != nil {
cancel()
return errors.Wrap(err, "create compactor")
}

Expand All @@ -279,11 +284,23 @@ func runCompact(
downsamplingDir = path.Join(conf.dataDir, "downsample")
)

// Clean-up and create a fresh state at the beginning.
if err := os.RemoveAll(downsamplingDir); err != nil {
cancel()
return errors.Wrap(err, "clean working downsample directory")
}

if err := os.RemoveAll(compactDir); err != nil {
return errors.Wrap(err, "clean working compact directory")
}

if err := os.Mkdir(compactDir, os.ModePerm); err != nil {
return errors.Wrap(err, "create working compact directory")
}

if err := os.Mkdir(downsamplingDir, os.ModePerm); err != nil {
return errors.Wrap(err, "create working downsample directory")
}

grouper := compact.NewDefaultGrouper(
logger,
bkt,
Expand All @@ -292,6 +309,7 @@ func runCompact(
reg,
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
garbageCollectedBlocks,
metadata.HashFunc(conf.hashFunc),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
Expand All @@ -310,7 +328,6 @@ func runCompact(
conf.compactionConcurrency,
)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
}

Expand Down Expand Up @@ -351,11 +368,9 @@ func runCompact(
// Do it once at the beginning to ensure that it runs at least once before
// the main loop.
if err := sy.SyncMetas(ctx); err != nil {
cancel()
return errors.Wrap(err, "syncing metas")
}
if err := cleanPartialMarked(); err != nil {
cancel()
return errors.Wrap(err, "cleaning partial and marked blocks")
}

Expand All @@ -378,15 +393,15 @@ func runCompact(
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
Expand Down Expand Up @@ -539,6 +554,7 @@ type compactConfig struct {
webConf webConfig
label string
maxBlockIndexSize units.Base2Bytes
hashFunc string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -608,6 +624,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&cc.hashFunc, "SHA256", "")

cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd)

cc.webConf.registerFlag(cmd)
Expand Down
3 changes: 3 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type shipperConfig struct {
uploadCompacted bool
ignoreBlockSize bool
allowOutOfOrderUpload bool
hashFunc string
}

func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig {
Expand All @@ -140,6 +141,8 @@ func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig
"This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+
"about order.").
Default("false").Hidden().BoolVar(&sc.allowOutOfOrderUpload)
cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&sc.hashFunc, "SHA256", "")
return sc
}

Expand Down
41 changes: 31 additions & 10 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func RunDownsample(
dataDir string,
objStoreConfig *extflag.PathOrContent,
comp component.Component,
hashFunc metadata.HashFunc,
) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -112,7 +113,7 @@ func RunDownsample(
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand All @@ -121,7 +122,7 @@ func RunDownsample(
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand Down Expand Up @@ -151,22 +152,37 @@ func RunDownsample(
return nil
}

// removeAllNonMetaDirs removes all subdirectories and all files under the given dir
// that do not correspond to any metas. This is needed in the case when
// the downsampling process resumes without restarting the whole process
// but the blocks do not exist in the remote object storage anymore.
func removeAllNonMetaDirs(metas map[ulid.ULID]*metadata.Meta, dir string) error {
ignoreDirs := []string{}
for ulid := range metas {
ignoreDirs = append(ignoreDirs, ulid.String())
}
return runutil.DeleteAllExceptDirs(dir, ignoreDirs)
}

func downsampleBucket(
ctx context.Context,
logger log.Logger,
metrics *DownsampleMetrics,
bkt objstore.Bucket,
metas map[ulid.ULID]*metadata.Meta,
dir string,
) error {
if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean working directory")
}
hashFunc metadata.HashFunc,
) (rerr error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
// Leave the downsample directory for inspection if it is a halt error
// or if it is not then so that possibly we would not have to download everything again.
if rerr != nil {
return
}
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err)
}
Expand Down Expand Up @@ -194,6 +210,10 @@ func downsampleBucket(
}
}

if err := removeAllNonMetaDirs(metas, dir); err != nil {
level.Warn(logger).Log("msg", "failed deleting potentially outdated directories/files, some disk space usage might have leaked. Continuing", "err", err, "dir", dir)
}

for _, m := range metas {
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel0:
Expand All @@ -213,7 +233,8 @@ func downsampleBucket(
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {

if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1, hashFunc); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 5 min")
}
Expand All @@ -236,7 +257,7 @@ func downsampleBucket(
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2, hashFunc); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 60 min")
}
Expand All @@ -246,7 +267,7 @@ func downsampleBucket(
return nil
}

func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error {
func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64, hashFunc metadata.HashFunc) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())

Expand Down Expand Up @@ -290,7 +311,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu

begin = time.Now()

err = block.Upload(ctx, logger, bkt, resdir)
err = block.Upload(ctx, logger, bkt, resdir, hashFunc)
if err != nil {
return errors.Wrapf(err, "upload downsampled block %s", id)
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
Expand All @@ -42,9 +43,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
downsample.ResLevel0, false)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
Expand All @@ -57,7 +58,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, metadata.NoneFunc))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))

_, err = os.Stat(dir)
Expand Down
7 changes: 7 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extkingpin"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -88,6 +89,9 @@ func registerReceive(app *extkingpin.App) {
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()
noLockFile := cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").Bool()

hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()
allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads",
"If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+
Expand Down Expand Up @@ -168,6 +172,7 @@ func registerReceive(app *extkingpin.App) {
time.Duration(*forwardTimeout),
*allowOutOfOrderUpload,
component.Receive,
metadata.HashFunc(*hashFunc),
)
})
}
Expand Down Expand Up @@ -207,6 +212,7 @@ func runReceive(
forwardTimeout time.Duration,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
hashFunc metadata.HashFunc,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive")
Expand Down Expand Up @@ -258,6 +264,7 @@ func runReceive(
tenantLabelName,
bkt,
allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func registerRule(app *extkingpin.App) {
"about order.").
Default("false").Hidden().Bool()

hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
Expand Down Expand Up @@ -215,6 +218,7 @@ func registerRule(app *extkingpin.App) {
*allowOutOfOrderUpload,
*httpMethod,
getFlagsMap(cmd.Flags()),
metadata.HashFunc(*hashFunc),
)
})
}
Expand Down Expand Up @@ -305,6 +309,7 @@ func runRule(
allowOutOfOrderUpload bool,
httpMethod string,
flagsMap map[string]string,
hashFunc metadata.HashFunc,
) error {
metrics := newRuleMetrics(reg)

Expand Down Expand Up @@ -646,7 +651,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload)
s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload, hashFunc)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func runSidecar(
}

s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,11 @@ func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag.
httpAddr, httpGracePeriod := extkingpin.RegisterHTTPFlags(cmd)
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample)
return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc))
})
}

Expand Down
6 changes: 6 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ Flags:
loaded, or compactor is ignoring the deletion
because it's compacting the block at the same
time.
--hash-func= Specify which hash function to use when
calculating the hashes of produced files. If no
function has been specified, it does not happen.
This permits avoiding downloading some files
twice albeit at some performance cost. Possible
values are: "", "SHA256".
--selector.relabel-config-file=<file-path>
Path to YAML file that contains relabeling
configuration that allows selecting blocks. It
Expand Down
Loading

0 comments on commit 85c07fa

Please sign in to comment.