
restore: merge small ranges (#578) #655

Merged: 9 commits, Jul 8, 2021
16 changes: 12 additions & 4 deletions cmd/br/debug.go
@@ -145,9 +145,18 @@ origin sha256 is %s`,

func newBackupMetaCommand() *cobra.Command {
command := &cobra.Command{
Use: "backupmeta",
Short: "check the backup meta",
Args: cobra.NoArgs,
Use: "backupmeta",
Short: "utilities of backupmeta",
SilenceUsage: false,
}
command.AddCommand(newBackupMetaValidateCommand())
return command
}

func newBackupMetaValidateCommand() *cobra.Command {
command := &cobra.Command{
Use: "validate",
Short: "validate key range and rewrite rules of backupmeta",
RunE: func(cmd *cobra.Command, _ []string) error {
ctx, cancel := context.WithCancel(GetDefaultContext())
defer cancel()
@@ -236,7 +245,6 @@ func newBackupMetaCommand() *cobra.Command {
},
}
command.Flags().Uint64("offset", 0, "the offset of table id allocator")
command.Hidden = true
return command
}
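// With this change, `backupmeta` becomes a plain parent command and the previous
// validation logic moves into its `validate` subcommand; it is presumably invoked
// as `br debug backupmeta validate`, with the "offset" flag unchanged.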

4 changes: 3 additions & 1 deletion pkg/backup/client.go
@@ -275,7 +275,9 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
return kvRanges, nil
}

// BuildBackupRangeAndSchema gets the range and schema of tables.
// BuildBackupRangeAndSchema gets the KV ranges and schemas of tables.
// KV ranges are separated by table IDs, and within the same table they are
// further separated by index IDs.
func BuildBackupRangeAndSchema(
storage kv.Storage,
tableFilter filter.Filter,
3 changes: 0 additions & 3 deletions pkg/restore/client.go
@@ -394,9 +394,6 @@ func (rc *Client) createTable(
if rc.IsSkipCreateSQL() {
log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name))
} else {
// don't use rc.ctx here...
// remove the ctx field of Client would be a great work,
// we just take a small step here :<
err := db.CreateTable(ctx, table)
if err != nil {
return CreatedTable{}, errors.Trace(err)
155 changes: 155 additions & 0 deletions pkg/restore/merge.go
@@ -0,0 +1,155 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"strings"

"github.com/docker/go-units"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/rtree"
)

const (
// DefaultMergeRegionSizeBytes is the default region split size, 96MB.
// See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38
DefaultMergeRegionSizeBytes uint64 = 96 * units.MB

// DefaultMergeRegionKeyCount is the default region key count, 960000.
DefaultMergeRegionKeyCount uint64 = 960000

writeCFName = "write"
defaultCFName = "default"
)

// MergeRangesStat holds statistics for the MergeRanges.
type MergeRangesStat struct {
TotalFiles int
TotalWriteCFFile int
TotalDefaultCFFile int
TotalRegions int
RegionKeysAvg int
RegionBytesAvg int
MergedRegions int
MergedRegionKeysAvg int
MergedRegionBytesAvg int
}

// MergeFileRanges returns ranges of the files are merged based on
// splitSizeBytes and splitKeyCount.
//
// By merging small ranges, it speeds up restoring a backup that contains many
// small ranges (regions) as it reduces split region and scatter region.
func MergeFileRanges(
files []*backuppb.File, splitSizeBytes, splitKeyCount uint64,
) ([]rtree.Range, *MergeRangesStat, error) {
if len(files) == 0 {
return []rtree.Range{}, &MergeRangesStat{}, nil
}
totalBytes := uint64(0)
totalKvs := uint64(0)
totalFiles := len(files)
writeCFFile := 0
defaultCFFile := 0

filesMap := make(map[string][]*backuppb.File)
for _, file := range files {
filesMap[string(file.StartKey)] = append(filesMap[string(file.StartKey)], file)

// We skip all default cf files when counting regions below, because they
// cover the same ranges as the write cf files.
if file.Cf == writeCFName || strings.Contains(file.GetName(), writeCFName) {
writeCFFile++
} else if file.Cf == defaultCFName || strings.Contains(file.GetName(), defaultCFName) {
defaultCFFile++
}
totalBytes += file.TotalBytes
totalKvs += file.TotalKvs
}
if writeCFFile == 0 && defaultCFFile == 0 {
return []rtree.Range{}, nil, errors.Annotatef(berrors.ErrRestoreInvalidBackup,
"unknown backup data from neither Wrtie CF nor Default CF")
}

// RawKV does not have data in write CF.
totalRegions := writeCFFile
if defaultCFFile > writeCFFile {
totalRegions = defaultCFFile
}

// Check if files are overlapped
rangeTree := rtree.NewRangeTree()
for key := range filesMap {
files := filesMap[key]
if out := rangeTree.InsertRange(rtree.Range{
StartKey: files[0].GetStartKey(),
EndKey: files[0].GetEndKey(),
Files: files,
}); out != nil {
return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange,
"duplicate range %s files %+v", out, files)
}
}

needMerge := func(left, right *rtree.Range) bool {
leftBytes, leftKeys := left.BytesAndKeys()
rightBytes, rightKeys := right.BytesAndKeys()
if rightBytes == 0 {
return true
}
if leftBytes+rightBytes > splitSizeBytes {
return false
}
if leftKeys+rightKeys > splitKeyCount {
return false
}
// Do not merge ranges in different tables.
if tablecodec.DecodeTableID(kv.Key(left.StartKey)) != tablecodec.DecodeTableID(kv.Key(right.StartKey)) {
return false
}
// Do not merge ranges in different indexes even if they are in the same
// table, as the rewrite rule only supports rewriting one pattern.
// tableID, indexID, indexValues, err
_, indexID1, _, err1 := tablecodec.DecodeIndexKey(kv.Key(left.StartKey))
_, indexID2, _, err2 := tablecodec.DecodeIndexKey(kv.Key(right.StartKey))
// If both of them are index keys, ...
if err1 == nil && err2 == nil {
// Merge left and right if they are in the same index.
return indexID1 == indexID2
}
// Otherwise, merge if they are both record keys
return err1 != nil && err2 != nil
}
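// Illustrative example (numbers assumed, not from this change): with the
// default thresholds, two adjacent record-key ranges of the same table holding
// 40 MB/400k keys and 50 MB/300k keys are merged (90 MB <= 96 MB and
// 700k <= 960k), while ranges from different tables, or a record range next to
// an index range, are never merged.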
sortedRanges := rangeTree.GetSortedRanges()
for i := 1; i < len(sortedRanges); {
if !needMerge(&sortedRanges[i-1], &sortedRanges[i]) {
i++
continue
}
sortedRanges[i-1].EndKey = sortedRanges[i].EndKey
sortedRanges[i-1].Files = append(sortedRanges[i-1].Files, sortedRanges[i].Files...)
// TODO: this is slow when there are many ranges that need to be merged.
sortedRanges = append(sortedRanges[:i], sortedRanges[i+1:]...)
}

regionBytesAvg := totalBytes / uint64(totalRegions)
regionKeysAvg := totalKvs / uint64(totalRegions)
mergedRegionBytesAvg := totalBytes / uint64(len(sortedRanges))
mergedRegionKeysAvg := totalKvs / uint64(len(sortedRanges))

return sortedRanges, &MergeRangesStat{
TotalFiles: totalFiles,
TotalWriteCFFile: writeCFFile,
TotalDefaultCFFile: defaultCFFile,
TotalRegions: totalRegions,
RegionKeysAvg: int(regionKeysAvg),
RegionBytesAvg: int(regionBytesAvg),
MergedRegions: len(sortedRanges),
MergedRegionKeysAvg: int(mergedRegionKeysAvg),
MergedRegionBytesAvg: int(mergedRegionBytesAvg),
}, nil
}
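
For context, here is a minimal sketch (not part of this PR) of how a caller might feed backup files through MergeFileRanges with the default thresholds and log the returned statistics; the package name, helper name, and logging fields are illustrative assumptions:

package restoreutil

import (
    "github.com/pingcap/errors"
    backuppb "github.com/pingcap/kvproto/pkg/backup"
    "github.com/pingcap/log"
    "go.uber.org/zap"

    "github.com/pingcap/br/pkg/restore"
    "github.com/pingcap/br/pkg/rtree"
)

// mergeBackupFiles is a hypothetical helper: it merges the small per-file
// ranges and reports how much the region count shrank.
func mergeBackupFiles(files []*backuppb.File) ([]rtree.Range, error) {
    ranges, stat, err := restore.MergeFileRanges(
        files, restore.DefaultMergeRegionSizeBytes, restore.DefaultMergeRegionKeyCount)
    if err != nil {
        return nil, errors.Trace(err)
    }
    log.Info("merged file ranges",
        zap.Int("total-files", stat.TotalFiles),
        zap.Int("total-regions", stat.TotalRegions),
        zap.Int("merged-regions", stat.MergedRegions),
        zap.Int("merged-region-bytes-avg", stat.MergedRegionBytesAvg))
    return ranges, nil
}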