Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br-stream: add region context in applyRequest #32657

Merged
merged 6 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,26 +1430,32 @@ func (rc *Client) RestoreKVFiles(ctx context.Context, rules map[int64]*RewriteRu
}

eg, ectx := errgroup.WithContext(ctx)
skipFile := 0
for _, file := range files {
filesReplica := file
fileReplica := file
// get rewrite rule from table id
rule, ok := rules[filesReplica.TableId]
rule, ok := rules[fileReplica.TableId]
if !ok {
// TODO handle new created table
// For this version we do not handle new created table after full backup.
// in next version we will perform rewrite and restore meta key to restore new created tables.
// so we can simply skip the file that doesn't have the rule here.
log.Info("skip file due to table id not matched", zap.String("file", file.Path))
log.Debug("skip file due to table id not matched", zap.String("file", fileReplica.Path), zap.Int64("tableId", fileReplica.TableId))
skipFile ++
continue
}
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
fileStart := time.Now()
defer func() {
log.Info("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart)))
log.Info("import files done", zap.String("name", fileReplica.Path), zap.Duration("take", time.Since(fileStart)))
}()
return rc.fileImporter.ImportKVFiles(ectx, filesReplica, rule, rc.restoreTs)
return rc.fileImporter.ImportKVFiles(ectx, fileReplica, rule, rc.restoreTs)
})
}
log.Info("total skip files due to table id not matched", zap.Int("count", skipFile))
if skipFile > 0 {
log.Debug("table id in full backup storage", zap.Any("tables", rules))
}

if err = eg.Wait(); err != nil {
summary.CollectFailureUnit("file", err)
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,10 +760,17 @@ func (importer *FileImporter) downloadAndApplyKVFile(
RestoreTs: restoreTs,
}

reqCtx := &kvrpcpb.Context{
RegionId: regionInfo.Region.GetId(),
RegionEpoch: regionInfo.Region.GetRegionEpoch(),
Peer: leader,
}

req := &import_sstpb.ApplyRequest{
Meta: meta,
StorageBackend: importer.backend,
RewriteRule: rule,
Context: reqCtx,
}
log.Debug("apply kv file", logutil.Leader(leader))
_, err := importer.importClient.ApplyKVFile(ctx, leader.GetStoreId(), req)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
func PaginateScanRegion(
ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
) ([]*RegionInfo, error) {
if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
return nil, errors.Annotatef(berrors.ErrRestoreInvalidRange, "startKey >= endKey, startKey: %s, endkey: %s",
if len(endKey) != 0 && bytes.Compare(startKey, endKey) > 0 {
return nil, errors.Annotatef(berrors.ErrRestoreInvalidRange, "startKey > endKey, startKey: %s, endkey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func RewriteFileKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, en
startKey, rule = rewriteRawKey(file.GetStartKey(), rewriteRules)
if rewriteRules != nil && rule == nil {
// fall back to encoded key
log.Info("cannot find rewrite rule with raw key format",
log.Debug("cannot find rewrite rule with raw key format",
logutil.Key("startKey", file.GetStartKey()),
zap.Reflect("rewrite data", rewriteRules.Data))
startKey, rule = rewriteEncodedKey(file.GetStartKey(), rewriteRules)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ func initRewriteRules(client *restore.Client, tables map[string]*metautil.Table)
// compare table exists in cluster and map[table]table.Info to get rewrite rules.
rules := make(map[int64]*restore.RewriteRules)
for _, t := range tables {
if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok {
// skip system table for now
continue
}
newTableInfo, err := client.GetTableSchema(client.GetDomain(), t.DB.Name, t.Info.Name)
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ replace github.com/pingcap/tidb/parser => ./parser
// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible

replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20220222060450-3c677934c555
replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220222060450-3c677934c555 h1:SOFQ0xR6YPvXnpe0Lj97FxYvOKVgc7aTEIMCh8XwX1U=
github.com/pingcap/kvproto v0.0.0-20220222060450-3c677934c555/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d h1:Zg8MIlra7ASn8g8rHeG3cPv4bdQRMvOxAlG+S8l3AYg=
github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down