restore: use token bucket to balance download requests. #49887
Merged

Changes from 15 of 17 commits:
d7d32b7  restore: use token bucket to balance download SSTs (3pointer)
5e77369  update config logic (3pointer)
333fb78  update (3pointer)
70f3bbf  update granularity control logic (3pointer)
71ae2a1  lint (3pointer)
4bc2b0d  lint (3pointer)
b38c4f9  add integration test (3pointer)
8ca03ea  keep consistency as before (3pointer)
cac1281  Apply suggestions from code review (3pointer)
1faf7cb  Merge branch 'perStoreConcurrency' of https://github.com/3pointer/tid… (3pointer)
ba75968  address comments (3pointer)
8815582  address comment (3pointer)
bb4ee99  Merge branch 'master' into patch_per_store_concurrency (3pointer)
d088afa  fix bazel (3pointer)
1b63aea  update (3pointer)
ac95cdd  address comment (3pointer)
498160f  update (3pointer)
@@ -98,8 +98,9 @@ type Client struct {
 	tlsConf       *tls.Config
 	keepaliveConf keepalive.ClientParameters
 
-	databases map[string]*utils.Database
-	ddlJobs   []*model.Job
+	concurrencyPerStore uint
+	databases           map[string]*utils.Database
+	ddlJobs             []*model.Job
 
 	// store tables need to rebase info like auto id and random id and so on after create table
 	rebasedTablesMap map[UniqueTableName]bool
@@ -532,23 +533,27 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
 
 func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool) {
 	storeWorkerPoolMap := make(map[uint64]chan struct{})
-	storeStatisticMap := make(map[uint64]*int64)
 	stores, err := conn.GetAllTiKVStoresWithRetry(ctx, rc.pdClient, util.SkipTiFlash)
 	if err != nil {
 		log.Fatal("failed to get stores", zap.Error(err))
 	}
-	concurrencyPerStore := 512
+	concurrencyPerStore := rc.GetConcurrencyPerStore()
 	for _, store := range stores {
-		ch := make(chan struct{}, concurrencyPerStore)
-		for i := 0; i < concurrencyPerStore; i += 1 {
-			ch <- struct{}{}
-		}
+		ch := utils.BuildWorkerTokenChannel(concurrencyPerStore)
 		storeWorkerPoolMap[store.Id] = ch
-		storeStatisticMap[store.Id] = new(int64)
 	}
+	useTokenBucket := false
+	if rc.granularity == string(CoarseGrained) {
+		// coarse-grained make split & scatter pipeline fast enough
+		// so we can use a new token bucket way to speed up download.
+		// ToDo remove it when token bucket is stable enough.
+		log.Info("use token bucket to control download and ingest flow")
+		useTokenBucket = true
+	}
 
 	metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
 	importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
-	rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, storeStatisticMap, rc.rewriteMode)
+	rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, rc.rewriteMode, concurrencyPerStore, useTokenBucket)
 }
 
 func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
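The inline fill loop from the old code is factored into utils.BuildWorkerTokenChannel, whose body is not part of this diff. Judging from the loop it replaces, a minimal sketch of the helper might look like this:

package utils

// BuildWorkerTokenChannel returns a buffered channel pre-filled with one
// empty-struct token per allowed in-flight download for a store: a worker
// acquires a slot by receiving from the channel and releases it by sending
// the token back. (Sketch reconstructed from the replaced loop, not the
// helper's actual source.)
func BuildWorkerTokenChannel(capacity uint) chan struct{} {
	ch := make(chan struct{}, capacity)
	for i := uint(0); i < capacity; i++ {
		ch <- struct{}{}
	}
	return ch
}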
@@ -656,8 +661,26 @@ func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string)
 
 // SetConcurrency sets the concurrency of dbs tables files.
 func (rc *Client) SetConcurrency(c uint) {
-	log.Info("new worker pool", zap.Uint("currency-count", c))
+	log.Info("download worker pool", zap.Uint("size", c))
 	rc.workerPool = utils.NewWorkerPool(c, "file")
+	rc.concurrencyPerStore = c
 }
 
+// SetConcurrencyPerStore sets the concurrency of download files for each store.
+func (rc *Client) SetConcurrencyPerStore(c uint) {
+	log.Info("per-store download worker pool", zap.Uint("size", c))
+	rc.concurrencyPerStore = c
+}
+
+func (rc *Client) GetTotalDownloadConcurrency() uint {
+	if rc.storeCount <= 0 {
+		log.Fatal("uninitialize store count", zap.Int("storeCount", rc.storeCount))
+	}
+	return rc.concurrencyPerStore * uint(rc.storeCount)
+}
+
+func (rc *Client) GetConcurrencyPerStore() uint {
+	return rc.concurrencyPerStore
+}
+
 // EnableOnline sets the mode of restore to online.

Review comment on the added line "rc.concurrencyPerStore = c": does it need to be removed?
Reply: yes
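Since SetConcurrency no longer drives the per-store limit, the two knobs are set independently. A minimal sketch of the wiring, assuming a caller that has both values from task configuration (the helper below is illustrative, not part of this PR):

// configureRestoreConcurrency is a hypothetical helper showing how the two
// setters combine: one sizes the shared table-level worker pool, the other
// sizes each store's download token bucket.
func configureRestoreConcurrency(rc *Client, tableConcurrency, perStoreConcurrency uint) {
	rc.SetConcurrency(tableConcurrency)            // shared pool for per-table restore work
	rc.SetConcurrencyPerStore(perStoreConcurrency) // token bucket size per TiKV store
	// Effective cluster-wide download parallelism; requires storeCount to be set first.
	log.Info("restore concurrency",
		zap.Uint("total-download", rc.GetTotalDownloadConcurrency()))
}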
@@ -1256,7 +1279,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
 	rc.SetRateLimit(42)
 	rc.SetConcurrency(concurrency)
 	rc.hasSpeedLimited = false
-	rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, nil, rc.rewriteMode)
+	rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, rc.rewriteMode, 128, false)
 	return rc.setSpeedLimit(ctx, rc.rateLimit)
 }
@@ -1459,7 +1482,7 @@ LOOPFORTABLE:
 			// breaking here directly is also a reasonable behavior.
 			break LOOPFORTABLE
 		}
-		rc.workerPool.ApplyOnErrorGroup(eg, func() error {
+		restoreFn := func() error {
 			filesGroups := getGroupFiles(filesReplica, rc.fileImporter.supportMultiIngest)
 			for _, filesGroup := range filesGroups {
 				if importErr := func(fs []*backuppb.File) error {

@@ -1485,7 +1508,15 @@
 				}
 			}
 			return nil
-		})
+		}
+		if rc.granularity == string(CoarseGrained) {
+			eg.Go(restoreFn)
+		} else {
+			// if we are not use coarse granularity which means
+			// we still pipeline split & scatter regions and import sst files
+			// just keep the consistency as before.
+			rc.workerPool.ApplyOnErrorGroup(eg, restoreFn)
+		}
 	}
 }
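Neither hunk shows the acquire/release side of the per-store token channels. Assuming the importer takes a token per download RPC and returns it when the RPC finishes, the pattern would look roughly like this (downloadWithToken and its parameters are hypothetical names, not from this PR):

import "context"

// downloadWithToken gates one download RPC on a store's token channel,
// bounding in-flight downloads per TiKV store.
func downloadWithToken(ctx context.Context, tokens chan struct{}, download func(context.Context) error) error {
	select {
	case <-tokens: // block until this store has a free download slot
	case <-ctx.Done():
		return ctx.Err()
	}
	defer func() { tokens <- struct{}{} }() // return the token when done
	return download(ctx)
}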
Review comment on InitClients: actually, storeWorkerPoolMap only needs to be initialized when useTokenBucket == true.
Reply: yes, but I think it's okay to keep it here.
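For illustration, the change the reviewer is hinting at would look roughly like this inside InitClients (hypothetical rearrangement, not applied in this PR):

	// Hypothetical: compute the mode first, then build the per-store token
	// channels only when the token bucket path will actually consume them.
	useTokenBucket := rc.granularity == string(CoarseGrained)
	if useTokenBucket {
		for _, store := range stores {
			storeWorkerPoolMap[store.Id] = utils.BuildWorkerTokenChannel(concurrencyPerStore)
		}
	}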