Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: use token bucket to balance download requests. #49887

Merged
merged 17 commits into from
Jan 8, 2024
59 changes: 45 additions & 14 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ type Client struct {
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters

databases map[string]*utils.Database
ddlJobs []*model.Job
concurrencyPerStore uint
databases map[string]*utils.Database
ddlJobs []*model.Job

// store tables need to rebase info like auto id and random id and so on after create table
rebasedTablesMap map[UniqueTableName]bool
Expand Down Expand Up @@ -532,23 +533,27 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke

func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool) {
storeWorkerPoolMap := make(map[uint64]chan struct{})
storeStatisticMap := make(map[uint64]*int64)
stores, err := conn.GetAllTiKVStoresWithRetry(ctx, rc.pdClient, util.SkipTiFlash)
if err != nil {
log.Fatal("failed to get stores", zap.Error(err))
}
concurrencyPerStore := 512
concurrencyPerStore := rc.GetConcurrencyPerStore()
for _, store := range stores {
ch := make(chan struct{}, concurrencyPerStore)
for i := 0; i < concurrencyPerStore; i += 1 {
ch <- struct{}{}
}
ch := utils.BuildWorkerTokenChannel(concurrencyPerStore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually only when useToKenBucket == true, the storeWorkerPoolMap need to be initialized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but I think it's okay to keep it here.

storeWorkerPoolMap[store.Id] = ch
storeStatisticMap[store.Id] = new(int64)
}
useTokenBucket := false
if rc.granularity == string(CoarseGrained) {
// coarse-grained make split & scatter pipeline fast enough
// so we can use a new token bucket way to speed up download.
// ToDo remove it when token bucket is stable enough.
log.Info("use token bucket to control download and ingest flow")
useTokenBucket = true
}

metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, storeStatisticMap, rc.rewriteMode)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, rc.rewriteMode, concurrencyPerStore, useTokenBucket)
}

func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
Expand Down Expand Up @@ -656,8 +661,26 @@ func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string)

// SetConcurrency sets the concurrency of dbs tables files.
func (rc *Client) SetConcurrency(c uint) {
log.Info("new worker pool", zap.Uint("currency-count", c))
log.Info("download worker pool", zap.Uint("size", c))
rc.workerPool = utils.NewWorkerPool(c, "file")
rc.concurrencyPerStore = c
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it need to be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

}

// SetConcurrencyPerStore sets the concurrency of download files for each store.
func (rc *Client) SetConcurrencyPerStore(c uint) {
log.Info("per-store download worker pool", zap.Uint("size", c))
rc.concurrencyPerStore = c
}

func (rc *Client) GetTotalDownloadConcurrency() uint {
if rc.storeCount <= 0 {
log.Fatal("uninitialize store count", zap.Int("storeCount", rc.storeCount))
}
return rc.concurrencyPerStore * uint(rc.storeCount)
}

func (rc *Client) GetConcurrencyPerStore() uint {
return rc.concurrencyPerStore
}

// EnableOnline sets the mode of restore to online.
Expand Down Expand Up @@ -1256,7 +1279,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
rc.SetRateLimit(42)
rc.SetConcurrency(concurrency)
rc.hasSpeedLimited = false
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, nil, rc.rewriteMode)
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, rc.rewriteMode, 128, false)
return rc.setSpeedLimit(ctx, rc.rateLimit)
}

Expand Down Expand Up @@ -1459,7 +1482,7 @@ LOOPFORTABLE:
// breaking here directly is also a reasonable behavior.
break LOOPFORTABLE
}
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
restoreFn := func() error {
filesGroups := getGroupFiles(filesReplica, rc.fileImporter.supportMultiIngest)
for _, filesGroup := range filesGroups {
if importErr := func(fs []*backuppb.File) error {
Expand All @@ -1485,7 +1508,15 @@ LOOPFORTABLE:
}
}
return nil
})
}
if rc.granularity == string(CoarseGrained) {
eg.Go(restoreFn)
} else {
// if we are not use coarse granularity which means
// we still pipeline split & scatter regions and import sst files
// just keep the consistency as before.
rc.workerPool.ApplyOnErrorGroup(eg, restoreFn)
}
}
}

Expand Down
Loading
Loading