From 06357faad10408d88b5d5c410517389355d46171 Mon Sep 17 00:00:00 2001 From: Andre Branchizio Date: Thu, 11 Mar 2021 14:31:09 -0700 Subject: [PATCH] block: precalculate hashes if enabled and use them during compaction (downloading) (#3031) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * block: precalculate hashes if enabled and use them during compaction Added the possibility to ignore certain directories in objstore.{Download,DownloadDir}. Do not download files which have the same hash as in remote object storage. Wire up `--hash-func` so that writers could specify what hash function to use when uploading. There is no performance impact if no hash function has been explicitly specified. Clean up the removal of files logic in Thanos Compact to ensure we do not remove something that exists on disk already. Tested manually + new tests cover all of this more or less. Signed-off-by: Giedrius Statkevičius * block: expose GatherFileStats and use it Signed-off-by: Giedrius Statkevičius * Revert "block: expose GatherFileStats and use it" This reverts commit 259c70bfaaf85a66a82f9458f4e1c310ad43b1bf. Signed-off-by: Giedrius Statkevičius * block: do not calc hash for dirs, add locks Signed-off-by: Giedrius Statkevičius * docs/tools: update Signed-off-by: Giedrius Statkevičius * shipper: pass s.hashFunc Signed-off-by: Giedrius Statkevičius * Fix according to Bartek's comments Signed-off-by: Giedrius Statkevičius * compact: clean up comment Signed-off-by: Giedrius Statkevičius * block: close with log on error Signed-off-by: Giedrius Statkevičius * *: remove unused FNs Signed-off-by: Giedrius Statkevičius * compact: add e2e test for new hash functionality Signed-off-by: Giedrius Statkevičius * Fix according to Bartek's comments Signed-off-by: Giedrius Statkevičius --- CHANGELOG.md | 1 + cmd/thanos/compact.go | 28 +++- cmd/thanos/config.go | 3 + cmd/thanos/downsample.go | 34 +++-- cmd/thanos/main_test.go | 7 +- cmd/thanos/receive.go | 7 + cmd/thanos/rule.go | 7 +- cmd/thanos/sidecar.go | 2 +- cmd/thanos/tools_bucket.go | 8 +- docs/components/compact.md | 6 + docs/components/receive.md | 6 + docs/components/rule.md | 6 + docs/components/sidecar.md | 6 + docs/components/tools.md | 12 ++ pkg/block/block.go | 66 ++++++-- pkg/block/block_test.go | 143 +++++++++++++++--- pkg/block/index_test.go | 2 +- pkg/block/indexheader/header_test.go | 12 +- .../indexheader/lazy_binary_reader_test.go | 21 +-- pkg/block/indexheader/reader_pool_test.go | 9 +- pkg/block/metadata/hash.go | 62 ++++++++ pkg/block/metadata/hash_test.go | 32 ++++ pkg/block/metadata/meta.go | 3 + pkg/compact/compact.go | 39 +++-- pkg/compact/compact_e2e_test.go | 10 +- pkg/objstore/objstore.go | 10 +- pkg/promclient/promclient_e2e_test.go | 3 + pkg/receive/multitsdb.go | 4 + pkg/receive/multitsdb_test.go | 3 + pkg/runutil/runutil.go | 39 +++++ pkg/runutil/runutil_test.go | 35 +++++ pkg/shipper/shipper.go | 5 +- pkg/shipper/shipper_e2e_test.go | 4 +- pkg/shipper/shipper_test.go | 8 +- pkg/store/bucket_e2e_test.go | 8 +- pkg/store/bucket_test.go | 32 ++-- pkg/testutil/e2eutil/prometheus.go | 37 ++++- pkg/verifier/index_issue.go | 2 +- pkg/verifier/safe_delete.go | 3 +- test/e2e/compact_test.go | 69 ++++++--- test/e2e/store_gateway_test.go | 11 +- 41 files changed, 654 insertions(+), 151 deletions(-) create mode 100644 pkg/block/metadata/hash.go create mode 100644 pkg/block/metadata/hash_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 40d498afaf7..35620b72184 100644 --- a/CHANGELOG.md +++ 
b/CHANGELOG.md @@ -18,6 +18,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks - [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request. - [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction +- [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/other writers: added `--hash-func`. If a function is specified, writers calculate the hash of each file in a block with that function before uploading and record it in the block's `meta.json`. Compact then skips downloading files that already exist on disk with the same hash. This also means that the data directory passed to Thanos Compact is only *cleared once at boot* or *if everything succeeds*. So if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to complete an iteration, the last downloaded files are not wiped from the disk. The directories created during the last run are only wiped again after a successful iteration or if the previously picked up blocks have disappeared. - [#3686](https://github.com/thanos-io/thanos/pull/3686) Query: Added federated metric metadata support. ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 29e00584ffd..142ee0490ec 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -98,7 +98,7 @@ func runCompact( component component.Component, conf compactConfig, flagsMap map[string]string, -) error { +) (rerr error) { deleteDelay := time.Duration(conf.deleteDelay) halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Name: "thanos_compact_halted", @@ -273,11 +273,16 @@ func runCompact( } ctx, cancel := context.WithCancel(context.Background()) + + defer func() { + if rerr != nil { + cancel() + } + }() // Instantiate the compactor with different time slices. Timestamps in TSDB // are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool()) if err != nil { - cancel() return errors.Wrap(err, "create compactor") } @@ -286,9 +291,12 @@ func runCompact( downsamplingDir = path.Join(conf.dataDir, "downsample") ) - if err := os.RemoveAll(downsamplingDir); err != nil { - cancel() - return errors.Wrap(err, "clean working downsample directory") + if err := os.MkdirAll(compactDir, os.ModePerm); err != nil { + return errors.Wrap(err, "create working compact directory") + } + + if err := os.MkdirAll(downsamplingDir, os.ModePerm); err != nil { + return errors.Wrap(err, "create working downsample directory") } planner := compact.NewPlanner(logger, levels, noCompactMarkerFilter) @@ -300,6 +308,7 @@ func runCompact( reg, blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), garbageCollectedBlocks, + metadata.HashFunc(conf.hashFunc), planner, ) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures) @@ -319,7 +328,6 @@ func runCompact( conf.compactionConcurrency, ) if err != nil { - cancel() return errors.Wrap(err, "create bucket compactor") } @@ -378,7 +386,7 @@ func runCompact( downsampleMetrics.downsamples.WithLabelValues(groupKey) downsampleMetrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } @@ -386,7 +394,7 @@ func runCompact( if err := sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } level.Info(logger).Log("msg", "downsampling iterations done") @@ -535,6 +543,7 @@ type compactConfig struct { webConf webConfig label string maxBlockIndexSize units.Base2Bytes + hashFunc string enableVerticalCompaction bool } @@ -612,6 +621,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size."). Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize) + cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). 
+ Default("").EnumVar(&cc.hashFunc, "SHA256", "") + cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd) cc.webConf.registerFlag(cmd) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index c0ae39bffd3..bfb8b10f68c 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -126,6 +126,7 @@ type shipperConfig struct { uploadCompacted bool ignoreBlockSize bool allowOutOfOrderUpload bool + hashFunc string } func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig { @@ -140,6 +141,8 @@ func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig "This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+ "about order."). Default("false").Hidden().BoolVar(&sc.allowOutOfOrderUpload) + cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). + Default("").EnumVar(&sc.hashFunc, "SHA256", "") return sc } diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 7d6fbff402d..c9187c78c2d 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -62,6 +62,7 @@ func RunDownsample( dataDir string, objStoreConfig *extflag.PathOrContent, comp component.Component, + hashFunc metadata.HashFunc, ) error { confContentYaml, err := objStoreConfig.Content() if err != nil { @@ -113,7 +114,7 @@ func RunDownsample( metrics.downsamples.WithLabelValues(groupKey) metrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil { return errors.Wrap(err, "downsampling failed") } @@ -122,7 +123,7 @@ func RunDownsample( if err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil { return errors.Wrap(err, "downsampling failed") } @@ -159,15 +160,18 @@ func downsampleBucket( bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string, -) error { - if err := os.RemoveAll(dir); err != nil { - return errors.Wrap(err, "clean working directory") - } + hashFunc metadata.HashFunc, +) (rerr error) { if err := os.MkdirAll(dir, 0777); err != nil { return errors.Wrap(err, "create dir") } defer func() { + // Leave the downsample directory for inspection if it is a halt error + // or if it is not then so that possibly we would not have to download everything again. + if rerr != nil { + return + } if err := os.RemoveAll(dir); err != nil { level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err) } @@ -195,6 +199,15 @@ func downsampleBucket( } } + ignoreDirs := []string{} + for ulid := range metas { + ignoreDirs = append(ignoreDirs, ulid.String()) + } + + if err := runutil.DeleteAll(dir, ignoreDirs...); err != nil { + level.Warn(logger).Log("msg", "failed deleting potentially outdated directories/files, some disk space usage might have leaked. 
Continuing", "err", err, "dir", dir) + } + metasULIDS := make([]ulid.ULID, 0, len(metas)) for k := range metas { metasULIDS = append(metasULIDS, k) @@ -224,7 +237,8 @@ func downsampleBucket( if m.MaxTime-m.MinTime < downsample.DownsampleRange0 { continue } - if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil { + + if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1, hashFunc); err != nil { metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() return errors.Wrap(err, "downsampling to 5 min") } @@ -247,7 +261,7 @@ func downsampleBucket( if m.MaxTime-m.MinTime < downsample.DownsampleRange1 { continue } - if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil { + if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2, hashFunc); err != nil { metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() return errors.Wrap(err, "downsampling to 60 min") } @@ -257,7 +271,7 @@ func downsampleBucket( return nil } -func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error { +func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64, hashFunc metadata.HashFunc) error { begin := time.Now() bdir := filepath.Join(dir, m.ULID.String()) @@ -301,7 +315,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu begin = time.Now() - err = block.Upload(ctx, logger, bkt, resdir) + err = block.Upload(ctx, logger, bkt, resdir, hashFunc) if err != nil { return errors.Wrapf(err, "upload downsampled block %s", id) } diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index 127c3236ac7..02928bb4349 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -17,6 +17,7 @@ import ( promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/objstore" @@ -42,9 +43,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { []labels.Labels{{{Name: "a", Value: "1"}}}, 1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check. 
labels.Labels{{Name: "e1", Value: "1"}}, - downsample.ResLevel0) + downsample.ResLevel0, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc)) } meta, err := block.DownloadMeta(ctx, logger, bkt, id) @@ -57,7 +58,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metas, _, err := metaFetcher.Fetch(ctx) testutil.Ok(t, err) - testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir)) + testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, metadata.NoneFunc)) testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos)))) _, err = os.Stat(dir) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 0b63ac211ab..6d401a7d08b 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extkingpin" @@ -91,6 +92,9 @@ func registerReceive(app *extkingpin.App) { walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool() noLockFile := cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").Bool() + hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). + Default("").Enum("SHA256", "") + ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool() allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads", "If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+ @@ -166,6 +170,7 @@ func registerReceive(app *extkingpin.App) { time.Duration(*forwardTimeout), *allowOutOfOrderUpload, component.Receive, + metadata.HashFunc(*hashFunc), ) }) } @@ -207,6 +212,7 @@ func runReceive( forwardTimeout time.Duration, allowOutOfOrderUpload bool, comp component.SourceStoreAPI, + hashFunc metadata.HashFunc, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive") @@ -258,6 +264,7 @@ func runReceive( tenantLabelName, bkt, allowOutOfOrderUpload, + hashFunc, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 58916bd0a41..f45396dabfd 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -129,6 +129,9 @@ func registerRule(app *extkingpin.App) { "about order."). 
Default("false").Hidden().Bool() + hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). + Default("").Enum("SHA256", "") + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { @@ -216,6 +219,7 @@ func registerRule(app *extkingpin.App) { *allowOutOfOrderUpload, *httpMethod, getFlagsMap(cmd.Flags()), + metadata.HashFunc(*hashFunc), ) }) } @@ -306,6 +310,7 @@ func runRule( allowOutOfOrderUpload bool, httpMethod string, flagsMap map[string]string, + hashFunc metadata.HashFunc, ) error { metrics := newRuleMetrics(reg) @@ -647,7 +652,7 @@ func runRule( } }() - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload) + s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload, hashFunc) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 13abbffa6ca..90ecd89e82b 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -270,7 +270,7 @@ func runSidecar( } s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, - conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload) + conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) return runutil.Repeat(30*time.Second, ctx.Done(), func() error { if uploaded, err := s.Sync(ctx); err != nil { diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 5c2f1114bf7..6c46597f248 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -494,9 +494,11 @@ func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag. httpAddr, httpGracePeriod := extkingpin.RegisterHTTPFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings."). Default("./data").String() + hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). 
+ Default("").Enum("SHA256", "") cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { - return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample) + return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc)) }) } @@ -786,6 +788,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat "WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.") blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings() tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String() + hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). + Default("").Enum("SHA256", "") dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool() toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true) provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool() @@ -903,7 +907,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat } level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID) - if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil { + if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil { return errors.Wrap(err, "upload") } level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID) diff --git a/docs/components/compact.md b/docs/components/compact.md index 8937b359bc6..4fb6c8ae458 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -408,6 +408,12 @@ Flags: loaded, or compactor is ignoring the deletion because it's compacting the block at the same time. + --hash-func= Specify which hash function to use when + calculating the hashes of produced files. If no + function has been specified, it does not happen. + This permits avoiding downloading some files + twice albeit at some performance cost. Possible + values are: "", "SHA256". --selector.relabel-config-file= Path to YAML file that contains relabeling configuration that allows selecting blocks. It diff --git a/docs/components/receive.md b/docs/components/receive.md index 28ca10d19dd..47a71febdc0 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -185,5 +185,11 @@ Flags: --tsdb.no-lockfile Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup. + --hash-func= Specify which hash function to use when + calculating the hashes of produced files. If no + function has been specified, it does not + happen. 
This permits avoiding downloading some + files twice albeit at some performance cost. + Possible values are: "", "SHA256". ``` diff --git a/docs/components/rule.md b/docs/components/rule.md index 123aa24abb9..6a3a8622692 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -394,6 +394,12 @@ Flags: Interval between DNS resolutions. --query.http-method=POST HTTP method to use when sending queries. Possible options: [GET, POST] + --hash-func= Specify which hash function to use when + calculating the hashes of produced files. If no + function has been specified, it does not + happen. This permits avoiding downloading some + files twice albeit at some performance cost. + Possible values are: "", "SHA256". ``` diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index aad1da3fcf7..0fc691d066d 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -156,6 +156,12 @@ Flags: Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done. + --hash-func= Specify which hash function to use when + calculating the hashes of produced files. If no + function has been specified, it does not + happen. This permits avoiding downloading some + files twice albeit at some performance cost. + Possible values are: "", "SHA256". --min-time=0000-01-01T00:00:00Z Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened diff --git a/docs/components/tools.md b/docs/components/tools.md index af9cb8b33a4..87b3a17b81c 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -596,6 +596,12 @@ Flags: Server. --data-dir="./data" Data directory in which to cache blocks and process downsamplings. + --hash-func= Specify which hash function to use when + calculating the hashes of produced files. If no + function has been specified, it does not happen. + This permits avoiding downloading some files twice + albeit at some performance cost. Possible values + are: "", "SHA256". ``` @@ -732,6 +738,12 @@ Flags: flag). --tmp.dir="/tmp/thanos-rewrite" Working directory for temporary files + --hash-func= Specify which hash function to use when + calculating the hashes of produced files. If no + function has been specified, it does not happen. + This permits avoiding downloading some files + twice albeit at some performance cost. Possible + values are: "", "SHA256". --dry-run Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this. diff --git a/pkg/block/block.go b/pkg/block/block.go index 5ab3f7ea85d..88901682ac6 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -43,14 +43,44 @@ const ( DebugMetas = "debug/metas" ) -// Download downloads directory that is mean to be block directory. +// Download downloads directory that is mean to be block directory. If any of the files +// have a hash calculated in the meta file and it matches with what is in the destination path then +// we do not download it. We always re-download the meta file. 
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error { - if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil { + if err := os.MkdirAll(dst, 0777); err != nil { + return errors.Wrap(err, "create dir") + } + + if err := objstore.DownloadFile(ctx, logger, bucket, path.Join(id.String(), MetaFilename), path.Join(dst, MetaFilename)); err != nil { + return err + } + m, err := metadata.ReadFromDir(dst) + if err != nil { + return errors.Wrapf(err, "reading meta from %s", dst) + } + + ignoredPaths := []string{MetaFilename} + for _, fl := range m.Thanos.Files { + if fl.Hash == nil || fl.Hash.Func == metadata.NoneFunc || fl.RelPath == "" { + continue + } + actualHash, err := metadata.CalculateHash(filepath.Join(dst, fl.RelPath), fl.Hash.Func, logger) + if err != nil { + level.Info(logger).Log("msg", "failed to calculate hash when downloading; re-downloading", "relPath", fl.RelPath, "err", err) + continue + } + + if fl.Hash.Equal(&actualHash) { + ignoredPaths = append(ignoredPaths, fl.RelPath) + } + } + + if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, ignoredPaths...); err != nil { return err } chunksDir := filepath.Join(dst, ChunksDirname) - _, err := os.Stat(chunksDir) + _, err = os.Stat(chunksDir) if os.IsNotExist(err) { // This can happen if block is empty. We cannot easily upload empty directory, so create one here. return os.Mkdir(chunksDir, os.ModePerm) @@ -68,7 +98,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id // It also verifies basic features of Thanos block. // TODO(bplotka): Ensure bucket operations have reasonable backoff retries. // NOTE: Upload updates `meta.Thanos.File` section. -func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string) error { +func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { df, err := os.Stat(bdir) if err != nil { return err @@ -93,7 +123,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.New("empty external labels are not allowed for Thanos block.") } - meta.Thanos.Files, err = gatherFileStats(bdir) + meta.Thanos.Files, err = gatherFileStats(bdir, hf, logger) if err != nil { return errors.Wrap(err, "gather meta file stats") } @@ -279,26 +309,42 @@ func GetSegmentFiles(blockDir string) []string { } // TODO(bwplotka): Gather stats when dirctly uploading files. 
-func gatherFileStats(blockDir string) (res []metadata.File, _ error) { +func gatherFileStats(blockDir string, hf metadata.HashFunc, logger log.Logger) (res []metadata.File, _ error) { files, err := ioutil.ReadDir(filepath.Join(blockDir, ChunksDirname)) if err != nil { return nil, errors.Wrapf(err, "read dir %v", filepath.Join(blockDir, ChunksDirname)) } for _, f := range files { - res = append(res, metadata.File{ + mf := metadata.File{ RelPath: filepath.Join(ChunksDirname, f.Name()), SizeBytes: f.Size(), - }) + } + if hf != metadata.NoneFunc && !f.IsDir() { + h, err := metadata.CalculateHash(filepath.Join(blockDir, ChunksDirname, f.Name()), hf, logger) + if err != nil { + return nil, errors.Wrapf(err, "calculate hash %v", filepath.Join(ChunksDirname, f.Name())) + } + mf.Hash = &h + } + res = append(res, mf) } indexFile, err := os.Stat(filepath.Join(blockDir, IndexFilename)) if err != nil { return nil, errors.Wrapf(err, "stat %v", filepath.Join(blockDir, IndexFilename)) } - res = append(res, metadata.File{ + mf := metadata.File{ RelPath: indexFile.Name(), SizeBytes: indexFile.Size(), - }) + } + if hf != metadata.NoneFunc { + h, err := metadata.CalculateHash(filepath.Join(blockDir, IndexFilename), hf, logger) + if err != nil { + return nil, errors.Wrapf(err, "calculate hash %v", indexFile.Name()) + } + mf.Hash = &h + } + res = append(res, mf) metaFile, err := os.Stat(filepath.Join(blockDir, MetaFilename)) if err != nil { diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 72d140f2b91..b18453ee0a5 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -91,32 +91,32 @@ func TestUpload(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.MkdirAll(path.Join(tmpDir, "test", b1.String()), os.ModePerm)) { // Wrong dir. - err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "not-existing")) + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "not-existing"), metadata.NoneFunc) testutil.NotOk(t, err) testutil.Assert(t, strings.HasSuffix(err.Error(), "/not-existing: no such file or directory"), "") } { // Wrong existing dir (not a block). - err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test")) + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test"), metadata.NoneFunc) testutil.NotOk(t, err) testutil.Equals(t, "not a block dir: ulid: bad data size when unmarshaling", err.Error()) } { // Empty block dir. - err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc) testutil.NotOk(t, err) testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "") } e2eutil.Copy(t, path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename)) { // Missing chunks. 
- err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc) testutil.NotOk(t, err) testutil.Assert(t, strings.HasSuffix(err.Error(), "/chunks: no such file or directory"), err.Error()) } @@ -124,7 +124,7 @@ func TestUpload(t *testing.T) { e2eutil.Copy(t, path.Join(tmpDir, b1.String(), ChunksDirname, "000001"), path.Join(tmpDir, "test", b1.String(), ChunksDirname, "000001")) { // Missing index file. - err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc) testutil.NotOk(t, err) testutil.Assert(t, strings.HasSuffix(err.Error(), "/index: no such file or directory"), "") } @@ -132,14 +132,14 @@ func TestUpload(t *testing.T) { testutil.Ok(t, os.Remove(path.Join(tmpDir, "test", b1.String(), MetaFilename))) { // Missing meta.json file. - err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc) testutil.NotOk(t, err) testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "") } e2eutil.Copy(t, path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename)) { // Full block. - testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc)) testutil.Equals(t, 4, len(bkt.Objects())) testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) @@ -189,7 +189,7 @@ func TestUpload(t *testing.T) { } { // Test Upload is idempotent. 
- testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc)) testutil.Equals(t, 4, len(bkt.Objects())) testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) @@ -203,9 +203,9 @@ func TestUpload(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, nil, 124) + }, 100, 0, 1000, nil, 124, metadata.NoneFunc) testutil.Ok(t, err) - err = Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String())) + err = Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc) testutil.NotOk(t, err) testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error()) testutil.Equals(t, 4, len(bkt.Objects())) @@ -228,9 +228,9 @@ func TestDelete(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b1.String()))) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b1.String()), metadata.NoneFunc)) testutil.Equals(t, 4, len(bkt.Objects())) markedForDeletion := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"}) @@ -248,9 +248,9 @@ func TestDelete(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()))) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc)) testutil.Equals(t, 5, len(bkt.Objects())) // Remove meta.json and check if delete can delete it. 
@@ -302,12 +302,12 @@ func TestMarkForDeletion(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) tcase.preUpload(t, id, bkt) - testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()))) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()), metadata.NoneFunc)) c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, "", c) @@ -358,12 +358,12 @@ func TestMarkForNoCompact(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) tcase.preUpload(t, id, bkt) - testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()))) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()), metadata.NoneFunc)) c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) err = MarkForNoCompact(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoCompactReason, "", c) @@ -373,6 +373,105 @@ func TestMarkForNoCompact(t *testing.T) { } } +// TestHashDownload uploads an empty block to in-memory storage +// and tries to download it to the same dir. It should not try +// to download twice. +func TestHashDownload(t *testing.T) { + defer testutil.TolerantVerifyLeak(t) + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-block-download") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + bkt := objstore.NewInMemBucket() + r := prometheus.NewRegistry() + instrumentedBkt := objstore.BucketWithMetrics("test", bkt, r) + + b1, err := e2eutil.CreateBlockWithTombstone(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 42, metadata.SHA256Func) + testutil.Ok(t, err) + + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), instrumentedBkt, path.Join(tmpDir, b1.String()), metadata.SHA256Func)) + testutil.Equals(t, 4, len(bkt.Objects())) + + m, err := DownloadMeta(ctx, log.NewNopLogger(), bkt, b1) + testutil.Ok(t, err) + + for _, fl := range m.Thanos.Files { + if fl.RelPath == MetaFilename { + continue + } + testutil.Assert(t, fl.Hash != nil, "expected a hash for %s but got nil", fl.RelPath) + } + + // Remove the hash from one file to check if we always download it. + m.Thanos.Files[1].Hash = nil + + metaEncoded := strings.Builder{} + testutil.Ok(t, m.Write(&metaEncoded)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(b1.String(), MetaFilename), strings.NewReader(metaEncoded.String()))) + + // Only downloads MetaFile and IndexFile. + { + err = Download(ctx, log.NewNopLogger(), instrumentedBkt, m.ULID, path.Join(tmpDir, b1.String())) + testutil.Ok(t, err) + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. 
+ # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 2 + thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 2 + thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 4 + `), `thanos_objstore_bucket_operations_total`)) + } + + // Ensures that we always download MetaFile. + { + testutil.Ok(t, os.Remove(path.Join(tmpDir, b1.String(), MetaFilename))) + err = Download(ctx, log.NewNopLogger(), instrumentedBkt, m.ULID, path.Join(tmpDir, b1.String())) + testutil.Ok(t, err) + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. + # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 4 + thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 4 + thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 4 + `), `thanos_objstore_bucket_operations_total`)) + } + + // Remove chunks => gets redownloaded. + // Always downloads MetaFile. + // Finally, downloads the IndexFile since we have removed its hash. + { + testutil.Ok(t, os.RemoveAll(path.Join(tmpDir, b1.String(), ChunksDirname))) + err = Download(ctx, log.NewNopLogger(), instrumentedBkt, m.ULID, path.Join(tmpDir, b1.String())) + testutil.Ok(t, err) + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. 
+ # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 7 + thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 6 + thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 4 + `), `thanos_objstore_bucket_operations_total`)) + } +} + func TestUploadCleanup(t *testing.T) { defer testutil.TolerantVerifyLeak(t) @@ -389,13 +488,13 @@ func TestUploadCleanup(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) { errBkt := errBucket{Bucket: bkt, failSuffix: "/index"} - uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String())) + uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String()), metadata.NoneFunc) testutil.Assert(t, errors.Is(uploadErr, errUploadFailed)) // If upload of index fails, block is deleted. @@ -406,7 +505,7 @@ func TestUploadCleanup(t *testing.T) { { errBkt := errBucket{Bucket: bkt, failSuffix: "/meta.json"} - uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String())) + uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String()), metadata.NoneFunc) testutil.Assert(t, errors.Is(uploadErr, errUploadFailed)) // If upload of meta.json fails, nothing is cleaned up. diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go index 7be4e75ea16..6db6554f444 100644 --- a/pkg/block/index_test.go +++ b/pkg/block/index_test.go @@ -33,7 +33,7 @@ func TestRewrite(t *testing.T) { {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}}, - }, 150, 0, 1000, nil, 124) + }, 150, 0, 1000, nil, 124, metadata.NoneFunc) testutil.Ok(t, err) ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename)) diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 56730d309f4..f8c2057f8aa 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -57,10 +57,10 @@ func TestReaders(t *testing.T) { {{Name: "a", Value: "13"}}, {{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}}, {{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id1.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc)) // Copy block index version 1 for backward compatibility. 
/* The block here was produced at the commit @@ -89,7 +89,7 @@ func TestReaders(t *testing.T) { Source: metadata.TestSource, }, &m.BlockMeta) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()), metadata.NoneFunc)) for _, id := range []ulid.ULID{id1, m.ULID} { t.Run(id.String(), func(t *testing.T) { @@ -325,7 +325,7 @@ func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *meta Source: metadata.TestSource, }, &m.BlockMeta) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()))) + testutil.Ok(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()), metadata.NoneFunc)) return m } @@ -400,9 +400,9 @@ func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) { } // Create a block. - id1, err := e2eutil.CreateBlock(ctx, tmpDir, seriesLabels, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + id1, err := e2eutil.CreateBlock(ctx, tmpDir, seriesLabels, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(b, err) - testutil.Ok(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String()))) + testutil.Ok(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc)) // Create an index reader. reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling) diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 886c0516fe5..e34d9c6ecfb 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore/filesystem" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" @@ -53,9 +54,9 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) @@ -94,9 +95,9 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) // Write a corrupted 
index-header for the block. headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) @@ -134,9 +135,9 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) @@ -186,9 +187,9 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) @@ -237,9 +238,9 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index 74bd319bd93..3e0e30ee56d 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore/filesystem" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" @@ -53,9 +54,9 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) for testName, testData := range tests { t.Run(testName, func(t *testing.T) { @@ -91,9 +92,9 @@ func 
TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, nil) defer pool.Close() diff --git a/pkg/block/metadata/hash.go b/pkg/block/metadata/hash.go new file mode 100644 index 00000000000..39f6f98f237 --- /dev/null +++ b/pkg/block/metadata/hash.go @@ -0,0 +1,62 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "os" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/runutil" +) + +// HashFunc indicates what type of hash it is. +type HashFunc string + +const ( + // SHA256Func shows that SHA256 has been used to generate the hash. + SHA256Func HashFunc = "SHA256" + // NoneFunc shows that hashes should not be added. Used internally. + NoneFunc HashFunc = "" +) + +// ObjectHash stores the hash of an object in the object storage. +type ObjectHash struct { + Func HashFunc `json:"hashFunc"` + Value string `json:"value"` +} + +// Equal returns true if two hashes are equal. +func (oh *ObjectHash) Equal(other *ObjectHash) bool { + return oh.Value == other.Value +} + +// CalculateHash calculates the hash of the given type. +func CalculateHash(p string, hf HashFunc, logger log.Logger) (ObjectHash, error) { + switch hf { + case SHA256Func: + f, err := os.Open(p) + if err != nil { + return ObjectHash{}, errors.Wrap(err, "opening file") + } + defer runutil.CloseWithLogOnErr(logger, f, "closing %s", p) + + h := sha256.New() + + if _, err := io.Copy(h, f); err != nil { + return ObjectHash{}, errors.Wrap(err, "copying") + } + + return ObjectHash{ + Func: SHA256Func, + Value: hex.EncodeToString(h.Sum(nil)), + }, nil + } + return ObjectHash{}, fmt.Errorf("hash function %v is not supported", hf) + +} diff --git a/pkg/block/metadata/hash_test.go b/pkg/block/metadata/hash_test.go new file mode 100644 index 00000000000..bb07094f8bc --- /dev/null +++ b/pkg/block/metadata/hash_test.go @@ -0,0 +1,32 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package metadata + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/go-kit/kit/log" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestHashSmoke(t *testing.T) { + dir, err := ioutil.TempDir("", "testhash") + testutil.Ok(t, err) + t.Cleanup(func() { os.RemoveAll(dir) }) + f, err := ioutil.TempFile(dir, "hash") + testutil.Ok(t, err) + + _, err = f.Write([]byte("test")) + testutil.Ok(t, err) + + exp := ObjectHash{Func: SHA256Func, Value: "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"} + h, err := CalculateHash(f.Name(), SHA256Func, log.NewNopLogger()) + testutil.Ok(t, err) + testutil.Equals(t, exp, h) + + _, err = CalculateHash(f.Name(), NoneFunc, log.NewNopLogger()) + testutil.NotOk(t, err) +} diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index b47c28eba49..efdd8e0e754 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -116,6 +116,9 @@ type File struct { RelPath string `json:"rel_path"` // SizeBytes is optional (e.g meta.json does not show size). SizeBytes int64 `json:"size_bytes,omitempty"` + + // Hash is an optional hash of this file. Used for potentially avoiding an extra download. + Hash *ObjectHash `json:"hash,omitempty"` } type ThanosDownsample struct { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 7c21b04ae18..54472b47fa7 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -239,6 +240,7 @@ type DefaultGrouper struct { verticalCompactions *prometheus.CounterVec garbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter + hashFunc metadata.HashFunc planner Planner planned map[ulid.ULID]bool plannedMtx sync.Mutex @@ -253,6 +255,7 @@ func NewDefaultGrouper( reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + hashFunc metadata.HashFunc, planner Planner, ) *DefaultGrouper { return &DefaultGrouper{ @@ -282,6 +285,7 @@ func NewDefaultGrouper( }, []string{"group"}), garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, + hashFunc: hashFunc, planner: planner, planned: map[ulid.ULID]bool{}, } @@ -331,6 +335,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.verticalCompactions.WithLabelValues(groupKey), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, + g.hashFunc, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -391,6 +396,7 @@ func (g *DefaultGrouper) splitGroupByPlans(group *Group) ([]*Group, error) { g.verticalCompactions.WithLabelValues(groupKey), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, + g.hashFunc, ) if err != nil { return nil, errors.Wrap(err, "create split compaction group") @@ -430,6 +436,7 @@ type Group struct { verticalCompactions prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter + hashFunc metadata.HashFunc } // NewGroup returns a new compaction group. 
@@ -449,6 +456,7 @@ func NewGroup( verticalCompactions prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, blocksMarkedForDeletion prometheus.Counter, + hashFunc metadata.HashFunc, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() @@ -469,6 +477,7 @@ func NewGroup( verticalCompactions: verticalCompactions, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, + hashFunc: hashFunc, } return g, nil } @@ -585,7 +594,9 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp subDir := filepath.Join(dir, cg.Key()+"-"+cg.Hash()) defer func() { - if IsHaltError(rerr) { + // Leave the compact directory for inspection if it is a halt error + // or if it is not then so that possibly we would not have to download everything again. + if rerr != nil { return } if err := os.RemoveAll(subDir); err != nil { @@ -593,9 +604,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp } }() - if err := os.RemoveAll(subDir); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir") - } if err := os.MkdirAll(subDir, 0777); err != nil { return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } @@ -763,7 +771,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, } level.Info(logger).Log("msg", "uploading repaired block", "newID", resid) - if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String())); err != nil { + if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil { return retry(errors.Wrapf(err, "upload of %s failed", resid)) } @@ -904,7 +912,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin = time.Now() - if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil { + if err := block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc); err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID)) } level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin)) @@ -977,7 +985,10 @@ func NewBucketCompactor( // Compact runs compaction over bucket. func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { defer func() { - if IsHaltError(rerr) { + // Do not remove the compactDir if an error has occurred + // because potentially on the next run we would not have to download + // everything again. + if rerr != nil { return } if err := os.RemoveAll(c.compactDir); err != nil { @@ -1030,11 +1041,6 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { }() } - // Clean up the compaction temporary directory at the beginning of every compaction loop. - if err := os.RemoveAll(c.compactDir); err != nil { - return errors.Wrap(err, "clean up the compaction temporary directory") - } - level.Info(c.logger).Log("msg", "start sync of metas") if err := c.sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync") @@ -1052,6 +1058,15 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { return errors.Wrap(err, "build compaction groups") } + ignoreDirs := []string{} + for _, gr := range groups { + ignoreDirs = append(ignoreDirs, gr.Key()) + } + + if err := runutil.DeleteAll(c.compactDir, ignoreDirs...); err != nil { + level.Warn(c.logger).Log("msg", "failed deleting non-compaction group directories/files, some disk space usage might have leaked. 
Continuing", "err", err, "dir", c.compactDir) + } + level.Info(c.logger).Log("msg", "start of compactions") // Send all groups found during this pass to the compaction workers. diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 3ee6dca17bd..85c59875099 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -135,7 +135,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.GarbageCollect(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. - grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, nil) + grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc, nil) groups, err := grouper.Groups(sy.Metas()) testutil.Ok(t, err) @@ -199,7 +199,7 @@ func TestGroup_Compact_e2e(t *testing.T) { planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, planner) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc, planner) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2) testutil.Ok(t, err) @@ -419,7 +419,7 @@ func TestGroup_GroupConcurrency_e2e(t *testing.T) { planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) - grouper := NewDefaultGrouper(logger, bkt, false, true, reg, blocksMarkedForDeletion, garbageCollectedBlocks, planner) + grouper := NewDefaultGrouper(logger, bkt, false, true, reg, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc, planner) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2) testutil.Ok(t, err) extLabels := labels.Labels{{Name: "e1", Value: "1"}} @@ -525,7 +525,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( if b.numSamples == 0 { id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) } else { - id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res) + id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc) } testutil.Ok(t, err) @@ -533,7 +533,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( testutil.Ok(t, err) metas = append(metas, meta) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()))) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc)) } return metas } diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 8ac4ed94b2f..6aabf2c5880 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -223,7 +223,7 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } // DownloadDir downloads all object found in the directory into the local directory. 
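The compactor above now keeps per-group directories between iterations and relies on runutil.DeleteAll to prune everything else. A small self-contained sketch of those semantics (the temp-directory layout is made up for illustration): regular files are always removed, and only directories whose names appear in the ignore list survive.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"

	"github.com/thanos-io/thanos/pkg/runutil"
)

func main() {
	dir, err := ioutil.TempDir("", "deleteall-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// A stale file and two group directories, one of which should be kept.
	_ = ioutil.WriteFile(filepath.Join(dir, "stale-file"), []byte("x"), 0o644)
	_ = os.MkdirAll(filepath.Join(dir, "group-to-keep"), 0o777)
	_ = os.MkdirAll(filepath.Join(dir, "stale-group"), 0o777)

	if err := runutil.DeleteAll(dir, "group-to-keep"); err != nil {
		panic(err)
	}

	entries, _ := ioutil.ReadDir(dir)
	for _, e := range entries {
		fmt.Println(e.Name()) // Prints only "group-to-keep".
	}
}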
-func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) error { +func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, ignoredPaths ...string) error { if err := os.MkdirAll(dst, 0777); err != nil { return errors.Wrap(err, "create dir") } @@ -231,7 +231,13 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, var downloadedFiles []string if err := bkt.Iter(ctx, src, func(name string) error { if strings.HasSuffix(name, DirDelim) { - return DownloadDir(ctx, logger, bkt, name, filepath.Join(dst, filepath.Base(name))) + return DownloadDir(ctx, logger, bkt, originalSrc, name, filepath.Join(dst, filepath.Base(name)), ignoredPaths...) + } + for _, ignoredPath := range ignoredPaths { + if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) { + level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name) + return nil + } } if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { return err diff --git a/pkg/promclient/promclient_e2e_test.go b/pkg/promclient/promclient_e2e_test.go index 4e60b816fd7..254fa987849 100644 --- a/pkg/promclient/promclient_e2e_test.go +++ b/pkg/promclient/promclient_e2e_test.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" @@ -92,6 +93,7 @@ func TestSnapshot_e2e(t *testing.T) { timestamp.FromTime(now.Add(-4*time.Hour)), nil, 0, + metadata.NoneFunc, ) testutil.Ok(t, err) @@ -162,6 +164,7 @@ func TestQueryRange_e2e(t *testing.T) { timestamp.FromTime(now), nil, 0, + metadata.NoneFunc, ) testutil.Ok(t, err) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index ed297db7c87..13240aa9aef 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -42,6 +42,7 @@ type MultiTSDB struct { mtx *sync.RWMutex tenants map[string]*tenant allowOutOfOrderUpload bool + hashFunc metadata.HashFunc } // NewMultiTSDB creates new MultiTSDB. 
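A hedged usage sketch for the extended DownloadDir signature above (the downloadBlockDir helper, the block ULID and the ignored paths are illustrative): ignoredPaths are compared against object names with the originalSrc prefix stripped, so callers pass paths relative to the block root for files that already exist locally with a matching hash.

package example

import (
	"context"
	"path/filepath"

	"github.com/go-kit/kit/log"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// downloadBlockDir fetches a block from the bucket, skipping files that were
// already verified locally (for example via metadata.CalculateHash against
// the hashes recorded in meta.json).
func downloadBlockDir(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, localDir string) error {
	const blockID = "01EHBQRN4RF0HSRR1772KW0TN8"
	ignored := []string{"chunks/000001", "index"} // Relative to the block root.
	return objstore.DownloadDir(ctx, logger, bkt,
		blockID,                          // originalSrc: the prefix ignoredPaths are relative to.
		blockID,                          // src: the directory being iterated (same as originalSrc at the top level).
		filepath.Join(localDir, blockID), // dst: local destination directory.
		ignored...,
	)
}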
@@ -55,6 +56,7 @@ func NewMultiTSDB( tenantLabelName string, bucket objstore.Bucket, allowOutOfOrderUpload bool, + hashFunc metadata.HashFunc, ) *MultiTSDB { if l == nil { l = log.NewNopLogger() @@ -71,6 +73,7 @@ func NewMultiTSDB( tenantLabelName: tenantLabelName, bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, + hashFunc: hashFunc, } } @@ -293,6 +296,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant metadata.ReceiveSource, false, t.allowOutOfOrderUpload, + t.hashFunc, ) } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 97b17551762..8e6eeeaaf06 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/tsdb" "golang.org/x/sync/errgroup" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -42,6 +43,7 @@ func TestMultiTSDB(t *testing.T) { "tenant_id", nil, false, + metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -109,6 +111,7 @@ func TestMultiTSDB(t *testing.T) { "tenant_id", nil, false, + metadata.NoneFunc, ) defer func() { testutil.Ok(t, m.Close()) }() diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 3f817a62ee4..97171bce82a 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -54,6 +54,7 @@ import ( "io" "io/ioutil" "os" + "path/filepath" "time" "github.com/go-kit/kit/log" @@ -158,3 +159,41 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a .. *err = merr.Err() } + +// DeleteAll deletes all files and directories inside the given +// dir except for the ignoreDirs directories. 
+func DeleteAll(dir string, ignoreDirs ...string) error { + entries, err := ioutil.ReadDir(dir) + if err != nil { + return errors.Wrap(err, "read dir") + } + var groupErrs errutil.MultiError + + for _, d := range entries { + if !d.IsDir() { + if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil { + groupErrs.Add(err) + } + continue + } + + var found bool + for _, id := range ignoreDirs { + if id == d.Name() { + found = true + break + } + } + + if !found { + if err := os.RemoveAll(filepath.Join(dir, d.Name())); err != nil { + groupErrs.Add(err) + } + } + } + + if groupErrs != nil { + return errors.Wrap(groupErrs.Err(), "delete file/dir") + } + return nil +} diff --git a/pkg/runutil/runutil_test.go b/pkg/runutil/runutil_test.go index 7cad147b80f..908a3398a3d 100644 --- a/pkg/runutil/runutil_test.go +++ b/pkg/runutil/runutil_test.go @@ -5,7 +5,9 @@ package runutil import ( "io" + "io/ioutil" "os" + "path/filepath" "strings" "testing" @@ -122,3 +124,36 @@ func TestCloseMoreThanOnce(t *testing.T) { CloseWithLogOnErr(lc, r, "should be called") testutil.Equals(t, true, lc.WasCalled) } + +func TestClearsDirectoriesFilesProperly(t *testing.T) { + dir, err := ioutil.TempDir("", "example") + testutil.Ok(t, err) + + t.Cleanup(func() { + os.RemoveAll(dir) + }) + + f, err := os.Create(filepath.Join(dir, "test123")) + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8"), os.ModePerm)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8"), os.ModePerm)) + f, err = os.Create(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9")) + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + + testutil.Ok(t, DeleteAll(dir, "01EHBQRN4RF0HSRR1772KW0TN9", "01EHBQRN4RF0HSRR1772KW0TN8")) + + _, err = os.Stat(filepath.Join(dir, "test123")) + testutil.Assert(t, os.IsNotExist(err)) + + _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN9")) + testutil.Assert(t, os.IsNotExist(err)) + + _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW1TN8/")) + testutil.Assert(t, os.IsNotExist(err)) + + _, err = os.Stat(filepath.Join(dir, "01EHBQRN4RF0HSRR1772KW0TN8/")) + testutil.Ok(t, err) +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index df66622774b..345a2df32ee 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -81,6 +81,7 @@ type Shipper struct { uploadCompacted bool allowOutOfOrderUploads bool + hashFunc metadata.HashFunc } // New creates a new shipper that detects new TSDB blocks in dir and uploads them to @@ -95,6 +96,7 @@ func New( source metadata.SourceType, uploadCompacted bool, allowOutOfOrderUploads bool, + hashFunc metadata.HashFunc, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -112,6 +114,7 @@ func New( source: source, allowOutOfOrderUploads: allowOutOfOrderUploads, uploadCompacted: uploadCompacted, + hashFunc: hashFunc, } } @@ -363,7 +366,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { if err := meta.WriteToDir(s.logger, updir); err != nil { return errors.Wrap(err, "write meta file") } - return block.Upload(ctx, s.logger, s.bucket, updir) + return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc) } // blockMetasFromOldest returns the block meta of each block found in dir diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index f76ea5f76ea..0395ac3833e 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t 
*testing.T) { }() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.NoneFunc) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -223,7 +223,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2)) - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 5d1a3615dfd..595d8a3ecf5 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -32,7 +32,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) + s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) // Missing thanos meta file. _, _, err = s.Timestamps() @@ -129,7 +129,7 @@ func TestIterBlockMetas(t *testing.T) { }, }.WriteToDir(log.NewNopLogger(), path.Join(dir, id3.String()))) - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) metas, err := shipper.blockMetasFromOldest() testutil.Ok(t, err) testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { @@ -164,7 +164,7 @@ func BenchmarkIterBlockMetas(b *testing.B) { }) b.ResetTimer() - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) _, err = shipper.blockMetasFromOldest() testutil.Ok(b, err) @@ -180,7 +180,7 @@ func TestShipperAddsSegmentFiles(t *testing.T) { inmemory := objstore.NewInMemBucket() lbls := []labels.Label{{Name: "test", Value: "test"}} - s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false) + s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false, metadata.NoneFunc) id := ulid.MustNew(1, nil) blockDir := path.Join(dir, id.String()) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 1d646746e96..990d5c1e5aa 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -90,9 +90,9 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o // Create two blocks per time slot. Only add 10 samples each so only one chunk // gets created each. This way we can easily verify we got 10 chunks per series below. 
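The shipper changes above add the hashFunc parameter writers use to opt in. A hedged construction sketch (newHashingShipper and its arguments are placeholders; the parameter order follows the test calls above): passing metadata.SHA256Func makes the shipper record per-file hashes in meta.json on upload, while metadata.NoneFunc keeps the previous behaviour.

package example

import (
	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
	"github.com/thanos-io/thanos/pkg/shipper"
)

// newHashingShipper builds a shipper that uploads blocks with SHA256 hashes
// recorded for every file.
func newHashingShipper(logger log.Logger, bkt objstore.Bucket, dataDir string, extLset labels.Labels) *shipper.Shipper {
	return shipper.New(
		logger,
		nil, // No metrics registerer in this sketch.
		dataDir,
		bkt,
		func() labels.Labels { return extLset },
		metadata.ReceiveSource,
		false,               // uploadCompacted
		false,               // allowOutOfOrderUploads
		metadata.SHA256Func, // New: record per-file hashes on upload.
	)
}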
- id1, err := e2eutil.CreateBlock(ctx, dir, series[:4], 10, mint, maxt, extLset, 0) + id1, err := e2eutil.CreateBlock(ctx, dir, series[:4], 10, mint, maxt, extLset, 0, metadata.NoneFunc) testutil.Ok(t, err) - id2, err := e2eutil.CreateBlock(ctx, dir, series[4:], 10, mint, maxt, extLset, 0) + id2, err := e2eutil.CreateBlock(ctx, dir, series[4:], 10, mint, maxt, extLset, 0, metadata.NoneFunc) testutil.Ok(t, err) dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) @@ -103,8 +103,8 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o meta.Thanos.Labels = map[string]string{"ext2": "value2"} testutil.Ok(t, meta.WriteToDir(logger, dir2)) - testutil.Ok(t, block.Upload(ctx, logger, bkt, dir1)) - testutil.Ok(t, block.Upload(ctx, logger, bkt, dir2)) + testutil.Ok(t, block.Upload(ctx, logger, bkt, dir1, metadata.NoneFunc)) + testutil.Ok(t, block.Upload(ctx, logger, bkt, dir2, metadata.NoneFunc)) testutil.Ok(t, os.RemoveAll(dir1)) testutil.Ok(t, os.RemoveAll(dir2)) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 1d092a73f18..ddad30d5d41 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -636,21 +636,21 @@ func TestBucketStore_Sharding(t *testing.T) { bkt := objstore.NewInMemBucket() series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} - id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0) + id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id1.String()))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id1.String()), metadata.NoneFunc)) - id2, err := e2eutil.CreateBlock(ctx, dir, series, 10, 1000, 2000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0) + id2, err := e2eutil.CreateBlock(ctx, dir, series, 10, 1000, 2000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id2.String()))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id2.String()), metadata.NoneFunc)) - id3, err := e2eutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "b"}, {Name: "region", Value: "r1"}}, 0) + id3, err := e2eutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "b"}, {Name: "region", Value: "r1"}}, 0, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id3.String()))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id3.String()), metadata.NoneFunc)) - id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r2"}}, 0) + id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r2"}}, 0, metadata.NoneFunc) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id4.String()))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id4.String()), metadata.NoneFunc)) if ok := t.Run("new_runs", func(t *testing.T) { testSharding(t, "", bkt, id1, id2, id3, id4) @@ -1051,7 +1051,7 @@ func 
uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in Source: metadata.TestSource, }, nil) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()))) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) return id } @@ -1248,7 +1248,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer testutil.Ok(t, err) testutil.Ok(t, meta.WriteToDir(logger, filepath.Join(blockDir, id.String()))) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()))) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) } ibkt := objstore.WithNoopInstr(bkt) @@ -1424,7 +1424,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()))) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) b1 = &bucketBlock{ indexCache: indexCache, @@ -1463,7 +1463,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()))) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) b2 = &bucketBlock{ indexCache: indexCache, @@ -1726,7 +1726,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { instrBkt := objstore.WithNoopInstr(bkt) logger := log.NewNopLogger() - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()))) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) // Instance a real bucket store we'll use to query the series. fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil) @@ -1872,7 +1872,7 @@ func TestBlockWithLargeChunks(t *testing.T) { logger := log.NewNopLogger() instrBkt := objstore.WithNoopInstr(bkt) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, b.String()))) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, b.String()), metadata.NoneFunc)) // Instance a real bucket store we'll use to query the series. fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil) @@ -2273,7 +2273,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) { blockMeta, err := metadata.InjectThanos(logger, filepath.Join(tmpDir, blockID.String()), thanosMeta, nil) testutil.Ok(b, err) - testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) // Create a chunk pool with buckets between 1KB and 32KB. 
chunkPool, err := pool.NewBucketedBytes(1024, 32*1024, 2, 1e10) @@ -2336,7 +2336,7 @@ func BenchmarkBlockSeries(b *testing.B) { blockMeta, err := metadata.InjectThanos(logger, filepath.Join(tmpDir, blockID.String()), thanosMeta, nil) testutil.Ok(b, err) - testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()))) + testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) // Create chunk pool and partitioner using the same production settings. chunkPool, err := NewDefaultChunkBytesPool(64 * 1024 * 1024 * 1024) diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 59f0552384a..42ca211aaa9 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -343,8 +343,9 @@ func CreateBlock( mint, maxt int64, extLset labels.Labels, resolution int64, + hashFunc metadata.HashFunc, ) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false) + return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc) } // CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block. @@ -356,8 +357,9 @@ func CreateBlockWithTombstone( mint, maxt int64, extLset labels.Labels, resolution int64, + hashFunc metadata.HashFunc, ) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true) + return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true, hashFunc) } // CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each. @@ -372,8 +374,9 @@ func CreateBlockWithBlockDelay( blockDelay time.Duration, extLset labels.Labels, resolution int64, + hashFunc metadata.HashFunc, ) (ulid.ULID, error) { - blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false) + blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc) if err != nil { return ulid.ULID{}, errors.Wrap(err, "block creation") } @@ -407,6 +410,7 @@ func createBlock( extLset labels.Labels, resolution int64, tombstones bool, + hashFunc metadata.HashFunc, ) (id ulid.ULID, err error) { headOpts := tsdb.DefaultHeadOptions() headOpts.ChunkDirRoot = filepath.Join(dir, "chunks") @@ -476,10 +480,37 @@ func createBlock( } blockDir := filepath.Join(dir, id.String()) + + files := []metadata.File{} + if hashFunc != metadata.NoneFunc { + paths := []string{} + if err := filepath.Walk(blockDir, func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + paths = append(paths, path) + return nil + }); err != nil { + return id, errors.Wrapf(err, "walking %s", dir) + } + + for _, p := range paths { + pHash, err := metadata.CalculateHash(p, metadata.SHA256Func, log.NewNopLogger()) + if err != nil { + return id, errors.Wrapf(err, "calculating hash of %s", blockDir+p) + } + files = append(files, metadata.File{ + RelPath: strings.TrimPrefix(p, blockDir+"/"), + Hash: &pHash, + }) + } + } + if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource, + Files: files, }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go 
index e42def264d8..e19d51a44fc 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -111,7 +111,7 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool } level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid) - if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String())); err != nil { + if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil { return errors.Wrapf(err, "upload of %s failed", resid) } diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go index 3d3704ae38a..afdba79519c 100644 --- a/pkg/verifier/safe_delete.go +++ b/pkg/verifier/safe_delete.go @@ -15,6 +15,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" ) @@ -135,7 +136,7 @@ func backupDownloaded(ctx context.Context, logger log.Logger, bdir string, backu // Upload the on disk TSDB block. level.Info(logger).Log("msg", "Uploading block to backup bucket", "id", id.String()) - if err := block.Upload(ctx, logger, backupBkt, bdir); err != nil { + if err := block.Upload(ctx, logger, backupBkt, bdir, metadata.NoneFunc); err != nil { return errors.Wrap(err, "upload to backup") } diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 560558775a5..a1e206f08b6 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -28,6 +28,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/s3" @@ -58,13 +59,14 @@ type blockDesc struct { maxt int64 markedForNoCompact bool + hashFunc metadata.HashFunc } -func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration) (ulid.ULID, error) { +func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration, hf metadata.HashFunc) (ulid.ULID, error) { if delay == 0*time.Second { - return e2eutil.CreateBlock(ctx, dir, b.series, 120, b.mint, b.maxt, b.extLset, 0) + return e2eutil.CreateBlock(ctx, dir, b.series, 120, b.mint, b.maxt, b.extLset, 0, hf) } - return e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0) + return e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0, hf) } func TestCompactWithStoreGateway(t *testing.T) { @@ -78,6 +80,8 @@ func TestCompactWithStoreGateway(t *testing.T) { now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z") testutil.Ok(t, err) + var blocksWithHashes []ulid.ULID + // Simulate real scenario, including more complex cases like overlaps if needed. // TODO(bwplotka): Add blocks to downsample and test delayed delete. blocks := []blockDesc{ @@ -104,10 +108,11 @@ func TestCompactWithStoreGateway(t *testing.T) { blocks = append(blocks, // Non overlapping blocks, ready for compaction. 
blockDesc{ - series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, - extLset: labels.FromStrings("case", "compaction-ready", "replica", "1"), - mint: timestamp.FromTime(now), - maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("case", "compaction-ready", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + hashFunc: metadata.SHA256Func, }, blockDesc{ series: []labels.Labels{labels.FromStrings("a", "1", "b", "3")}, @@ -345,13 +350,17 @@ func TestCompactWithStoreGateway(t *testing.T) { rawBlockIDs := map[ulid.ULID]struct{}{} for _, b := range blocks { - id, err := b.Create(ctx, dir, justAfterConsistencyDelay) + id, err := b.Create(ctx, dir, justAfterConsistencyDelay, b.hashFunc) testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) rawBlockIDs[id] = struct{}{} if b.markedForNoCompact { testutil.Ok(t, block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) } + + if b.hashFunc != metadata.NoneFunc { + blocksWithHashes = append(blocksWithHashes, id) + } } { // On top of that, add couple of other tricky cases with different meta. @@ -363,33 +372,33 @@ func TestCompactWithStoreGateway(t *testing.T) { } // New Partial block. - id, err := malformedBase.Create(ctx, dir, 0*time.Second) + id, err := malformedBase.Create(ctx, dir, 0*time.Second, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // New Partial block + deletion mark. - id, err = malformedBase.Create(ctx, dir, 0*time.Second) + id, err = malformedBase.Create(ctx, dir, 0*time.Second, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // Partial block after consistency delay. - id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // Partial block after consistency delay + deletion mark. - id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // Partial block after consistency delay + old deletion mark ready to be deleted. 
- id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) deletionMark, err := json.Marshal(metadata.DeletionMark{ @@ -403,13 +412,13 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // Partial block after delete threshold. - id, err = malformedBase.Create(ctx, dir, 50*time.Hour) + id, err = malformedBase.Create(ctx, dir, 50*time.Hour, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // Partial block after delete threshold + deletion mark. - id, err = malformedBase.Create(ctx, dir, 50*time.Hour) + id, err = malformedBase.Create(ctx, dir, 50*time.Hour, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) @@ -571,7 +580,21 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, s.Stop(c)) } - t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) { + // Sequential because we want to check that Thanos Compactor does not + // touch files it does not need to. + // Dedup enabled; compactor should work as expected. + { + // Predownload block dirs with hashes. We should not try downloading them again. + p := filepath.Join(s.SharedDir(), "data", "compact", "working") + + for _, id := range blocksWithHashes { + m, err := block.DownloadMeta(ctx, logger, bkt, id) + testutil.Ok(t, err) + + delete(m.Thanos.Labels, "replica") + testutil.Ok(t, block.Download(ctx, logger, bkt, id, filepath.Join(p, "compact", compact.DefaultGroupKey(m.Thanos), id.String()))) + } + // We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction. c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica") testutil.Ok(t, err) @@ -601,6 +624,18 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_halted")) + + bucketMatcher, err := labels.NewMatcher(labels.MatchEqual, "bucket", bucket) + testutil.Ok(t, err) + operationMatcher, err := labels.NewMatcher(labels.MatchEqual, "operation", "get") + testutil.Ok(t, err) + testutil.Ok(t, c.WaitSumMetricsWithOptions(e2e.Equals(478), + []string{"thanos_objstore_bucket_operations_total"}, e2e.WithLabelMatchers( + bucketMatcher, + operationMatcher, + )), + ) + // Make sure compactor does not modify anything else over time. 
testutil.Ok(t, s.Stop(c)) @@ -619,7 +654,7 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+6-2)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) - }) + } t.Run("dedup enabled; no delete delay; compactor should work and remove things as expected", func(t *testing.T) { c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica", "--delete-delay=0s") diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 332b54eadfd..1cf4d61341f 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/s3" @@ -78,13 +79,13 @@ func TestStoreGateway(t *testing.T) { t.Cleanup(cancel) now := time.Now() - id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0) + id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) testutil.Ok(t, err) - id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0) + id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0, metadata.NoneFunc) testutil.Ok(t, err) - id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0) + id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0, metadata.NoneFunc) testutil.Ok(t, err) - id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0) + id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0, metadata.NoneFunc) testutil.Ok(t, err) l := log.NewLogfmtLogger(os.Stdout) bkt, err := s3.NewBucketWithConfig(l, s3.Config{ @@ -182,7 +183,7 @@ func TestStoreGateway(t *testing.T) { testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(4+1), "thanos_bucket_store_series_blocks_queried")) }) t.Run("upload block id5, similar to id1", func(t *testing.T) { - id5, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset4, 0) + id5, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset4, 0, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id5.String()), id5.String()))
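Returning to the compact e2e test above: it primes the compactor's working directory so the expected count of bucket "get" operations stays flat. A hedged sketch of that priming step (primeCompactorCache, dataDir and the block ID are placeholders; the path layout mirrors the test): the destination is <data-dir>/compact/<group key>/<block ULID>, so the next compactor run finds the files, compares the hashes from meta.json and skips re-downloading them.

package example

import (
	"context"
	"path/filepath"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// primeCompactorCache downloads one block into the directory layout the
// compactor itself uses, keyed by the deduplicated group key.
func primeCompactorCache(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dataDir string, id ulid.ULID) error {
	m, err := block.DownloadMeta(ctx, logger, bkt, id)
	if err != nil {
		return err
	}
	// Drop the replica label so the group key matches what a compactor running
	// with --deduplication.replica-label=replica computes.
	delete(m.Thanos.Labels, "replica")

	dst := filepath.Join(dataDir, "compact", compact.DefaultGroupKey(m.Thanos), id.String())
	return block.Download(ctx, logger, bkt, id, dst)
}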