From 254c9d0c3b3a6b1800ed24a69392972c55a39677 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 5 Jul 2022 16:18:18 -0700 Subject: [PATCH 01/13] Parallel Chunks Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- cmd/thanos/compact.go | 4 ++ docs/components/compact.md | 3 ++ pkg/block/block.go | 4 +- pkg/compact/compact.go | 14 ++++- pkg/compact/compact_e2e_test.go | 4 +- pkg/compact/compact_test.go | 6 +-- pkg/objstore/objstore.go | 92 ++++++++++++++++++++++++++------- 7 files changed, 101 insertions(+), 26 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 18c2be2104..03e871ad35 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -349,6 +349,7 @@ func runCompact( compactMetrics.garbageCollectedBlocks, compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason), metadata.HashFunc(conf.hashFunc), + conf.blockFetchConcurrency, ) tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter) planner := compact.WithLargeTotalIndexSizeFilter( @@ -630,6 +631,7 @@ type compactConfig struct { waitInterval time.Duration disableDownsampling bool blockMetaFetchConcurrency int + blockFetchConcurrency int blockViewerSyncBlockInterval time.Duration blockViewerSyncBlockTimeout time.Duration cleanupBlocksInterval time.Duration @@ -688,6 +690,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&cc.blockMetaFetchConcurrency) + cmd.Flag("block-fetch-concurrency", "Number of goroutines to use when fetching block files from object storage."). + Default("1").IntVar(&cc.blockFetchConcurrency) cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI."). 
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval) cmd.Flag("block-viewer.global.sync-block-timeout", "Maximum time for syncing the blocks between local and remote view for /global Block Viewer UI."). diff --git a/docs/components/compact.md b/docs/components/compact.md index fdc34c7c0d..7e36a26b5d 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -279,6 +279,9 @@ usage: thanos compact [] Continuously compacts blocks in an object store bucket. Flags: + --block-fetch-concurrency=1 + Number of goroutines to use when fetching block + files from object storage. --block-meta-fetch-concurrency=32 Number of goroutines to use when fetching block metadata from object storage. diff --git a/pkg/block/block.go b/pkg/block/block.go index fa2b670626..35b9efb581 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -45,7 +45,7 @@ const ( // Download downloads directory that is mean to be block directory. If any of the files // have a hash calculated in the meta file and it matches with what is in the destination path then // we do not download it. We always re-download the meta file. 
-func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error { +func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string, options ...objstore.DownloadDirOption) error { if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } @@ -74,7 +74,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id } } - if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, ignoredPaths...); err != nil { + if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, append(options, objstore.WithDownloadIgnoredPaths(ignoredPaths...))...); err != nil { return err } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 6465f16927..88ecbca0c1 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -232,6 +232,7 @@ type DefaultGrouper struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc + blockFetchConcurrency int } // NewDefaultGrouper makes a new DefaultGrouper. 
@@ -245,6 +246,7 @@ func NewDefaultGrouper( garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, + blockFetchConcurrency int, ) *DefaultGrouper { return &DefaultGrouper{ bkt: bkt, @@ -275,6 +277,7 @@ func NewDefaultGrouper( garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, hashFunc: hashFunc, + blockFetchConcurrency: blockFetchConcurrency, } } @@ -304,6 +307,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.blocksMarkedForDeletion, g.blocksMarkedForNoCompact, g.hashFunc, + g.blockFetchConcurrency, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -342,6 +346,7 @@ type Group struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc + blockFetchConcurrency int } // NewGroup returns a new compaction group. @@ -362,10 +367,16 @@ func NewGroup( blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, + blockFetchConcurrency int, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() } + + if blockFetchConcurrency <= 0 { + return nil, errors.Errorf("invalid concurrency level (%d), blockFetchConcurrency level must be > 0", blockFetchConcurrency) + } + g := &Group{ logger: logger, bkt: bkt, @@ -383,6 +394,7 @@ func NewGroup( blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, hashFunc: hashFunc, + blockFetchConcurrency: blockFetchConcurrency, } return g, nil } @@ -1007,7 +1019,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { - err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir) + err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, 
objstore.WithFetchConcurrency(cg.blockFetchConcurrency)) return err }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 9501fc120c..40cbcbde25 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -139,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.GarbageCollect(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. - grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc) + grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 1) groups, err := grouper.Groups(sy.Metas()) testutil.Ok(t, err) @@ -214,7 +214,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Ok(t, err) planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 1) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true) testutil.Ok(t, err) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index adf9201b3e..7af7b2ec6a 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -210,7 +210,7 @@ func TestRetentionProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, 
temp, "") + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1) type groupedResult map[string]float64 @@ -376,7 +376,7 @@ func TestCompactProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "") + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1) for _, tcase := range []struct { testName string @@ -498,7 +498,7 @@ func TestDownsampleProgressCalculate(t *testing.T) { var bkt objstore.Bucket temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"}) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "") + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1) for _, tcase := range []struct { testName string diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index c2f3c12643..a6ed4a2f6e 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -12,6 +12,7 @@ import ( "path" "path/filepath" "strings" + "sync" "time" "github.com/go-kit/log" @@ -19,6 +20,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -119,6 +121,39 @@ func ApplyIterOptions(options ...IterOption) IterParams { return out } +// DownloadOption configures the provided params. +type DownloadDirOption func(params *DownloadDirParams) + +// DownloadParams holds the Download() parameters and is used by objstore clients implementations. 
+type DownloadDirParams struct { + concurrency int + ignoredPaths []string +} + +// WithDownloadIgnoredPaths is an option to set the paths to not be downloaded. +func WithDownloadIgnoredPaths(ignoredPaths ...string) DownloadDirOption { + return func(params *DownloadDirParams) { + params.ignoredPaths = ignoredPaths + } +} + +// WithFetchConcurrency is an option to set the concurrency of the download operation. +func WithFetchConcurrency(concurrency int) DownloadDirOption { + return func(params *DownloadDirParams) { + params.concurrency = concurrency + } +} + +func ApplyDownloadOptions(options ...DownloadDirOption) DownloadDirParams { + out := DownloadDirParams{ + concurrency: 1, + } + for _, opt := range options { + opt(&out) + } + return out +} + type ObjectAttributes struct { // Size is the object size in bytes. Size int64 `json:"size"` @@ -254,34 +289,55 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } // DownloadDir downloads all object found in the directory into the local directory. -func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, ignoredPaths ...string) error { +func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadDirOption) error { if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } + opts := ApplyDownloadOptions(options...) 
+ + g, ctx := errgroup.WithContext(ctx) + guard := make(chan struct{}, opts.concurrency) var downloadedFiles []string - if err := bkt.Iter(ctx, src, func(name string) error { - dst := filepath.Join(dst, filepath.Base(name)) - if strings.HasSuffix(name, DirDelim) { - if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, ignoredPaths...); err != nil { + var m sync.Mutex + + err := bkt.Iter(ctx, src, func(name string) error { + guard <- struct{}{} + g.Go(func() error { + defer func() { <-guard }() + dst := filepath.Join(dst, filepath.Base(name)) + if strings.HasSuffix(name, DirDelim) { + if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil { + return err + } + m.Lock() + defer m.Unlock() + downloadedFiles = append(downloadedFiles, dst) + return nil + } + for _, ignoredPath := range opts.ignoredPaths { + if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) { + level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name) + return nil + } + } + if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { return err } + + m.Lock() + defer m.Unlock() downloadedFiles = append(downloadedFiles, dst) return nil - } - for _, ignoredPath := range ignoredPaths { - if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) { - level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name) - return nil - } - } - if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { - return err - } - - downloadedFiles = append(downloadedFiles, dst) + }) return nil - }); err != nil { + }) + + if err == nil { + err = g.Wait() + } + + if err != nil { downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory. // Best-effort cleanup if the download failed. 
for _, f := range downloadedFiles { From 21abb113c151dd90a2f42c8b0ef272e9775ab112 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Jul 2022 14:57:15 -0700 Subject: [PATCH 02/13] test Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- pkg/objstore/objstore_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index cd683028cc..24bf08fad8 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "io" + "io/ioutil" "os" "testing" @@ -92,6 +93,20 @@ func TestTracingReader(t *testing.T) { testutil.Equals(t, int64(11), size) } +func TestDownloadDirConcurrency(t *testing.T) { + m := BucketWithMetrics("", NewInMemBucket(), nil) + tempDir := t.TempDir() + + testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))) + testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))) + testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))) + + testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", "dir/", tempDir, WithFetchConcurrency(10))) + i, err := ioutil.ReadDir(tempDir) + testutil.Ok(t, err) + testutil.Assert(t, len(i) == 3) +} + func TestTimingTracingReader(t *testing.T) { m := BucketWithMetrics("", NewInMemBucket(), nil) r := bytes.NewReader([]byte("hello world")) From 87ba903d9c23b17f02af5d3381f96bc6cd3306f1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Jul 2022 15:08:09 -0700 Subject: [PATCH 03/13] Changelog Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa61f45647..e69be8843c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - 
[#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: Export metrics regarding size of remote write requests. - [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants. - [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard. +- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-fetch-concurrency` allowing you to configure the number of goroutines used to download block files during compaction. ### Changed From d02489b9ab7dfb1ad73f40f38b65a17a64ac1697 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Jul 2022 16:39:43 -0700 Subject: [PATCH 04/13] making ApplyDownloadOptions private Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- pkg/objstore/objstore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index a6ed4a2f6e..b271f37fdb 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -144,7 +144,7 @@ func WithFetchConcurrency(concurrency int) DownloadDirOption { } } -func ApplyDownloadOptions(options ...DownloadDirOption) DownloadDirParams { +func applyDownloadOptions(options ...DownloadDirOption) DownloadDirParams { out := DownloadDirParams{ concurrency: 1, } @@ -293,7 +293,7 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } - opts := ApplyDownloadOptions(options...) + opts := applyDownloadOptions(options...) 
g, ctx := errgroup.WithContext(ctx) guard := make(chan struct{}, opts.concurrency) From 618c68b336341237f6f69fa95f6d6ae658afaaa3 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Jul 2022 20:40:15 -0700 Subject: [PATCH 05/13] upload concurrency Signed-off-by: alanprot Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- cmd/thanos/compact.go | 8 ++--- docs/components/compact.md | 7 ++-- pkg/block/block.go | 12 +++---- pkg/compact/compact.go | 28 +++++++-------- pkg/objstore/objstore.go | 74 +++++++++++++++++++++++++++++--------- 5 files changed, 86 insertions(+), 43 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 03e871ad35..c23189ad3b 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -349,7 +349,7 @@ func runCompact( compactMetrics.garbageCollectedBlocks, compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason), metadata.HashFunc(conf.hashFunc), - conf.blockFetchConcurrency, + conf.blockFilesConcurrency, ) tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter) planner := compact.WithLargeTotalIndexSizeFilter( @@ -631,7 +631,7 @@ type compactConfig struct { waitInterval time.Duration disableDownsampling bool blockMetaFetchConcurrency int - blockFetchConcurrency int + blockFilesConcurrency int blockViewerSyncBlockInterval time.Duration blockViewerSyncBlockTimeout time.Duration cleanupBlocksInterval time.Duration @@ -690,8 +690,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&cc.blockMetaFetchConcurrency) - cmd.Flag("block-fetch-concurrency", "Number of goroutines to use when fetching block files from object storage."). 
- Default("1").IntVar(&cc.blockFetchConcurrency) + cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage."). + Default("1").IntVar(&cc.blockFilesConcurrency) cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI."). Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval) cmd.Flag("block-viewer.global.sync-block-timeout", "Maximum time for syncing the blocks between local and remote view for /global Block Viewer UI."). diff --git a/docs/components/compact.md b/docs/components/compact.md index 7e36a26b5d..e86c698fd3 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -279,9 +279,10 @@ usage: thanos compact [] Continuously compacts blocks in an object store bucket. Flags: - --block-fetch-concurrency=1 - Number of goroutines to use when fetching block - files from object storage. + --block-files-concurrency=1 + Number of goroutines to use when + fetching/uploading block files from object + storage. --block-meta-fetch-concurrency=32 Number of goroutines to use when fetching block metadata from object storage. diff --git a/pkg/block/block.go b/pkg/block/block.go index 35b9efb581..a7f4c51127 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -94,21 +94,21 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id // Upload uploads a TSDB block to the object storage. It verifies basic // features of Thanos block. -func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { - return upload(ctx, logger, bkt, bdir, hf, true) +func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadDirOption) error { + return upload(ctx, logger, bkt, bdir, hf, true, options...) } // UploadPromBlock uploads a TSDB block to the object storage. 
It assumes // the block is used in Prometheus so it doesn't check Thanos external labels. -func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { - return upload(ctx, logger, bkt, bdir, hf, false) +func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadDirOption) error { + return upload(ctx, logger, bkt, bdir, hf, false, options...) } // upload uploads block from given block dir that ends with block id. // It makes sure cleanup is done on error to avoid partial block uploads. // TODO(bplotka): Ensure bucket operations have reasonable backoff retries. // NOTE: Upload updates `meta.Thanos.File` section. -func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error { +func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadDirOption) error { df, err := os.Stat(bdir) if err != nil { return err @@ -145,7 +145,7 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.Wrap(err, "encode meta file") } - if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil { + if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname), options...); err != nil { return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks")) } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 88ecbca0c1..eb3398ab5f 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -232,7 +232,7 @@ type DefaultGrouper struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc - blockFetchConcurrency int + 
blockFilesConcurrency int } // NewDefaultGrouper makes a new DefaultGrouper. @@ -246,7 +246,7 @@ func NewDefaultGrouper( garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, - blockFetchConcurrency int, + blockFilesConcurrency int, ) *DefaultGrouper { return &DefaultGrouper{ bkt: bkt, @@ -277,7 +277,7 @@ func NewDefaultGrouper( garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, hashFunc: hashFunc, - blockFetchConcurrency: blockFetchConcurrency, + blockFilesConcurrency: blockFilesConcurrency, } } @@ -307,7 +307,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.blocksMarkedForDeletion, g.blocksMarkedForNoCompact, g.hashFunc, - g.blockFetchConcurrency, + g.blockFilesConcurrency, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -346,7 +346,7 @@ type Group struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc - blockFetchConcurrency int + blockFilesConcurrency int } // NewGroup returns a new compaction group. 
@@ -367,14 +367,14 @@ func NewGroup( blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, - blockFetchConcurrency int, + blockFilesConcurrency int, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() } - if blockFetchConcurrency <= 0 { - return nil, errors.Errorf("invalid concurrency level (%d), blockFetchConcurrency level must be > 0", blockFetchConcurrency) + if blockFilesConcurrency <= 0 { + return nil, errors.Errorf("invalid concurrency level (%d), blockFilesConcurrency level must be > 0", blockFilesConcurrency) } g := &Group{ @@ -394,7 +394,7 @@ func NewGroup( blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, hashFunc: hashFunc, - blockFetchConcurrency: blockFetchConcurrency, + blockFilesConcurrency: blockFilesConcurrency, } return g, nil } @@ -913,7 +913,7 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada } // RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error. 
-func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error { +func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, g *Group, issue347Err error) error { ie, ok := errors.Cause(issue347Err).(Issue347Error) if !ok { return errors.Errorf("Given error is not an issue347 error: %v", issue347Err) @@ -953,7 +953,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, } level.Info(logger).Log("msg", "uploading repaired block", "newID", resid) - if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil { + if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc, objstore.WithUploadConcurrency(g.blockFilesConcurrency)); err != nil { return retry(errors.Wrapf(err, "upload of %s failed", resid)) } @@ -1019,7 +1019,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { - err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFetchConcurrency)) + err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) return err }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { @@ -1121,7 +1121,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin = time.Now() tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error { - err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc) + err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency)) return err }) if err != nil { @@ -1245,7 +1245,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr 
error) { } if IsIssue347Error(err) { - if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, err); err == nil { + if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, g, err); err == nil { mtx.Lock() finishedAllGroups = false mtx.Unlock() diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index b271f37fdb..4988f0cb35 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -144,7 +144,7 @@ func WithFetchConcurrency(concurrency int) DownloadDirOption { } } -func applyDownloadOptions(options ...DownloadDirOption) DownloadDirParams { +func applyDownloadDirOptions(options ...DownloadDirOption) DownloadDirParams { out := DownloadDirParams{ concurrency: 1, } @@ -154,6 +154,32 @@ func applyDownloadOptions(options ...DownloadDirOption) DownloadDirParams { return out } +// UploadDirOption configures the provided params. +type UploadDirOption func(params *UploadDirParams) + +// UploadDirParams holds the UploadDir() parameters and is used by objstore clients implementations. +type UploadDirParams struct { + concurrency int + ignoredPaths []string +} + +// WithUploadConcurrency is an option to set the concurrency of the upload operation. +func WithUploadConcurrency(concurrency int) UploadDirOption { + return func(params *UploadDirParams) { + params.concurrency = concurrency + } +} + +func applyUploadDirOptions(options ...UploadDirOption) UploadDirParams { + out := UploadDirParams{ + concurrency: 1, + } + for _, opt := range options { + opt(&out) + } + return out +} + type ObjectAttributes struct { // Size is the object size in bytes. Size int64 `json:"size"` @@ -207,29 +233,45 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser { // UploadDir uploads all files in srcdir to the bucket with into a top-level directory // named dstdir. It is a caller responsibility to clean partial upload in case of failure. 
-func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string) error { +func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadDirOption) error { df, err := os.Stat(srcdir) + opts := applyUploadDirOptions(options...) + g, ctx := errgroup.WithContext(ctx) + guard := make(chan struct{}, opts.concurrency) + if err != nil { return errors.Wrap(err, "stat dir") } if !df.IsDir() { return errors.Errorf("%s is not a directory", srcdir) } - return filepath.WalkDir(srcdir, func(src string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if d.IsDir() { - return nil - } - srcRel, err := filepath.Rel(srcdir, src) - if err != nil { - return errors.Wrap(err, "getting relative path") - } + err = filepath.WalkDir(srcdir, func(src string, d fs.DirEntry, err error) error { + guard <- struct{}{} + g.Go(func() error { + defer func() { <-guard }() + if err != nil { + return err + } + if d.IsDir() { + return nil + } + srcRel, err := filepath.Rel(srcdir, src) + if err != nil { + return errors.Wrap(err, "getting relative path") + } + + dst := path.Join(dstdir, filepath.ToSlash(srcRel)) + return UploadFile(ctx, logger, bkt, src, dst) + }) - dst := path.Join(dstdir, filepath.ToSlash(srcRel)) - return UploadFile(ctx, logger, bkt, src, dst) + return nil }) + + if err == nil { + err = g.Wait() + } + + return err } // UploadFile uploads the file with the given name to the bucket. @@ -293,7 +335,7 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } - opts := applyDownloadOptions(options...) + opts := applyDownloadDirOptions(options...) 
g, ctx := errgroup.WithContext(ctx) guard := make(chan struct{}, opts.concurrency) From d563a98fb6533b93f0bc07adad03e8f17a5355a1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Jul 2022 21:01:57 -0700 Subject: [PATCH 06/13] Upload Test Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- pkg/objstore/objstore.go | 3 +-- pkg/objstore/objstore_test.go | 44 +++++++++++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 4988f0cb35..ceb8060389 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -159,8 +159,7 @@ type UploadDirOption func(params *UploadDirParams) // UploadDirParams holds the UploadDir() parameters and is used by objstore clients implementations. type UploadDirParams struct { - concurrency int - ignoredPaths []string + concurrency int } // WithUploadConcurrency is an option to set the concurrency of the download operation. diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index 24bf08fad8..950fd82eb1 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -9,10 +9,12 @@ import ( "io" "io/ioutil" "os" + "strings" "testing" "github.com/go-kit/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "go.uber.org/atomic" @@ -93,18 +95,56 @@ func TestTracingReader(t *testing.T) { testutil.Equals(t, int64(11), size) } -func TestDownloadDirConcurrency(t *testing.T) { - m := BucketWithMetrics("", NewInMemBucket(), nil) +func TestDownloadUploadDirConcurrency(t *testing.T) { + r := prometheus.NewRegistry() + m := BucketWithMetrics("", NewInMemBucket(), r) tempDir := t.TempDir() testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))) testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))) testutil.Ok(t, 
m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))) + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. + # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="iter"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="upload"} 3 + `), `thanos_objstore_bucket_operations_total`)) + testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", "dir/", tempDir, WithFetchConcurrency(10))) i, err := ioutil.ReadDir(tempDir) testutil.Ok(t, err) testutil.Assert(t, len(i) == 3) + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. 
+ # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get"} 3 + thanos_objstore_bucket_operations_total{bucket="",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="iter"} 1 + thanos_objstore_bucket_operations_total{bucket="",operation="upload"} 3 + `), `thanos_objstore_bucket_operations_total`)) + + testutil.Ok(t, UploadDir(context.Background(), log.NewNopLogger(), m, tempDir, "/dir-copy", WithUploadConcurrency(10))) + + testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(` + # HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket. + # TYPE thanos_objstore_bucket_operations_total counter + thanos_objstore_bucket_operations_total{bucket="",operation="attributes"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="delete"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="exists"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="get"} 3 + thanos_objstore_bucket_operations_total{bucket="",operation="get_range"} 0 + thanos_objstore_bucket_operations_total{bucket="",operation="iter"} 1 + thanos_objstore_bucket_operations_total{bucket="",operation="upload"} 6 + `), `thanos_objstore_bucket_operations_total`)) } func TestTimingTracingReader(t *testing.T) { From 8db8fb37ea67ee31ad2f77ce9b280bde9fb8f902 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Jul 2022 21:04:44 -0700 Subject: [PATCH 07/13] update change log Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e69be8843c..6465c91529 100644 --- 
a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants. - [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard. - [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-fetch-concurrency` allowing to configure number of go routines for download block files during compaction. +- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download block files during compaction. ### Changed From 08615dea8e66bc206e17d668b1cd272f25378b3a Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Jul 2022 21:07:47 -0700 Subject: [PATCH 08/13] Change comments Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + pkg/objstore/objstore.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6465c91529..83101487c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard. - [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-fetch-concurrency` allowing to configure number of go routines for download block files during compaction. - [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download block files during compaction. 
+- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. ### Changed diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index ceb8060389..06c343e774 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -121,10 +121,10 @@ func ApplyIterOptions(options ...IterOption) IterParams { return out } -// DownloadOption configures the provided params. +// DownloadDirOption configures the provided params. type DownloadDirOption func(params *DownloadDirParams) -// DownloadParams holds the Download() parameters and is used by objstore clients implementations. +// DownloadDirParams holds the DownloadDir() parameters and is used by objstore clients implementations. type DownloadDirParams struct { concurrency int ignoredPaths []string @@ -154,7 +154,7 @@ func applyDownloadDirOptions(options ...DownloadDirOption) DownloadDirParams { return out } -// DownloadOption configures the provided params. +// UploadDirOption configures the provided params. type UploadDirOption func(params *UploadDirParams) // UploadDirParams holds the UploadDir() parameters and is used by objstore clients implementations. @@ -162,7 +162,7 @@ type UploadDirParams struct { concurrency int } -// WithUploadConcurrency is an option to set the concurrency of the download operation. +// WithUploadConcurrency is an option to set the concurrency of the upload operation. 
func WithUploadConcurrency(concurrency int) UploadDirOption { return func(params *UploadDirParams) { params.concurrency = concurrency From 9bfa1305237ec2b9f161828a8440739ebf1236ff Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 7 Jul 2022 23:10:55 -0700 Subject: [PATCH 09/13] Address comments Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- pkg/compact/compact.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index eb3398ab5f..acf75c5e52 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -913,7 +913,7 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada } // RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error. -func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, g *Group, issue347Err error) error { +func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, blockFilesConcurrency int, issue347Err error) error { ie, ok := errors.Cause(issue347Err).(Issue347Error) if !ok { return errors.Errorf("Given error is not an issue347 error: %v", issue347Err) @@ -953,7 +953,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, } level.Info(logger).Log("msg", "uploading repaired block", "newID", resid) - if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc, objstore.WithUploadConcurrency(g.blockFilesConcurrency)); err != nil { + if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc, objstore.WithUploadConcurrency(blockFilesConcurrency)); err != nil { return retry(errors.Wrapf(err, "upload of %s failed", resid)) } @@ -1245,7 +1245,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { } if 
IsIssue347Error(err) { - if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, g, err); err == nil { + if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, g.blockFilesConcurrency, err); err == nil { mtx.Lock() finishedAllGroups = false mtx.Unlock() From 538f3599b09569b1f866efcf27fbab28444176c0 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 8 Jul 2022 13:00:35 -0700 Subject: [PATCH 10/13] Remove duplicate entries on changelog Signed-off-by: Alan Protasio Signed-off-by: alanprot Signed-off-by: Alan Protasio --- CHANGELOG.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83101487c9..c2f14d0b62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,10 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5440](https://github.com/thanos-io/thanos/pull/5440) HTTP metrics: export number of in-flight HTTP requests. - [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: Export metrics regarding size of remote write requests. - [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants. -- [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard. -- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-fetch-concurrency` allowing to configure number of go routines for download block files during compaction. -- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download block files during compaction. -- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. 
+- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for upload/download block files during compaction. ### Changed From a63fa1f0af175a1378577781980e73478bb09207 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 11 Jul 2022 11:39:51 -0700 Subject: [PATCH 11/13] Addressing Comments Signed-off-by: alanprot Signed-off-by: Alan Protasio --- pkg/block/block.go | 8 +++--- pkg/compact/compact.go | 6 ++--- pkg/objstore/objstore.go | 56 +++++++++++++++++++--------------------- 3 files changed, 33 insertions(+), 37 deletions(-) diff --git a/pkg/block/block.go b/pkg/block/block.go index a7f4c51127..0b6c201434 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -45,7 +45,7 @@ const ( // Download downloads directory that is mean to be block directory. If any of the files // have a hash calculated in the meta file and it matches with what is in the destination path then // we do not download it. We always re-download the meta file. -func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string, options ...objstore.DownloadDirOption) error { +func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string, options ...objstore.DownloadOption) error { if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } @@ -94,13 +94,13 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id // Upload uploads a TSDB block to the object storage. It verifies basic // features of Thanos block. -func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadDirOption) error { +func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error { return upload(ctx, logger, bkt, bdir, hf, true, options...) 
} // UploadPromBlock uploads a TSDB block to the object storage. It assumes // the block is used in Prometheus so it doesn't check Thanos external labels. -func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadDirOption) error { +func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error { return upload(ctx, logger, bkt, bdir, hf, false, options...) } @@ -108,7 +108,7 @@ func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket // It makes sure cleanup is done on error to avoid partial block uploads. // TODO(bplotka): Ensure bucket operations have reasonable backoff retries. // NOTE: Upload updates `meta.Thanos.File` section. -func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadDirOption) error { +func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadOption) error { df, err := os.Stat(bdir) if err != nil { return err diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index acf75c5e52..354d1580d8 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -913,7 +913,7 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada } // RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error. 
-func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, blockFilesConcurrency int, issue347Err error) error { +func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error { ie, ok := errors.Cause(issue347Err).(Issue347Error) if !ok { return errors.Errorf("Given error is not an issue347 error: %v", issue347Err) @@ -953,7 +953,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, } level.Info(logger).Log("msg", "uploading repaired block", "newID", resid) - if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc, objstore.WithUploadConcurrency(blockFilesConcurrency)); err != nil { + if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil { return retry(errors.Wrapf(err, "upload of %s failed", resid)) } @@ -1245,7 +1245,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { } if IsIssue347Error(err) { - if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, g.blockFilesConcurrency, err); err == nil { + if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, err); err == nil { mtx.Lock() finishedAllGroups = false mtx.Unlock() diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 06c343e774..3146606698 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -121,31 +121,31 @@ func ApplyIterOptions(options ...IterOption) IterParams { return out } -// DownloadDirOption configures the provided params. -type DownloadDirOption func(params *DownloadDirParams) +// DownloadOption configures the provided params. +type DownloadOption func(params *downloadParams) -// DownloadDirParams holds the DownloadDir() parameters and is used by objstore clients implementations. 
-type DownloadDirParams struct { +// downloadParams holds the DownloadDir() parameters and is used by objstore clients implementations. +type downloadParams struct { concurrency int ignoredPaths []string } // WithDownloadIgnoredPaths is an option to set the paths to not be downloaded. -func WithDownloadIgnoredPaths(ignoredPaths ...string) DownloadDirOption { - return func(params *DownloadDirParams) { +func WithDownloadIgnoredPaths(ignoredPaths ...string) DownloadOption { + return func(params *downloadParams) { params.ignoredPaths = ignoredPaths } } // WithFetchConcurrency is an option to set the concurrency of the download operation. -func WithFetchConcurrency(concurrency int) DownloadDirOption { - return func(params *DownloadDirParams) { +func WithFetchConcurrency(concurrency int) DownloadOption { + return func(params *downloadParams) { params.concurrency = concurrency } } -func applyDownloadDirOptions(options ...DownloadDirOption) DownloadDirParams { - out := DownloadDirParams{ +func applyDownloadOptions(options ...DownloadOption) downloadParams { + out := downloadParams{ concurrency: 1, } for _, opt := range options { @@ -154,23 +154,23 @@ func applyDownloadDirOptions(options ...DownloadDirOption) DownloadDirParams { return out } -// UploadDirOption configures the provided params. -type UploadDirOption func(params *UploadDirParams) +// UploadOption configures the provided params. +type UploadOption func(params *uploadParams) -// UploadDirParams holds the UploadDir() parameters and is used by objstore clients implementations. -type UploadDirParams struct { +// uploadParams holds the UploadDir() parameters and is used by objstore clients implementations. +type uploadParams struct { concurrency int } // WithUploadConcurrency is an option to set the concurrency of the upload operation. 
-func WithUploadConcurrency(concurrency int) UploadDirOption { - return func(params *UploadDirParams) { +func WithUploadConcurrency(concurrency int) UploadOption { + return func(params *uploadParams) { params.concurrency = concurrency } } -func applyUploadDirOptions(options ...UploadDirOption) UploadDirParams { - out := UploadDirParams{ +func applyUploadOptions(options ...UploadOption) uploadParams { + out := uploadParams{ concurrency: 1, } for _, opt := range options { @@ -232,11 +232,11 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser { // UploadDir uploads all files in srcdir to the bucket with into a top-level directory // named dstdir. It is a caller responsibility to clean partial upload in case of failure. -func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadDirOption) error { +func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error { df, err := os.Stat(srcdir) - opts := applyUploadDirOptions(options...) + opts := applyUploadOptions(options...) g, ctx := errgroup.WithContext(ctx) - guard := make(chan struct{}, opts.concurrency) + g.SetLimit(opts.concurrency) if err != nil { return errors.Wrap(err, "stat dir") @@ -245,9 +245,7 @@ func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdi return errors.Errorf("%s is not a directory", srcdir) } err = filepath.WalkDir(srcdir, func(src string, d fs.DirEntry, err error) error { - guard <- struct{}{} g.Go(func() error { - defer func() { <-guard }() if err != nil { return err } @@ -330,30 +328,28 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } // DownloadDir downloads all object found in the directory into the local directory. 
-func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadDirOption) error { +func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error { if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } - opts := applyDownloadDirOptions(options...) + opts := applyDownloadOptions(options...) g, ctx := errgroup.WithContext(ctx) - guard := make(chan struct{}, opts.concurrency) + g.SetLimit(opts.concurrency) var downloadedFiles []string var m sync.Mutex err := bkt.Iter(ctx, src, func(name string) error { - guard <- struct{}{} g.Go(func() error { - defer func() { <-guard }() dst := filepath.Join(dst, filepath.Base(name)) if strings.HasSuffix(name, DirDelim) { if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil { return err } m.Lock() - defer m.Unlock() downloadedFiles = append(downloadedFiles, dst) + m.Unlock() return nil } for _, ignoredPath := range opts.ignoredPaths { @@ -367,8 +363,8 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi } m.Lock() - defer m.Unlock() downloadedFiles = append(downloadedFiles, dst) + m.Unlock() return nil }) return nil From 472610e965dde3ed62305ba11625d342b517a972 Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 11 Jul 2022 18:51:43 +0000 Subject: [PATCH 12/13] update golang.org/x/sync Signed-off-by: alanprot Signed-off-by: Alan Protasio --- go.mod | 2 +- go.sum | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index cdac721665..de76a21fd1 100644 --- a/go.mod +++ b/go.mod @@ -86,7 +86,7 @@ require ( golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f 
golang.org/x/text v0.3.7 google.golang.org/api v0.78.0 google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e diff --git a/go.sum b/go.sum index 0ac474c08e..1f4147d0b8 100644 --- a/go.sum +++ b/go.sum @@ -2262,8 +2262,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From 0a4ffb1d15c516f631d089e08551bebd2abc0570 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 11 Jul 2022 12:21:17 -0700 Subject: [PATCH 13/13] Adding Commentts Signed-off-by: Alan Protasio --- CHANGELOG.md | 3 ++- pkg/objstore/objstore.go | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2f14d0b62..f5966ef4a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5440](https://github.com/thanos-io/thanos/pull/5440) HTTP metrics: export number of in-flight HTTP 
requests. - [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: Export metrics regarding size of remote write requests. - [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants. -- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for upload/download block files during compaction. +- [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard. +- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. ### Changed diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 3146606698..8bf665d105 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -235,6 +235,9 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser { func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error { df, err := os.Stat(srcdir) opts := applyUploadOptions(options...) + + // The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first + // time Wait returns, whichever occurs first. g, ctx := errgroup.WithContext(ctx) g.SetLimit(opts.concurrency) @@ -334,6 +337,8 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi } opts := applyDownloadOptions(options...) + // The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first + // time Wait returns, whichever occurs first. g, ctx := errgroup.WithContext(ctx) g.SetLimit(opts.concurrency)