Skip to content

Commit

Permalink
Merge branch 'master' into zimuxia/expr-err
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Mar 12, 2024
2 parents 12cfc4b + a1d5a5a commit 1de619d
Show file tree
Hide file tree
Showing 40 changed files with 451 additions and 195 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5806,13 +5806,13 @@ def go_deps():
name = "com_github_pingcap_errors",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/errors",
sha256 = "b4db3d3c222d9039c84baacbbd9c46aa0346f3f04d2577a77475a64ecfefebf9",
strip_prefix = "github.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb",
sha256 = "0edb07dbd73a90f97e06e11e54b270d64d5cabe6142025682d840fe302087b23",
strip_prefix = "github.com/pingcap/errors@v0.11.5-0.20240311024730-e056997136bb",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip",
],
)
go_repository(
Expand Down
9 changes: 7 additions & 2 deletions br/pkg/redact/redact.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ import (

// InitRedact initializes the global redact-log mode in pingcap/errors
// from the boolean flag: true selects RedactLogEnable, false selects
// RedactLogDisable.
func InitRedact(redactLog bool) {
	// Translate the legacy boolean flag into the string-valued mode
	// expected by errors.RedactLogEnabled (an atomic value).
	mode := errors.RedactLogDisable
	if redactLog {
		mode = errors.RedactLogEnable
	}
	errors.RedactLogEnabled.Store(mode)
}

// NeedRedact reports whether log redaction is currently enabled.
func NeedRedact() bool {
	mode := errors.RedactLogEnabled.Load()
	// An empty mode means the value was never initialized; treat that
	// the same as RedactLogDisable (no redaction).
	return mode != errors.RedactLogDisable && mode != ""
}

// String receives string argument and return omitted information if redact log enabled
Expand Down
26 changes: 15 additions & 11 deletions br/pkg/restore/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ const (
type Batcher struct {
cachedTables []TableWithRange
cachedTablesMu *sync.Mutex
rewriteRules *RewriteRules

// autoCommitJoiner is for joining the background batch sender.
autoCommitJoiner chan<- struct{}
Expand Down Expand Up @@ -114,7 +113,6 @@ func NewBatcher(
outCh := DefaultOutputTableChan()
sendChan := make(chan SendType, 2)
b := &Batcher{
rewriteRules: EmptyRewriteRule(),
sendErr: errCh,
outCh: outCh,
sender: sender,
Expand Down Expand Up @@ -227,8 +225,10 @@ type DrainResult struct {
TablesToSend []CreatedTable
// BlankTablesAfterSend are tables that will be full-restored after this batch send.
BlankTablesAfterSend []CreatedTable
RewriteRules *RewriteRules
Ranges []rtree.Range
// RewriteRules are the rewrite rules for the tables.
// the key is the table ID after rewriting.
RewriteRulesMap map[int64]*RewriteRules
Ranges []rtree.Range
// Record which part of ranges belongs to the table
TableEndOffsetInRanges []int
}
Expand All @@ -240,14 +240,19 @@ func (result DrainResult) Files() []TableIDWithFiles {
for i, endOffset := range result.TableEndOffsetInRanges {
tableID := result.TablesToSend[i].Table.ID
ranges := result.Ranges[startOffset:endOffset]
files := make([]*backuppb.File, 0, len(result.Ranges)*2)
// each range has at least a default file + a write file
files := make([]*backuppb.File, 0, len(ranges)*2)
for _, rg := range ranges {
files = append(files, rg.Files...)
}

var rules *RewriteRules
if r, ok := result.RewriteRulesMap[tableID]; ok {
rules = r
}
tableIDWithFiles = append(tableIDWithFiles, TableIDWithFiles{
TableID: tableID,
Files: files,
TableID: tableID,
Files: files,
RewriteRules: rules,
})

// update start offset
Expand All @@ -261,7 +266,7 @@ func newDrainResult() DrainResult {
return DrainResult{
TablesToSend: make([]CreatedTable, 0),
BlankTablesAfterSend: make([]CreatedTable, 0),
RewriteRules: EmptyRewriteRule(),
RewriteRulesMap: EmptyRewriteRulesMap(),
Ranges: make([]rtree.Range, 0),
TableEndOffsetInRanges: make([]int, 0),
}
Expand Down Expand Up @@ -329,7 +334,7 @@ func (b *Batcher) drainRanges() DrainResult {
thisTableLen := len(thisTable.Range)
collected := len(result.Ranges)

result.RewriteRules.Append(*thisTable.RewriteRule)
result.RewriteRulesMap[thisTable.Table.ID] = thisTable.RewriteRule
result.TablesToSend = append(result.TablesToSend, thisTable.CreatedTable)

// the batch is full, we should stop here!
Expand Down Expand Up @@ -423,7 +428,6 @@ func (b *Batcher) Add(tbs TableWithRange) {
zap.Int("batch size", b.Len()),
)
b.cachedTables = append(b.cachedTables, tbs)
b.rewriteRules.Append(*tbs.RewriteRule)
atomic.AddInt32(&b.size, int32(len(tbs.Range)))
b.cachedTablesMu.Unlock()

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/restore/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (sender *drySender) RestoreBatch(ranges restore.DrainResult) {
defer sender.mu.Unlock()
log.Info("fake restore range", rtree.ZapRanges(ranges.Ranges))
sender.nBatch++
sender.rewriteRules.Append(*ranges.RewriteRules)
for _, r := range ranges.RewriteRulesMap {
sender.rewriteRules.Append(*r)
}
sender.ranges = append(sender.ranges, ranges.Ranges...)
sender.sink.EmitTables(ranges.BlankTablesAfterSend...)
}
Expand Down
7 changes: 3 additions & 4 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,10 +1407,9 @@ func getGroupFiles(files []*backuppb.File, supportMulti bool) [][]*backuppb.File
// SplitRanges implements TiKVRestorer. It splits and scatters regions
// for the given ranges by delegating to the package-level SplitRanges
// helper. The ranges are expected to be rewritten already, so no
// rewrite rules are needed here.
func (rc *Client) SplitRanges(ctx context.Context,
	ranges []rtree.Range,
	updateCh glue.Progress,
	isRawKv bool) error {
	return SplitRanges(ctx, rc, ranges, updateCh, isRawKv)
}

func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int64]*RewriteRules, g glue.Glue, store kv.Storage) (LogIter, error) {
Expand Down Expand Up @@ -1470,7 +1469,6 @@ func (rc *Client) WrapLogFilesIterWithCheckpoint(
func (rc *Client) RestoreSSTFiles(
ctx context.Context,
tableIDWithFiles []TableIDWithFiles,
rewriteRules *RewriteRules,
updateCh glue.Progress,
) (err error) {
start := time.Now()
Expand Down Expand Up @@ -1505,6 +1503,7 @@ LOOPFORTABLE:
for _, tableIDWithFile := range tableIDWithFiles {
tableID := tableIDWithFile.TableID
files := tableIDWithFile.Files
rules := tableIDWithFile.RewriteRules
fileCount += len(files)
for rangeFiles, leftFiles = drainFilesByRange(files); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles) {
filesReplica := rangeFiles
Expand All @@ -1529,7 +1528,7 @@ LOOPFORTABLE:
updateCh.Inc()
}
}()
return rc.fileImporter.ImportSSTFiles(ectx, fs, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion())
return rc.fileImporter.ImportSSTFiles(ectx, fs, rules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion())
}(filesGroup); importErr != nil {
return errors.Trace(importErr)
}
Expand Down
63 changes: 58 additions & 5 deletions br/pkg/restore/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ import (

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/tablecodec"
"go.uber.org/zap"
)

const (
Expand All @@ -29,13 +34,16 @@ type MergeRangesStat struct {
MergedRegionBytesAvg int
}

// MergeFileRanges returns ranges of the files are merged based on
// MergeAndRewriteFileRanges returns the ranges of the files, merged based on
// splitSizeBytes and splitKeyCount.
//
// By merging small ranges, it speeds up restoring a backup that contains many
// small ranges (regions) as it reduces split region and scatter region.
func MergeFileRanges(
files []*backuppb.File, splitSizeBytes, splitKeyCount uint64,
func MergeAndRewriteFileRanges(
files []*backuppb.File,
rewriteRules *RewriteRules,
splitSizeBytes,
splitKeyCount uint64,
) ([]rtree.Range, *MergeRangesStat, error) {
if len(files) == 0 {
return []rtree.Range{}, &MergeRangesStat{}, nil
Expand Down Expand Up @@ -78,12 +86,20 @@ func MergeFileRanges(
for _, f := range filesMap[key] {
rangeSize += f.Size_
}
if out := rangeTree.InsertRange(rtree.Range{
rg := &rtree.Range{
StartKey: files[0].GetStartKey(),
EndKey: files[0].GetEndKey(),
Files: files,
Size: rangeSize,
}); out != nil {
}
// rewrite Range for split.
// so that splitRanges no need to handle rewrite rules any more.
tmpRng, err := RewriteRange(rg, rewriteRules)
if err != nil {
return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange,
"unable to rewrite range files %+v", files)
}
if out := rangeTree.InsertRange(*tmpRng); out != nil {
return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange,
"duplicate range %s files %+v", out, files)
}
Expand All @@ -107,3 +123,40 @@ func MergeFileRanges(
MergedRegionBytesAvg: int(mergedRegionBytesAvg),
}, nil
}

// RewriteRange rewrites the start and end keys of rg in place using
// rewriteRules and returns the mutated range. If rewriteRules is nil
// the range is returned unchanged. It returns
// berrors.ErrRestoreTableIDMismatch when the start and end keys decode
// to different table IDs, since a single range must not span tables.
func RewriteRange(rg *rtree.Range, rewriteRules *RewriteRules) (*rtree.Range, error) {
	if rewriteRules == nil {
		return rg, nil
	}
	startID := tablecodec.DecodeTableID(rg.StartKey)
	endID := tablecodec.DecodeTableID(rg.EndKey)
	var rule *import_sstpb.RewriteRule
	if startID != endID {
		log.Warn("table id does not match",
			logutil.Key("startKey", rg.StartKey),
			logutil.Key("endKey", rg.EndKey),
			zap.Int64("startID", startID),
			zap.Int64("endID", endID))
		return nil, errors.Annotate(berrors.ErrRestoreTableIDMismatch, "table id mismatch")
	}
	// NOTE(review): presumably replacePrefix returns the key unchanged
	// when no rule matches, so the warnings below log the original key —
	// confirm against replacePrefix.
	rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules)
	if rule == nil {
		log.Warn("cannot find rewrite rule", logutil.Key("key", rg.StartKey))
	} else {
		log.Debug(
			"rewrite start key",
			logutil.Key("key", rg.StartKey), logutil.RewriteRule(rule))
	}
	// Keep the pre-rewrite end key only for the debug log below.
	oldKey := rg.EndKey
	rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
	if rule == nil {
		log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey))
	} else {
		log.Debug(
			"rewrite end key",
			logutil.Key("origin-key", oldKey),
			logutil.Key("key", rg.EndKey),
			logutil.RewriteRule(rule))
	}
	return rg, nil
}
Loading

0 comments on commit 1de619d

Please sign in to comment.