objstore: Download and Upload block files in parallel #5475

Merged · 13 commits · Jul 12, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: Export metrics regarding size of remote write requests.
- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants.
- [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard.
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency`, allowing configuration of the number of goroutines used to download/upload block files during compaction.

### Changed

4 changes: 4 additions & 0 deletions cmd/thanos/compact.go
@@ -349,6 +349,7 @@ func runCompact(
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
conf.blockFilesConcurrency,
)
tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
planner := compact.WithLargeTotalIndexSizeFilter(
@@ -630,6 +631,7 @@ type compactConfig struct {
waitInterval time.Duration
disableDownsampling bool
blockMetaFetchConcurrency int
blockFilesConcurrency int
blockViewerSyncBlockInterval time.Duration
blockViewerSyncBlockTimeout time.Duration
cleanupBlocksInterval time.Duration
@@ -688,6 +690,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage.").
Default("1").IntVar(&cc.blockFilesConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
cmd.Flag("block-viewer.global.sync-block-timeout", "Maximum time for syncing the blocks between local and remote view for /global Block Viewer UI.").
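For reference, the new value simply rides along as the grouper's final constructor argument. A minimal sketch mirroring the call in `runCompact` and the test call sites further down (the package wrapper, counter names, and placeholder bools are illustrative, not the PR's exact code):

```go
package example

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// newGrouper shows where --block-files-concurrency ends up: it is the last
// argument of NewDefaultGrouper and later bounds per-block file transfers.
func newGrouper(logger log.Logger, bkt objstore.Bucket, reg prometheus.Registerer, blockFilesConcurrency int) *compact.DefaultGrouper {
	deletions := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "marked_for_deletion_total"})
	collected := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "garbage_collected_total"})
	noCompact := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "marked_no_compact_total"})
	return compact.NewDefaultGrouper(
		logger, bkt,
		false, false, // placeholder bools, passed as in the tests in this PR
		reg,
		deletions, collected, noCompact,
		metadata.NoneFunc,
		blockFilesConcurrency, // new argument added by this PR; must be > 0
	)
}
```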
4 changes: 4 additions & 0 deletions docs/components/compact.md
@@ -279,6 +279,10 @@ usage: thanos compact [<flags>]
Continuously compacts blocks in an object store bucket.
Flags:
--block-files-concurrency=1
Number of goroutines to use when
fetching/uploading block files from object
storage.
--block-meta-fetch-concurrency=32
Number of goroutines to use when fetching block
metadata from object storage.
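In practice the new knob is used like any other compactor flag, e.g. `thanos compact --block-files-concurrency=4` to allow up to four concurrent file downloads/uploads per block; the default of `1` keeps the pre-PR sequential behavior.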
2 changes: 1 addition & 1 deletion go.mod
@@ -86,7 +86,7 @@ require (
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
golang.org/x/text v0.3.7
google.golang.org/api v0.78.0
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e
3 changes: 2 additions & 1 deletion go.sum
@@ -2262,8 +2262,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
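This `golang.org/x/sync` bump is presumably what enables the bounded fan-out: the pinned 2022-06-01 revision includes errgroup's then-new `Group.SetLimit`, which caps in-flight goroutines. A minimal sketch of the pattern, assuming a hypothetical `downloadFile` helper (this is not the PR's actual objstore code):

```go
package example

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// downloadAll fetches every file with at most `concurrency` goroutines in
// flight; the first error cancels the shared context and is returned.
func downloadAll(ctx context.Context, files []string, concurrency int,
	downloadFile func(context.Context, string) error) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency) // blocks g.Go once `concurrency` goroutines are running
	for _, f := range files {
		f := f // capture the loop variable (pre-Go 1.22 semantics)
		g.Go(func() error {
			return downloadFile(ctx, f)
		})
	}
	return g.Wait()
}
```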
16 changes: 8 additions & 8 deletions pkg/block/block.go
@@ -45,7 +45,7 @@ const (
// Download downloads a directory that is meant to be a block directory. If any of the files
// have a hash calculated in the meta file and it matches what is in the destination path then
// we do not download it. We always re-download the meta file.
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error {
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string, options ...objstore.DownloadOption) error {
if err := os.MkdirAll(dst, 0750); err != nil {
return errors.Wrap(err, "create dir")
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id
}
}

if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, ignoredPaths...); err != nil {
if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, append(options, objstore.WithDownloadIgnoredPaths(ignoredPaths...))...); err != nil {
return err
}

@@ -94,21 +94,21 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id

// Upload uploads a TSDB block to the object storage. It verifies basic
// features of Thanos block.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, true)
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error {
return upload(ctx, logger, bkt, bdir, hf, true, options...)
}

// UploadPromBlock uploads a TSDB block to the object storage. It assumes
// the block is used in Prometheus so it doesn't check Thanos external labels.
func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, false)
func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error {
return upload(ctx, logger, bkt, bdir, hf, false, options...)
}

// upload uploads block from given block dir that ends with block id.
// It makes sure cleanup is done on error to avoid partial block uploads.
// TODO(bplotka): Ensure bucket operations have reasonable backoff retries.
// NOTE: Upload updates `meta.Thanos.File` section.
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error {
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadOption) error {
df, err := os.Stat(bdir)
if err != nil {
return err
@@ -145,7 +145,7 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "encode meta file")
}

if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname), options...); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}

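The `objstore.DownloadOption`/`objstore.UploadOption` types and their `With...` constructors are added to `pkg/objstore` elsewhere in this PR, so their bodies do not appear in this diff. A plausible functional-options sketch consistent with these call sites (the option names match the diff; the params struct and defaults are assumptions):

```go
package objstore

// downloadParams collects per-call settings; the fields are illustrative.
type downloadParams struct {
	concurrency  int
	ignoredPaths []string
}

// DownloadOption mutates the settings for a single download call.
type DownloadOption func(*downloadParams)

// WithFetchConcurrency caps the number of goroutines fetching files.
func WithFetchConcurrency(n int) DownloadOption {
	return func(p *downloadParams) { p.concurrency = n }
}

// WithDownloadIgnoredPaths skips the given object paths during DownloadDir.
func WithDownloadIgnoredPaths(paths ...string) DownloadOption {
	return func(p *downloadParams) { p.ignoredPaths = paths }
}

// applyDownloadOptions is how DownloadDir could fold the options in,
// defaulting to the sequential behavior implied by the flag's default of 1.
func applyDownloadOptions(opts ...DownloadOption) downloadParams {
	p := downloadParams{concurrency: 1}
	for _, o := range opts {
		o(&p)
	}
	return p
}
```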
16 changes: 14 additions & 2 deletions pkg/compact/compact.go
@@ -232,6 +232,7 @@ type DefaultGrouper struct {
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
blockFilesConcurrency int
}

// NewDefaultGrouper makes a new DefaultGrouper.
@@ -245,6 +246,7 @@ func NewDefaultGrouper(
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
) *DefaultGrouper {
return &DefaultGrouper{
bkt: bkt,
@@ -275,6 +277,7 @@ func NewDefaultGrouper(
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
}
}

@@ -304,6 +307,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
g.blockFilesConcurrency,
)
if err != nil {
return nil, errors.Wrap(err, "create compaction group")
@@ -342,6 +346,7 @@ type Group struct {
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
blockFilesConcurrency int
}

// NewGroup returns a new compaction group.
@@ -362,10 +367,16 @@ func NewGroup(
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
) (*Group, error) {
if logger == nil {
logger = log.NewNopLogger()
}

if blockFilesConcurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), blockFilesConcurrency level must be > 0", blockFilesConcurrency)
}

g := &Group{
logger: logger,
bkt: bkt,
Expand All @@ -383,6 +394,7 @@ func NewGroup(
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
}
return g, nil
}
@@ -1007,7 +1019,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}

tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
@@ -1109,7 +1121,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin = time.Now()

tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error {
err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc)
err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency))
return err
})
if err != nil {
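Seen from the caller's side, the whole feature is one extra option per transfer. A self-contained sketch of the round trip performed in `(*Group).compact` (the helper name and error wrapping are illustrative; import paths assume this PR's tree):

```go
package example

import (
	"context"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// transferBlock pulls a block's files down with n parallel fetches, then
// pushes the (re)compacted result back under the same concurrency bound.
func transferBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
	id ulid.ULID, bdir string, hf metadata.HashFunc, n int) error {
	if err := block.Download(ctx, logger, bkt, id, bdir,
		objstore.WithFetchConcurrency(n)); err != nil {
		return errors.Wrap(err, "download block")
	}
	if err := block.Upload(ctx, logger, bkt, bdir, hf,
		objstore.WithUploadConcurrency(n)); err != nil {
		return errors.Wrap(err, "upload block")
	}
	return nil
}
```

Note that `NewGroup` rejects a non-positive concurrency, so `1` (the flag default) is the minimum and reproduces the old sequential transfers.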
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
@@ -139,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
testutil.Ok(t, sy.GarbageCollect(ctx))

// Only the level 3 block, the last source block in both resolutions should be left.
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc)
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 1)
groups, err := grouper.Groups(sy.Metas())
testutil.Ok(t, err)

@@ -214,7 +214,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Ok(t, err)

planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 1)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true)
testutil.Ok(t, err)

6 changes: 3 additions & 3 deletions pkg/compact/compact_test.go
@@ -210,7 +210,7 @@ func TestRetentionProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "")
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1)

type groupedResult map[string]float64

@@ -376,7 +376,7 @@ func TestCompactProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "")
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1)

for _, tcase := range []struct {
testName string
@@ -498,7 +498,7 @@ func TestDownsampleProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "")
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1)

for _, tcase := range []struct {
testName string