Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

block: precalculate hashes if enabled and use them during compaction (downloading) #3031

Merged
merged 16 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks
- [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request.
- [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction
- [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/other writers: added `--hash-func`. If some function has been specified, writers calculate hashes using that function of each file in a block before uploading them. If those hashes exist in the `meta.json` file then Compact does not download the files if they already exist on disk and with the same hash. This also means that the data directory passed to Thanos Compact is only *cleared once at boot* or *if everything succeeds*. So, if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to make an iteration properly then the last downloaded files are not wiped from the disk. The directories that were created the last time are only wiped again after a successful iteration or if the previously picked up blocks have disappeared.
- [#3686](https://github.com/thanos-io/thanos/pull/3686) Query: Added federated metric metadata support.

### Fixed
Expand Down
28 changes: 20 additions & 8 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func runCompact(
component component.Component,
conf compactConfig,
flagsMap map[string]string,
) error {
) (rerr error) {
deleteDelay := time.Duration(conf.deleteDelay)
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_halted",
Expand Down Expand Up @@ -273,11 +273,16 @@ func runCompact(
}

ctx, cancel := context.WithCancel(context.Background())

defer func() {
if rerr != nil {
cancel()
}
}()
// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool())
if err != nil {
cancel()
return errors.Wrap(err, "create compactor")
}

Expand All @@ -286,9 +291,12 @@ func runCompact(
downsamplingDir = path.Join(conf.dataDir, "downsample")
)

if err := os.RemoveAll(downsamplingDir); err != nil {
cancel()
return errors.Wrap(err, "clean working downsample directory")
if err := os.MkdirAll(compactDir, os.ModePerm); err != nil {
return errors.Wrap(err, "create working compact directory")
}

if err := os.MkdirAll(downsamplingDir, os.ModePerm); err != nil {
return errors.Wrap(err, "create working downsample directory")
}

grouper := compact.NewDefaultGrouper(
Expand All @@ -299,6 +307,7 @@ func runCompact(
reg,
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
garbageCollectedBlocks,
metadata.HashFunc(conf.hashFunc),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
Expand All @@ -317,7 +326,6 @@ func runCompact(
conf.compactionConcurrency,
)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
}

Expand Down Expand Up @@ -376,15 +384,15 @@ func runCompact(
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
Expand Down Expand Up @@ -533,6 +541,7 @@ type compactConfig struct {
webConf webConfig
label string
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
}

Expand Down Expand Up @@ -610,6 +619,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&cc.hashFunc, "SHA256", "")

cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd)

cc.webConf.registerFlag(cmd)
Expand Down
3 changes: 3 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type shipperConfig struct {
uploadCompacted bool
ignoreBlockSize bool
allowOutOfOrderUpload bool
hashFunc string
}

func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig {
Expand All @@ -140,6 +141,8 @@ func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig
"This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+
"about order.").
Default("false").Hidden().BoolVar(&sc.allowOutOfOrderUpload)
cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&sc.hashFunc, "SHA256", "")
return sc
}

Expand Down
34 changes: 24 additions & 10 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func RunDownsample(
dataDir string,
objStoreConfig *extflag.PathOrContent,
comp component.Component,
hashFunc metadata.HashFunc,
) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -113,7 +114,7 @@ func RunDownsample(
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand All @@ -122,7 +123,7 @@ func RunDownsample(
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand Down Expand Up @@ -159,15 +160,18 @@ func downsampleBucket(
bkt objstore.Bucket,
metas map[ulid.ULID]*metadata.Meta,
dir string,
) error {
if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean working directory")
}
hashFunc metadata.HashFunc,
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
) (rerr error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
// Leave the downsample directory for inspection if it is a halt error
// or if it is not then so that possibly we would not have to download everything again.
if rerr != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it's disk full issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it's not a problem because we have a consistent order of work with #3843 in place +

sort.Slice(res, func(i, j int) bool {
return res[i].Key() < res[j].Key()
})
this exists already. This means that we are sure that after a retry, we are going to restart the work on the same blocks. If the blocks would disappear then accordingly we would get rid of them and then Thanos Compactor can continue its work. Does it make sense, @bwplotka ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 👍🏽

return
}
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err)
}
Expand Down Expand Up @@ -195,6 +199,15 @@ func downsampleBucket(
}
}

ignoreDirs := []string{}
for ulid := range metas {
ignoreDirs = append(ignoreDirs, ulid.String())
}

if err := runutil.DeleteAll(dir, ignoreDirs...); err != nil {
level.Warn(logger).Log("msg", "failed deleting potentially outdated directories/files, some disk space usage might have leaked. Continuing", "err", err, "dir", dir)
}

metasULIDS := make([]ulid.ULID, 0, len(metas))
for k := range metas {
metasULIDS = append(metasULIDS, k)
Expand Down Expand Up @@ -224,7 +237,8 @@ func downsampleBucket(
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {

if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1, hashFunc); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 5 min")
}
Expand All @@ -247,7 +261,7 @@ func downsampleBucket(
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2, hashFunc); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 60 min")
}
Expand All @@ -257,7 +271,7 @@ func downsampleBucket(
return nil
}

func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error {
func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64, hashFunc metadata.HashFunc) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())

Expand Down Expand Up @@ -301,7 +315,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu

begin = time.Now()

err = block.Upload(ctx, logger, bkt, resdir)
err = block.Upload(ctx, logger, bkt, resdir, hashFunc)
if err != nil {
return errors.Wrapf(err, "upload downsampled block %s", id)
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
Expand All @@ -42,9 +43,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
downsample.ResLevel0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
Expand All @@ -57,7 +58,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, metadata.NoneFunc))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))

_, err = os.Stat(dir)
Expand Down
7 changes: 7 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/thanos-io/thanos/pkg/extkingpin"

Expand Down Expand Up @@ -91,6 +92,9 @@ func registerReceive(app *extkingpin.App) {
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()
noLockFile := cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").Bool()

hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()
allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads",
"If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+
Expand Down Expand Up @@ -166,6 +170,7 @@ func registerReceive(app *extkingpin.App) {
time.Duration(*forwardTimeout),
*allowOutOfOrderUpload,
component.Receive,
metadata.HashFunc(*hashFunc),
)
})
}
Expand Down Expand Up @@ -207,6 +212,7 @@ func runReceive(
forwardTimeout time.Duration,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
hashFunc metadata.HashFunc,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive")
Expand Down Expand Up @@ -258,6 +264,7 @@ func runReceive(
tenantLabelName,
bkt,
allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func registerRule(app *extkingpin.App) {
"about order.").
Default("false").Hidden().Bool()

hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
Expand Down Expand Up @@ -216,6 +219,7 @@ func registerRule(app *extkingpin.App) {
*allowOutOfOrderUpload,
*httpMethod,
getFlagsMap(cmd.Flags()),
metadata.HashFunc(*hashFunc),
)
})
}
Expand Down Expand Up @@ -306,6 +310,7 @@ func runRule(
allowOutOfOrderUpload bool,
httpMethod string,
flagsMap map[string]string,
hashFunc metadata.HashFunc,
) error {
metrics := newRuleMetrics(reg)

Expand Down Expand Up @@ -647,7 +652,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload)
s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload, hashFunc)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func runSidecar(
}

s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,9 +494,11 @@ func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag.
httpAddr, httpGracePeriod := extkingpin.RegisterHTTPFlags(cmd)
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
Default("").Enum("SHA256", "")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample)
return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc))
})
}

Expand Down Expand Up @@ -786,6 +788,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings()
tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String()
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
Expand Down Expand Up @@ -903,7 +907,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}

level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID)
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil {
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
}
level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID)
Expand Down
6 changes: 6 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@ Flags:
loaded, or compactor is ignoring the deletion
because it's compacting the block at the same
time.
--hash-func= Specify which hash function to use when
calculating the hashes of produced files. If no
function has been specified, it does not happen.
This permits avoiding downloading some files
twice albeit at some performance cost. Possible
values are: "", "SHA256".
--selector.relabel-config-file=<file-path>
Path to YAML file that contains relabeling
configuration that allows selecting blocks. It
Expand Down
Loading