Skip to content

Commit

Permalink
br-stream: add region context in applyRequest (pingcap#32657)
Browse files Browse the repository at this point in the history
* br-stream: add region context in applyRequest
  • Loading branch information
3pointer committed Mar 9, 2022
1 parent 88cedcc commit 5fbe645
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 11 deletions.
16 changes: 11 additions & 5 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,26 +1436,32 @@ func (rc *Client) RestoreKVFiles(ctx context.Context, rules map[int64]*RewriteRu
}

eg, ectx := errgroup.WithContext(ctx)
skipFile := 0
for _, file := range files {
filesReplica := file
fileReplica := file
// get rewrite rule from table id
rule, ok := rules[filesReplica.TableId]
rule, ok := rules[fileReplica.TableId]
if !ok {
// TODO handle new created table
// For this version we do not handle new created table after full backup.
// in next version we will perform rewrite and restore meta key to restore new created tables.
// so we can simply skip the file that doesn't have the rule here.
log.Info("skip file due to table id not matched", zap.String("file", file.Path))
log.Debug("skip file due to table id not matched", zap.String("file", fileReplica.Path), zap.Int64("tableId", fileReplica.TableId))
skipFile ++
continue
}
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
fileStart := time.Now()
defer func() {
log.Info("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart)))
log.Info("import files done", zap.String("name", fileReplica.Path), zap.Duration("take", time.Since(fileStart)))
}()
return rc.fileImporter.ImportKVFiles(ectx, filesReplica, rule, rc.restoreTs)
return rc.fileImporter.ImportKVFiles(ectx, fileReplica, rule, rc.restoreTs)
})
}
log.Info("total skip files due to table id not matched", zap.Int("count", skipFile))
if skipFile > 0 {
log.Debug("table id in full backup storage", zap.Any("tables", rules))
}

if err = eg.Wait(); err != nil {
summary.CollectFailureUnit("file", err)
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,10 +760,17 @@ func (importer *FileImporter) downloadAndApplyKVFile(
RestoreTs: restoreTs,
}

reqCtx := &kvrpcpb.Context{
RegionId: regionInfo.Region.GetId(),
RegionEpoch: regionInfo.Region.GetRegionEpoch(),
Peer: leader,
}

req := &import_sstpb.ApplyRequest{
Meta: meta,
StorageBackend: importer.backend,
RewriteRule: rule,
Context: reqCtx,
}
log.Debug("apply kv file", logutil.Leader(leader))
_, err := importer.importClient.ApplyKVFile(ctx, leader.GetStoreId(), req)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
func PaginateScanRegion(
ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
) ([]*RegionInfo, error) {
if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
return nil, errors.Annotatef(berrors.ErrRestoreInvalidRange, "startKey >= endKey, startKey: %s, endkey: %s",
if len(endKey) != 0 && bytes.Compare(startKey, endKey) > 0 {
return nil, errors.Annotatef(berrors.ErrRestoreInvalidRange, "startKey > endKey, startKey: %s, endkey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func RewriteFileKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, en
startKey, rule = rewriteRawKey(file.GetStartKey(), rewriteRules)
if rewriteRules != nil && rule == nil {
// fall back to encoded key
log.Info("cannot find rewrite rule with raw key format",
log.Debug("cannot find rewrite rule with raw key format",
logutil.Key("startKey", file.GetStartKey()),
zap.Reflect("rewrite data", rewriteRules.Data))
startKey, rule = rewriteEncodedKey(file.GetStartKey(), rewriteRules)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ func initRewriteRules(client *restore.Client, tables map[string]*metautil.Table)
// compare table exists in cluster and map[table]table.Info to get rewrite rules.
rules := make(map[int64]*restore.RewriteRules)
for _, t := range tables {
if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok {
// skip system table for now
continue
}
newTableInfo, err := client.GetTableSchema(client.GetDomain(), t.DB.Name, t.Info.Name)
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ replace github.com/pingcap/tidb/parser => ./parser
// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible

replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20220222060450-3c677934c555
replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220222060450-3c677934c555 h1:SOFQ0xR6YPvXnpe0Lj97FxYvOKVgc7aTEIMCh8XwX1U=
github.com/pingcap/kvproto v0.0.0-20220222060450-3c677934c555/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d h1:Zg8MIlra7ASn8g8rHeG3cPv4bdQRMvOxAlG+S8l3AYg=
github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down

0 comments on commit 5fbe645

Please sign in to comment.