Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: use token bucket to balance download requests. #49887

Merged
merged 17 commits into from
Jan 8, 2024
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ go_library(
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//maps",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
Expand Down
60 changes: 45 additions & 15 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ type Client struct {
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters

databases map[string]*utils.Database
ddlJobs []*model.Job
concurrencyPerStore uint
databases map[string]*utils.Database
ddlJobs []*model.Job

// store tables need to rebase info like auto id and random id and so on after create table
rebasedTablesMap map[UniqueTableName]bool
Expand Down Expand Up @@ -532,23 +533,27 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke

func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool) {
storeWorkerPoolMap := make(map[uint64]chan struct{})
storeStatisticMap := make(map[uint64]*int64)
stores, err := conn.GetAllTiKVStoresWithRetry(ctx, rc.pdClient, util.SkipTiFlash)
if err != nil {
log.Fatal("failed to get stores", zap.Error(err))
}
concurrencyPerStore := 512
for _, store := range stores {
ch := make(chan struct{}, concurrencyPerStore)
for i := 0; i < concurrencyPerStore; i += 1 {
ch <- struct{}{}
concurrencyPerStore := rc.GetConcurrencyPerStore()
useTokenBucket := false
if rc.granularity == string(CoarseGrained) {
// Coarse-grained restore makes the split & scatter pipeline fast enough
// that we can use the new token-bucket approach to speed up downloads.
// TODO: remove it once the token bucket is stable enough.
log.Info("use token bucket to control download and ingest flow")
useTokenBucket = true
for _, store := range stores {
ch := utils.BuildWorkerTokenChannel(concurrencyPerStore)
storeWorkerPoolMap[store.Id] = ch
}
storeWorkerPoolMap[store.Id] = ch
storeStatisticMap[store.Id] = new(int64)
}

metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, storeStatisticMap, rc.rewriteMode)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, storeWorkerPoolMap, rc.rewriteMode, concurrencyPerStore, useTokenBucket)
}

func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
Expand Down Expand Up @@ -656,10 +661,27 @@ func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string)

// SetConcurrency configures the worker pool used for restoring files.
// It logs the requested size c and replaces rc.workerPool with a new pool
// created by utils.NewWorkerPool(c, "file").
func (rc *Client) SetConcurrency(c uint) {
log.Info("new worker pool", zap.Uint("currency-count", c))
log.Info("download worker pool", zap.Uint("size", c))
rc.workerPool = utils.NewWorkerPool(c, "file")
}

// SetConcurrencyPerStore records c as the file-download concurrency to use
// for each store and logs the chosen size.
func (rc *Client) SetConcurrencyPerStore(c uint) {
	rc.concurrencyPerStore = c
	log.Info("per-store download worker pool", zap.Uint("size", c))
}

// GetTotalDownloadConcurrency returns the per-store download concurrency
// multiplied by the number of stores recorded in rc.storeCount.
//
// A non-positive rc.storeCount is reported through log.Fatal before the
// product is computed.
func (rc *Client) GetTotalDownloadConcurrency() uint {
	stores := rc.storeCount
	if stores < 1 {
		log.Fatal("uninitialize store count", zap.Int("storeCount", stores))
	}
	perStore := rc.concurrencyPerStore
	return perStore * uint(stores)
}

// GetConcurrencyPerStore returns the per-store download concurrency held in
// rc.concurrencyPerStore, i.e. the value last passed to
// SetConcurrencyPerStore.
func (rc *Client) GetConcurrencyPerStore() uint {
return rc.concurrencyPerStore
}

// EnableOnline sets the mode of restore to online.
func (rc *Client) EnableOnline() {
rc.isOnline = true
Expand Down Expand Up @@ -1256,7 +1278,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
rc.SetRateLimit(42)
rc.SetConcurrency(concurrency)
rc.hasSpeedLimited = false
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, nil, rc.rewriteMode)
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, false, nil, rc.rewriteMode, 128, false)
return rc.setSpeedLimit(ctx, rc.rateLimit)
}

Expand Down Expand Up @@ -1459,7 +1481,7 @@ LOOPFORTABLE:
// breaking here directly is also a reasonable behavior.
break LOOPFORTABLE
}
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
restoreFn := func() error {
filesGroups := getGroupFiles(filesReplica, rc.fileImporter.supportMultiIngest)
for _, filesGroup := range filesGroups {
if importErr := func(fs []*backuppb.File) error {
Expand All @@ -1485,7 +1507,15 @@ LOOPFORTABLE:
}
}
return nil
})
}
if rc.granularity == string(CoarseGrained) {
eg.Go(restoreFn)
} else {
// If we are not using coarse granularity, we still pipeline
// split & scatter regions with importing SST files, so keep
// the same behavior as before.
rc.workerPool.ApplyOnErrorGroup(eg, restoreFn)
}
}
}

Expand Down
Loading
Loading