Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: use token bucket to balance download requests. #49887

Merged
merged 17 commits into from
Jan 8, 2024
9 changes: 4 additions & 5 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,17 +538,17 @@ func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBack
log.Fatal("failed to get stores", zap.Error(err))
}
concurrencyPerStore := rc.GetConcurrencyPerStore()
for _, store := range stores {
ch := utils.BuildWorkerTokenChannel(concurrencyPerStore)
storeWorkerPoolMap[store.Id] = ch
}
useTokenBucket := false
if rc.granularity == string(CoarseGrained) {
// coarse-grained make split & scatter pipeline fast enough
// so we can use a new token bucket way to speed up download.
// ToDo remove it when token bucket is stable enough.
log.Info("use token bucket to control download and ingest flow")
useTokenBucket = true
for _, store := range stores {
ch := utils.BuildWorkerTokenChannel(concurrencyPerStore)
storeWorkerPoolMap[store.Id] = ch
}
}

metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
Expand Down Expand Up @@ -663,7 +663,6 @@ func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string)
// SetConcurrency sets the total concurrency of download workers by
// replacing rc.workerPool with a new pool of size c.
func (rc *Client) SetConcurrency(c uint) {
log.Info("download worker pool", zap.Uint("size", c))
rc.workerPool = utils.NewWorkerPool(c, "file")
// NOTE(review): this also overwrites the per-store concurrency with the
// total concurrency, which conflicts with SetConcurrencyPerStore —
// confirm the overwrite is intended (this PR appears to remove it).
rc.concurrencyPerStore = c
}

// SetConcurrencyPerStore sets the concurrency of download files for each store.
Expand Down
20 changes: 16 additions & 4 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,18 +978,30 @@ func (importer *FileImporter) downloadSSTV2(
for _, p := range regionInfo.Region.GetPeers() {
peer := p
eg.Go(func() error {
importer.storeWorkerPoolRWLock.Lock()
importer.storeWorkerPoolRWLock.RLock()
workerCh, ok := importer.storeWorkerPoolMap[peer.GetStoreId()]
// handle the case that the store is new-scaled in the cluster
if !ok {
workerCh = utils.BuildWorkerTokenChannel(importer.concurrencyPerStore)
importer.storeWorkerPoolRWLock.RUnlock()
importer.storeWorkerPoolRWLock.Lock()
// Notice: the worker channel must not be replaced once published, because callers may still hold and use it after the lock is released.
if workerCh, ok = importer.storeWorkerPoolMap[peer.GetStoreId()]; !ok {
workerCh = utils.BuildWorkerTokenChannel(importer.concurrencyPerStore)
importer.storeWorkerPoolMap[peer.GetStoreId()] = workerCh
}
importer.storeWorkerPoolMap[peer.GetStoreId()] = workerCh
3pointer marked this conversation as resolved.
Show resolved Hide resolved
importer.storeWorkerPoolRWLock.Unlock()
} else {
importer.storeWorkerPoolRWLock.RUnlock()
}
importer.storeWorkerPoolRWLock.Unlock()
defer func() {
workerCh <- struct{}{}
}()
<-workerCh
select {
case <-ectx.Done():
return ectx.Err()
case <-workerCh:
}
for _, file := range files {
req, ok := downloadReqsMap[file.Name]
if !ok {
Expand Down
Loading