Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

block: precalculate hashes if enabled and use them during compaction (downloading) #3031

Merged
merged 16 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
### Added

- [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
- [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/other writers: added `--hash-func`. If some function has been specified, writers calculate hashes using that function of each file in a block before uploading them. If those hashes exist in the `meta.json` file then Compact does a smart thing and does not download the files if they already exist on disk. This also means that the data directory passed to Thanos Compact is only *cleared once at boot* or *if everything succeeds*. So, if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to make an iteration properly then the last downloaded files are not wiped from the disk. The directories that were created the last time are only wiped again after a successful iteration or if the previously picked up blocks have disappeared.
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block.

### Fixed
Expand Down
33 changes: 27 additions & 6 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func runCompact(
component component.Component,
conf compactConfig,
flagsMap map[string]string,
) error {
) (rerr error) {
deleteDelay := time.Duration(conf.deleteDelay)
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_halted",
Expand Down Expand Up @@ -273,11 +273,16 @@ func runCompact(
}

ctx, cancel := context.WithCancel(context.Background())

defer func() {
if rerr != nil {
cancel()
}
}()
// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool())
if err != nil {
cancel()
return errors.Wrap(err, "create compactor")
}

Expand All @@ -286,11 +291,23 @@ func runCompact(
downsamplingDir = path.Join(conf.dataDir, "downsample")
)

// Clean-up and create a fresh state at the beginning.
if err := os.RemoveAll(downsamplingDir); err != nil {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
cancel()
return errors.Wrap(err, "clean working downsample directory")
}

if err := os.RemoveAll(compactDir); err != nil {
return errors.Wrap(err, "clean working compact directory")
}

if err := os.Mkdir(compactDir, os.ModePerm); err != nil {
return errors.Wrap(err, "create working compact directory")
}

if err := os.Mkdir(downsamplingDir, os.ModePerm); err != nil {
return errors.Wrap(err, "create working downsample directory")
}

grouper := compact.NewDefaultGrouper(
logger,
bkt,
Expand All @@ -299,6 +316,7 @@ func runCompact(
reg,
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
garbageCollectedBlocks,
metadata.HashFunc(conf.hashFunc),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
Expand All @@ -317,7 +335,6 @@ func runCompact(
conf.compactionConcurrency,
)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
}

Expand Down Expand Up @@ -376,15 +393,15 @@ func runCompact(
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
Expand Down Expand Up @@ -532,6 +549,7 @@ type compactConfig struct {
webConf webConfig
label string
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
}

Expand Down Expand Up @@ -607,6 +625,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&cc.hashFunc, "SHA256", "")

cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd)

cc.webConf.registerFlag(cmd)
Expand Down
3 changes: 3 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type shipperConfig struct {
uploadCompacted bool
ignoreBlockSize bool
allowOutOfOrderUpload bool
hashFunc string
}

func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig {
Expand All @@ -140,6 +141,8 @@ func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig
"This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+
"about order.").
Default("false").Hidden().BoolVar(&sc.allowOutOfOrderUpload)
cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&sc.hashFunc, "SHA256", "")
return sc
}

Expand Down
41 changes: 31 additions & 10 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func RunDownsample(
dataDir string,
objStoreConfig *extflag.PathOrContent,
comp component.Component,
hashFunc metadata.HashFunc,
) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -112,7 +113,7 @@ func RunDownsample(
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand All @@ -121,7 +122,7 @@ func RunDownsample(
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand Down Expand Up @@ -151,22 +152,37 @@ func RunDownsample(
return nil
}

// removeAllNonMetaDirs removes all subdirectories and all files under the given dir
// that do not correspond to any metas. This is needed in the case when
// the downsampling process resumes without restarting the whole process
// but the blocks do not exist in the remote object storage anymore.
func removeAllNonMetaDirs(metas map[ulid.ULID]*metadata.Meta, dir string) error {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
ignoreDirs := []string{}
for ulid := range metas {
ignoreDirs = append(ignoreDirs, ulid.String())
}
return runutil.DeleteAllExceptDirs(dir, ignoreDirs)
}

func downsampleBucket(
ctx context.Context,
logger log.Logger,
metrics *DownsampleMetrics,
bkt objstore.Bucket,
metas map[ulid.ULID]*metadata.Meta,
dir string,
) error {
if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean working directory")
}
hashFunc metadata.HashFunc,
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
) (rerr error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
// Leave the downsample directory for inspection if it is a halt error
// or if it is not then so that possibly we would not have to download everything again.
if rerr != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it's disk full issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it's not a problem because we have a consistent order of work with #3843 in place +

sort.Slice(res, func(i, j int) bool {
return res[i].Key() < res[j].Key()
})
this exists already. This means that we are sure that after a retry, we are going to restart the work on the same blocks. If the blocks would disappear then accordingly we would get rid of them and then Thanos Compactor can continue its work. Does it make sense, @bwplotka ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 👍🏽

return
}
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err)
}
Expand Down Expand Up @@ -194,6 +210,10 @@ func downsampleBucket(
}
}

if err := removeAllNonMetaDirs(metas, dir); err != nil {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
level.Warn(logger).Log("msg", "failed deleting potentially outdated directories/files, some disk space usage might have leaked. Continuing", "err", err, "dir", dir)
}

for _, m := range metas {
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel0:
Expand All @@ -213,7 +233,8 @@ func downsampleBucket(
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {

if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1, hashFunc); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 5 min")
}
Expand All @@ -236,7 +257,7 @@ func downsampleBucket(
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2, hashFunc); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 60 min")
}
Expand All @@ -246,7 +267,7 @@ func downsampleBucket(
return nil
}

func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error {
func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64, hashFunc metadata.HashFunc) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())

Expand Down Expand Up @@ -290,7 +311,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu

begin = time.Now()

err = block.Upload(ctx, logger, bkt, resdir)
err = block.Upload(ctx, logger, bkt, resdir, hashFunc)
if err != nil {
return errors.Wrapf(err, "upload downsampled block %s", id)
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
Expand All @@ -42,9 +43,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
downsample.ResLevel0, false)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
Expand All @@ -57,7 +58,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, metadata.NoneFunc))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))

_, err = os.Stat(dir)
Expand Down
7 changes: 7 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extkingpin"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -88,6 +89,9 @@ func registerReceive(app *extkingpin.App) {
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()
noLockFile := cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").Bool()

hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()
allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads",
"If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+
Expand Down Expand Up @@ -168,6 +172,7 @@ func registerReceive(app *extkingpin.App) {
time.Duration(*forwardTimeout),
*allowOutOfOrderUpload,
component.Receive,
metadata.HashFunc(*hashFunc),
)
})
}
Expand Down Expand Up @@ -207,6 +212,7 @@ func runReceive(
forwardTimeout time.Duration,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
hashFunc metadata.HashFunc,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive")
Expand Down Expand Up @@ -258,6 +264,7 @@ func runReceive(
tenantLabelName,
bkt,
allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func registerRule(app *extkingpin.App) {
"about order.").
Default("false").Hidden().Bool()

hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
Expand Down Expand Up @@ -215,6 +218,7 @@ func registerRule(app *extkingpin.App) {
*allowOutOfOrderUpload,
*httpMethod,
getFlagsMap(cmd.Flags()),
metadata.HashFunc(*hashFunc),
)
})
}
Expand Down Expand Up @@ -305,6 +309,7 @@ func runRule(
allowOutOfOrderUpload bool,
httpMethod string,
flagsMap map[string]string,
hashFunc metadata.HashFunc,
) error {
metrics := newRuleMetrics(reg)

Expand Down Expand Up @@ -646,7 +651,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload)
s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload, hashFunc)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func runSidecar(
}

s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,11 @@ func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag.
httpAddr, httpGracePeriod := extkingpin.RegisterHTTPFlags(cmd)
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
Default("").Enum("SHA256", "")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample)
return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc))
})
}

Expand Down Expand Up @@ -775,6 +777,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings()
tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String()
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
Expand Down Expand Up @@ -892,7 +896,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}

level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID)
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil {
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
}
level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID)
Expand Down
Loading