Merge pull request pingcap#10 from NingLin-P/add-concurrency-arg
add concurrency arg
NingLin-P authored Sep 29, 2019
2 parents ffb6822 + aa11d18 commit 166c0e8
Showing 2 changed files with 33 additions and 6 deletions.
cmd/raw.go (19 additions, 2 deletions)
@@ -20,6 +20,8 @@ func NewBackupCommand() *cobra.Command {
 
     bp.PersistentFlags().Uint64P(
         "ratelimit", "", 0, "The rate limit of the backup task, MB/s per node")
+    bp.PersistentFlags().Uint32P(
+        "concurrency", "", 4, "The size of thread pool on each node that execute the backup task")
     return bp
 }
 
@@ -53,7 +55,15 @@ func newFullBackupCommand() *cobra.Command {
         return err
     }
 
-    err = client.BackupRange([]byte(""), []byte(""), u, backupTS, rate)
+    concurrency, err := command.Flags().GetUint32("concurrency")
+    if err != nil {
+        return err
+    }
+    if concurrency == 0 {
+        return errors.New("at least one thread required")
+    }
+
+    err = client.BackupRange([]byte(""), []byte(""), u, backupTS, rate, concurrency)
     if err != nil {
         return err
     }
@@ -106,7 +116,14 @@ func newTableBackupCommand() *cobra.Command {
     if err != nil {
         return err
     }
-    err = client.BackupTable(db, table, u, backupTS, rate)
+    concurrency, err := command.Flags().GetUint32("concurrency")
+    if err != nil {
+        return err
+    }
+    if concurrency == 0 {
+        return errors.New("at least one thread required")
+    }
+    err = client.BackupTable(db, table, u, backupTS, rate, concurrency)
     if err != nil {
         return err
     }
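The command-layer change above follows the usual cobra/pflag pattern: register the persistent flag with a default of 4 on the parent backup command, read it with GetUint32 in the subcommand's RunE, and reject a zero value. Below is a minimal, self-contained sketch of that pattern; the command names, the printed message, and the program layout are illustrative assumptions, while the flag name, default, and validation come from this change.

package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	// Parent command owns the persistent flag, as NewBackupCommand does above.
	root := &cobra.Command{Use: "backup"}
	root.PersistentFlags().Uint32P(
		"concurrency", "", 4, "The size of thread pool on each node that execute the backup task")

	// Subcommand reads and validates the flag in RunE, as newFullBackupCommand does.
	full := &cobra.Command{
		Use: "full",
		RunE: func(cmd *cobra.Command, _ []string) error {
			concurrency, err := cmd.Flags().GetUint32("concurrency")
			if err != nil {
				return err
			}
			if concurrency == 0 {
				return errors.New("at least one thread required")
			}
			fmt.Printf("would back up with %d threads per node\n", concurrency)
			return nil
		},
	}
	root.AddCommand(full)

	if err := root.Execute(); err != nil {
		os.Exit(1)
	}
}

Run it as, for example: go run . full --concurrency 8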
pkg/raw/full.go (14 additions, 4 deletions)
@@ -111,6 +111,7 @@ func (bc *BackupClient) BackupTable(
     path string,
     backupTS uint64,
     rateLimit uint64,
+    concurrency uint32,
 ) error {
     session, err := session.CreateSession(bc.backer.GetTiKV())
     if err != nil {
@@ -161,7 +162,7 @@ func (bc *BackupClient) BackupTable(
     ranges := buildTableRanges(tableInfo)
     for _, r := range ranges {
         start, end := r.Range()
-        err = bc.BackupRange(start, end, path, backupTS, rateLimit)
+        err = bc.BackupRange(start, end, path, backupTS, rateLimit, concurrency)
         if err != nil {
             return err
         }
@@ -204,13 +205,15 @@ func (bc *BackupClient) BackupRange(
     path string,
     backupTS uint64,
     rateMBs uint64,
+    concurrency uint32,
 ) error {
     // The unit of rate limit in protocol is bytes per second.
     rateLimit := rateMBs * 1024 * 1024
     log.Info("backup started",
         zap.Binary("StartKey", startKey),
         zap.Binary("EndKey", endKey),
-        zap.Uint64("RateLimit", rateMBs))
+        zap.Uint64("RateLimit", rateMBs),
+        zap.Uint32("Concurrency", concurrency))
     start := time.Now()
     ctx, cancel := context.WithCancel(bc.ctx)
     defer cancel()
@@ -227,6 +230,7 @@
         EndVersion: backupTS,
         Path: path,
         RateLimit: rateLimit,
+        Concurrency: concurrency,
     }
     push := newPushDown(ctx, bc.backer, len(allStores))
     results, err := push.pushBackup(req, allStores...)
@@ -237,7 +241,7 @@
 
     // Find and backup remaining ranges.
     // TODO: test fine grained backup.
-    err = bc.fineGrainedBackup(startKey, endKey, backupTS, path, results)
+    err = bc.fineGrainedBackup(startKey, endKey, backupTS, path, rateLimit, concurrency, results)
     if err != nil {
         return err
     }
@@ -290,6 +294,8 @@ func (bc *BackupClient) fineGrainedBackup(
     startKey, endKey []byte,
     backupTS uint64,
     path string,
+    rateLimit uint64,
+    concurrency uint32,
     rangeTree RangeTree,
 ) error {
     bo := tikv.NewBackoffer(bc.ctx, backupFineGrainedMaxBackoff)
@@ -317,7 +323,7 @@
         defer wg.Done()
         for rg := range retry {
             backoffMs, err :=
-                bc.handleFineGrained(boFork, rg, backupTS, path, respCh)
+                bc.handleFineGrained(boFork, rg, backupTS, path, rateLimit, concurrency, respCh)
             if err != nil {
                 errCh <- err
                 return
@@ -446,6 +452,8 @@ func (bc *BackupClient) handleFineGrained(
     rg Range,
     backupTS uint64,
     path string,
+    rateLimit uint64,
+    concurrency uint32,
     respCh chan<- *backup.BackupResponse,
 ) (int, error) {
     leader, pderr := bc.findRegionLeader(rg.StartKey)
@@ -460,6 +468,8 @@
         StartVersion: backupTS,
         EndVersion: backupTS,
         Path: path,
+        RateLimit: rateLimit,
+        Concurrency: concurrency,
     }
     lockResolver := bc.backer.GetLockResolver()
     err := bc.backer.SendBackup(
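For context, the Concurrency value added here travels on the same backup request as RateLimit. The sketch below fills a request the way BackupRange does above (MB/s converted to bytes per second, concurrency forwarded unchanged as the per-node thread-pool size); the kvproto import path, the newBackupRequest helper, and the literal values are assumptions for illustration, not part of this change.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/backup"
)

// newBackupRequest is a hypothetical helper, not part of the repository.
// It mirrors how BackupRange builds the request: the rate limit is given
// in MB/s and converted to bytes per second, and concurrency is passed
// through to each TiKV node.
func newBackupRequest(path string, backupTS, rateMBs uint64, concurrency uint32) backup.BackupRequest {
	return backup.BackupRequest{
		StartVersion: backupTS,
		EndVersion:   backupTS,
		Path:         path,
		RateLimit:    rateMBs * 1024 * 1024,
		Concurrency:  concurrency,
	}
}

func main() {
	// Placeholder values: 128 MB/s per node, 4 backup threads per node.
	backupTS := uint64(400000000000000000)
	req := newBackupRequest("local:///tmp/backup", backupTS, 128, 4)
	fmt.Printf("%+v\n", req)
}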
