From d15d58cbcd4824b7c06b3b40b24389c87bf0f6ad Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 24 Dec 2020 15:41:59 +0800 Subject: [PATCH 1/4] cherry pick #578 to release-4.0 Signed-off-by: ti-srebot --- cmd/debug.go | 16 +- pkg/backup/client.go | 4 +- pkg/restore/client.go | 3 - pkg/restore/merge.go | 155 ++++++++++++++++++ pkg/restore/merge_test.go | 297 +++++++++++++++++++++++++++++++++++ pkg/restore/util.go | 152 +++++++----------- pkg/restore/util_test.go | 32 ++-- pkg/rtree/rtree.go | 13 ++ pkg/task/backup_test.go | 2 + pkg/task/restore.go | 80 ++++++++-- pkg/task/restore_raw.go | 19 ++- pkg/task/restore_test.go | 23 +++ tests/_utils/run_services | 2 + tests/br_debug_meta/run.sh | 22 +-- tests/br_debug_meta/workload | 12 ++ tests/br_full_ddl/run.sh | 14 +- tests/br_other/run.sh | 4 +- 17 files changed, 693 insertions(+), 157 deletions(-) create mode 100644 pkg/restore/merge.go create mode 100644 pkg/restore/merge_test.go create mode 100644 pkg/task/restore_test.go create mode 100644 tests/br_debug_meta/workload diff --git a/cmd/debug.go b/cmd/debug.go index 88e2c5703..5d4576b24 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -143,9 +143,18 @@ origin sha256 is %s`, func newBackupMetaCommand() *cobra.Command { command := &cobra.Command{ - Use: "backupmeta", - Short: "check the backup meta", - Args: cobra.NoArgs, + Use: "backupmeta", + Short: "utilities of backupmeta", + SilenceUsage: false, + } + command.AddCommand(newBackupMetaValidateCommand()) + return command +} + +func newBackupMetaValidateCommand() *cobra.Command { + command := &cobra.Command{ + Use: "validate", + Short: "validate key range and rewrite rules of backupmeta", RunE: func(cmd *cobra.Command, _ []string) error { ctx, cancel := context.WithCancel(GetDefaultContext()) defer cancel() @@ -234,7 +243,6 @@ func newBackupMetaCommand() *cobra.Command { }, } command.Flags().Uint64("offset", 0, "the offset of table id alloctor") - command.Hidden = true return command } diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 052d19e98..e3195994b 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -244,7 +244,9 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) { return kvRanges, nil } -// BuildBackupRangeAndSchema gets the range and schema of tables. +// BuildBackupRangeAndSchema gets KV range and schema of tables. +// KV ranges are separated by Table IDs. +// Also, KV ranges are separated by Index IDs in the same table. func BuildBackupRangeAndSchema( dom *domain.Domain, storage kv.Storage, diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 701aa2a37..662cade56 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -390,9 +390,6 @@ func (rc *Client) createTable( if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) } else { - // don't use rc.ctx here... - // remove the ctx field of Client would be a great work, - // we just take a small step here :< err := db.CreateTable(ctx, table) if err != nil { return CreatedTable{}, errors.Trace(err) diff --git a/pkg/restore/merge.go b/pkg/restore/merge.go new file mode 100644 index 000000000..49c78436a --- /dev/null +++ b/pkg/restore/merge.go @@ -0,0 +1,155 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. 
+
+package restore
+
+import (
+	"strings"
+
+	"github.com/pingcap/errors"
+	kvproto "github.com/pingcap/kvproto/pkg/backup"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/tablecodec"
+
+	berrors "github.com/pingcap/br/pkg/errors"
+	"github.com/pingcap/br/pkg/rtree"
+	"github.com/pingcap/br/pkg/utils"
+)
+
+const (
+	// DefaultMergeRegionSizeBytes is the default region split size, 96MB.
+	// See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38
+	DefaultMergeRegionSizeBytes uint64 = 96 * utils.MB
+
+	// DefaultMergeRegionKeyCount is the default region key count, 960000.
+	DefaultMergeRegionKeyCount uint64 = 960000
+
+	writeCFName   = "write"
+	defaultCFName = "default"
+)
+
+// MergeRangesStat holds statistics for MergeFileRanges.
+type MergeRangesStat struct {
+	TotalFiles           int
+	TotalWriteCFFile     int
+	TotalDefaultCFFile   int
+	TotalRegions         int
+	RegionKeysAvg        int
+	RegionBytesAvg       int
+	MergedRegions        int
+	MergedRegionKeysAvg  int
+	MergedRegionBytesAvg int
+}
+
+// MergeFileRanges returns the ranges of the files, merged based on
+// splitSizeBytes and splitKeyCount.
+//
+// By merging small ranges, it speeds up restoring a backup that contains many
+// small ranges (regions), as it reduces split-region and scatter-region work.
+func MergeFileRanges(
+	files []*kvproto.File, splitSizeBytes, splitKeyCount uint64,
+) ([]rtree.Range, *MergeRangesStat, error) {
+	if len(files) == 0 {
+		return []rtree.Range{}, &MergeRangesStat{}, nil
+	}
+	totalBytes := uint64(0)
+	totalKvs := uint64(0)
+	totalFiles := len(files)
+	writeCFFile := 0
+	defaultCFFile := 0
+
+	filesMap := make(map[string][]*kvproto.File)
+	for _, file := range files {
+		filesMap[string(file.StartKey)] = append(filesMap[string(file.StartKey)], file)
+
+		// Classify files by cf; default cf files share key ranges with their write cf files, so they add no extra ranges.
+		if file.Cf == writeCFName || strings.Contains(file.GetName(), writeCFName) {
+			writeCFFile++
+		} else if file.Cf == defaultCFName || strings.Contains(file.GetName(), defaultCFName) {
+			defaultCFFile++
+		}
+		totalBytes += file.TotalBytes
+		totalKvs += file.TotalKvs
+	}
+	if writeCFFile == 0 && defaultCFFile == 0 {
+		return []rtree.Range{}, nil, errors.Annotatef(berrors.ErrRestoreInvalidBackup,
+			"unknown backup data, files belong to neither Write CF nor Default CF")
+	}
+
+	// RawKV does not have data in write CF.
+	totalRegions := writeCFFile
+	if defaultCFFile > writeCFFile {
+		totalRegions = defaultCFFile
+	}
+
+	// Check if any files overlap.
+	rangeTree := rtree.NewRangeTree()
+	for key := range filesMap {
+		files := filesMap[key]
+		if out := rangeTree.InsertRange(rtree.Range{
+			StartKey: files[0].GetStartKey(),
+			EndKey:   files[0].GetEndKey(),
+			Files:    files,
+		}); out != nil {
+			return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange,
+				"duplicate range %s files %+v", out, files)
+		}
+	}
+
+	needMerge := func(left, right *rtree.Range) bool {
+		leftBytes, leftKeys := left.BytesAndKeys()
+		rightBytes, rightKeys := right.BytesAndKeys()
+		if rightBytes == 0 {
+			return true
+		}
+		if leftBytes+rightBytes > splitSizeBytes {
+			return false
+		}
+		if leftKeys+rightKeys > splitKeyCount {
+			return false
+		}
+		// Do not merge ranges in different tables.
+		if tablecodec.DecodeTableID(kv.Key(left.StartKey)) != tablecodec.DecodeTableID(kv.Key(right.StartKey)) {
+			return false
+		}
+		// Do not merge ranges in different indexes even if they are in the same
+		// table, as a rewrite rule only supports rewriting one pattern.
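+		// Illustrative example (not taken from any real backup): two row-key
+		// ranges of table t can be merged with each other, but a row-key range
+		// and an index-key range of t cannot, and ranges of two different
+		// indexes of t cannot be merged either.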
+		// tableID, indexID, indexValues, err
+		_, indexID1, _, err1 := tablecodec.DecodeIndexKey(kv.Key(left.StartKey))
+		_, indexID2, _, err2 := tablecodec.DecodeIndexKey(kv.Key(right.StartKey))
+		// If both of them are index keys, ...
+		if err1 == nil && err2 == nil {
+			// Merge left and right if they are in the same index.
+			return indexID1 == indexID2
+		}
+		// Otherwise, merge them only if they are both record keys.
+		return err1 != nil && err2 != nil
+	}
+	sortedRanges := rangeTree.GetSortedRanges()
+	for i := 1; i < len(sortedRanges); {
+		if !needMerge(&sortedRanges[i-1], &sortedRanges[i]) {
+			i++
+			continue
+		}
+		sortedRanges[i-1].EndKey = sortedRanges[i].EndKey
+		sortedRanges[i-1].Files = append(sortedRanges[i-1].Files, sortedRanges[i].Files...)
+		// TODO: this is slow when there are many ranges that need to be merged.
+		sortedRanges = append(sortedRanges[:i], sortedRanges[i+1:]...)
+	}
+
+	regionBytesAvg := totalBytes / uint64(totalRegions)
+	regionKeysAvg := totalKvs / uint64(totalRegions)
+	mergedRegionBytesAvg := totalBytes / uint64(len(sortedRanges))
+	mergedRegionKeysAvg := totalKvs / uint64(len(sortedRanges))
+
+	return sortedRanges, &MergeRangesStat{
+		TotalFiles:           totalFiles,
+		TotalWriteCFFile:     writeCFFile,
+		TotalDefaultCFFile:   defaultCFFile,
+		TotalRegions:         totalRegions,
+		RegionKeysAvg:        int(regionKeysAvg),
+		RegionBytesAvg:       int(regionBytesAvg),
+		MergedRegions:        len(sortedRanges),
+		MergedRegionKeysAvg:  int(mergedRegionKeysAvg),
+		MergedRegionBytesAvg: int(mergedRegionBytesAvg),
+	}, nil
+}
diff --git a/pkg/restore/merge_test.go b/pkg/restore/merge_test.go
new file mode 100644
index 000000000..32c0f5f2e
--- /dev/null
+++ b/pkg/restore/merge_test.go
@@ -0,0 +1,297 @@
+// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
+
+package restore_test
+
+import (
+	"bytes"
+	"fmt"
+	"math"
+	"math/rand"
+	"testing"
+	"time"
+
+	. 
"github.com/pingcap/check" + "github.com/pingcap/errors" + kvproto "github.com/pingcap/kvproto/pkg/backup" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" + + berrors "github.com/pingcap/br/pkg/errors" + "github.com/pingcap/br/pkg/restore" +) + +var _ = Suite(&testMergeRangesSuite{}) + +type testMergeRangesSuite struct{} + +type fileBulder struct { + tableID, startKeyOffset int64 +} + +func (fb *fileBulder) build(tableID, indexID, num, bytes, kv int) (files []*kvproto.File) { + if num != 1 && num != 2 { + panic("num must be 1 or 2") + } + + // Rotate table ID + if fb.tableID != int64(tableID) { + fb.tableID = int64(tableID) + fb.startKeyOffset = 0 + } + + low := codec.EncodeInt(nil, fb.startKeyOffset) + fb.startKeyOffset += 10 + high := codec.EncodeInt(nil, fb.startKeyOffset) + + startKey := tablecodec.EncodeRowKey(fb.tableID, low) + endKey := tablecodec.EncodeRowKey(fb.tableID, high) + if indexID != 0 { + lowVal := types.NewIntDatum(fb.startKeyOffset - 10) + highVal := types.NewIntDatum(fb.startKeyOffset) + sc := &stmtctx.StatementContext{TimeZone: time.UTC} + lowValue, err := codec.EncodeKey(sc, nil, lowVal) + if err != nil { + panic(err) + } + highValue, err := codec.EncodeKey(sc, nil, highVal) + if err != nil { + panic(err) + } + startKey = tablecodec.EncodeIndexSeekKey(int64(tableID), int64(indexID), lowValue) + endKey = tablecodec.EncodeIndexSeekKey(int64(tableID), int64(indexID), highValue) + } + + files = append(files, &kvproto.File{ + Name: fmt.Sprint(rand.Int63n(math.MaxInt64), "_write.sst"), + StartKey: startKey, + EndKey: endKey, + TotalKvs: uint64(kv), + TotalBytes: uint64(bytes), + Cf: "write", + }) + if num == 1 { + return + } + + // To match TiKV's behavior. + files[0].TotalKvs = 0 + files[0].TotalBytes = 0 + files = append(files, &kvproto.File{ + Name: fmt.Sprint(rand.Int63n(math.MaxInt64), "_default.sst"), + StartKey: tablecodec.EncodeRowKey(fb.tableID, low), + EndKey: tablecodec.EncodeRowKey(fb.tableID, high), + TotalKvs: uint64(kv), + TotalBytes: uint64(bytes), + Cf: "default", + }) + return files +} + +func (s *testMergeRangesSuite) TestMergeRanges(c *C) { + type Case struct { + files [][5]int // tableID, indexID num, bytes, kv + merged []int // length of each merged range + stat restore.MergeRangesStat + } + splitSizeBytes := int(restore.DefaultMergeRegionSizeBytes) + splitKeyCount := int(restore.DefaultMergeRegionKeyCount) + cases := []Case{ + // Empty backup. + { + files: [][5]int{}, + merged: []int{}, + stat: restore.MergeRangesStat{TotalRegions: 0, MergedRegions: 0}, + }, + + // Do not merge big range. 
+ { + files: [][5]int{{1, 0, 1, splitSizeBytes, 1}, {1, 0, 1, 1, 1}}, + merged: []int{1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 2, MergedRegions: 2}, + }, + { + files: [][5]int{{1, 0, 1, 1, 1}, {1, 0, 1, splitSizeBytes, 1}}, + merged: []int{1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 2, MergedRegions: 2}, + }, + { + files: [][5]int{{1, 0, 1, 1, splitKeyCount}, {1, 0, 1, 1, 1}}, + merged: []int{1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 2, MergedRegions: 2}, + }, + { + files: [][5]int{{1, 0, 1, 1, 1}, {1, 0, 1, 1, splitKeyCount}}, + merged: []int{1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 2, MergedRegions: 2}, + }, + + // 3 -> 1 + { + files: [][5]int{{1, 0, 1, 1, 1}, {1, 0, 1, 1, 1}, {1, 0, 1, 1, 1}}, + merged: []int{3}, + stat: restore.MergeRangesStat{TotalRegions: 3, MergedRegions: 1}, + }, + // 3 -> 2, size: [split*1/3, split*1/3, split*1/2] -> [split*2/3, split*1/2] + { + files: [][5]int{{1, 0, 1, splitSizeBytes / 3, 1}, {1, 0, 1, splitSizeBytes / 3, 1}, {1, 0, 1, splitSizeBytes / 2, 1}}, + merged: []int{2, 1}, + stat: restore.MergeRangesStat{TotalRegions: 3, MergedRegions: 2}, + }, + // 4 -> 2, size: [split*1/3, split*1/3, split*1/2, 1] -> [split*2/3, split*1/2 +1] + { + files: [][5]int{{1, 0, 1, splitSizeBytes / 3, 1}, {1, 0, 1, splitSizeBytes / 3, 1}, {1, 0, 1, splitSizeBytes / 2, 1}, {1, 0, 1, 1, 1}}, + merged: []int{2, 2}, + stat: restore.MergeRangesStat{TotalRegions: 4, MergedRegions: 2}, + }, + // 5 -> 3, size: [split*1/3, split*1/3, split, split*1/2, 1] -> [split*2/3, split, split*1/2 +1] + { + files: [][5]int{{1, 0, 1, splitSizeBytes / 3, 1}, {1, 0, 1, splitSizeBytes / 3, 1}, {1, 0, 1, splitSizeBytes, 1}, {1, 0, 1, splitSizeBytes / 2, 1}, {1, 0, 1, 1, 1}}, + merged: []int{2, 1, 2}, + stat: restore.MergeRangesStat{TotalRegions: 5, MergedRegions: 3}, + }, + + // Do not merge ranges from different tables + // 2 -> 2, size: [1, 1] -> [1, 1], table ID: [1, 2] -> [1, 2] + { + files: [][5]int{{1, 0, 1, 1, 1}, {2, 0, 1, 1, 1}}, + merged: []int{1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 2, MergedRegions: 2}, + }, + // 3 -> 2, size: [1@split*1/3, 2@split*1/3, 2@split*1/2] -> [1@split*1/3, 2@split*5/6] + { + files: [][5]int{{1, 0, 1, splitSizeBytes / 3, 1}, {2, 0, 1, splitSizeBytes / 3, 1}, {2, 0, 1, splitSizeBytes / 2, 1}}, + merged: []int{1, 2}, + stat: restore.MergeRangesStat{TotalRegions: 3, MergedRegions: 2}, + }, + + // Do not merge ranges from different indexes. + // 2 -> 2, size: [1, 1] -> [1, 1], index ID: [1, 2] -> [1, 2] + { + files: [][5]int{{1, 1, 1, 1, 1}, {1, 2, 1, 1, 1}}, + merged: []int{1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 2, MergedRegions: 2}, + }, + // Index ID out of order. 
+ // 2 -> 2, size: [1, 1] -> [1, 1], index ID: [2, 1] -> [1, 2] + { + files: [][5]int{{1, 2, 1, 1, 1}, {1, 1, 1, 1, 1}}, + merged: []int{1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 2, MergedRegions: 2}, + }, + // 3 -> 3, size: [1, 1, 1] -> [1, 1, 1] + // (table ID, index ID): [(1, 0), (2, 1), (2, 2)] -> [(1, 0), (2, 1), (2, 2)] + { + files: [][5]int{{1, 0, 1, 1, 1}, {2, 1, 1, 1, 1}, {2, 2, 1, 1, 1}}, + merged: []int{1, 1, 1}, + stat: restore.MergeRangesStat{TotalRegions: 3, MergedRegions: 3}, + }, + // 4 -> 3, size: [1, 1, 1, 1] -> [1, 1, 2] + // (table ID, index ID): [(1, 0), (2, 1), (2, 0), (2, 0)] -> [(1, 0), (2, 1), (2, 0)] + { + files: [][5]int{{1, 0, 1, 1, 1}, {2, 1, 1, 1, 1}, {2, 0, 1, 1, 1}, {2, 0, 1, 1, 1}}, + merged: []int{1, 1, 2}, + stat: restore.MergeRangesStat{TotalRegions: 4, MergedRegions: 3}, + }, + // Merge the same table ID and index ID. + // 4 -> 3, size: [1, 1, 1, 1] -> [1, 2, 1] + // (table ID, index ID): [(1, 0), (2, 1), (2, 1), (2, 0)] -> [(1, 0), (2, 1), (2, 0)] + { + files: [][5]int{{1, 0, 1, 1, 1}, {2, 1, 1, 1, 1}, {2, 1, 1, 1, 1}, {2, 0, 1, 1, 1}}, + merged: []int{1, 2, 1}, + stat: restore.MergeRangesStat{TotalRegions: 4, MergedRegions: 3}, + }, + } + + for i, cs := range cases { + files := make([]*kvproto.File, 0) + fb := fileBulder{} + for _, f := range cs.files { + files = append(files, fb.build(f[0], f[1], f[2], f[3], f[4])...) + } + rngs, stat, err := restore.MergeFileRanges( + files, restore.DefaultMergeRegionSizeBytes, restore.DefaultMergeRegionKeyCount) + c.Assert(err, IsNil, Commentf("%+v", cs)) + c.Assert(stat.TotalRegions, Equals, cs.stat.TotalRegions, Commentf("%+v", cs)) + c.Assert(stat.MergedRegions, Equals, cs.stat.MergedRegions, Commentf("%+v", cs)) + + c.Assert(len(rngs), Equals, len(cs.merged), Commentf("case %d", i)) + for i, rg := range rngs { + c.Assert(len(rg.Files), Equals, cs.merged[i], Commentf("%+v", cs)) + // Files range must be in [Range.StartKey, Range.EndKey]. + for _, f := range rg.Files { + c.Assert(bytes.Compare(rg.StartKey, f.StartKey), LessEqual, 0) + c.Assert(bytes.Compare(rg.EndKey, f.EndKey), GreaterEqual, 0) + } + } + } +} + +func (s *testMergeRangesSuite) TestMergeRawKVRanges(c *C) { + files := make([]*kvproto.File, 0) + fb := fileBulder{} + files = append(files, fb.build(1, 0, 2, 1, 1)...) + // RawKV does not have write cf + files = files[1:] + _, stat, err := restore.MergeFileRanges( + files, restore.DefaultMergeRegionSizeBytes, restore.DefaultMergeRegionKeyCount) + c.Assert(err, IsNil) + c.Assert(stat.TotalRegions, Equals, 1) + c.Assert(stat.MergedRegions, Equals, 1) +} + +func (s *testMergeRangesSuite) TestInvalidRanges(c *C) { + files := make([]*kvproto.File, 0) + fb := fileBulder{} + files = append(files, fb.build(1, 0, 1, 1, 1)...) 
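+	// Rewrite the generated file so that it belongs to neither the write CF
+	// nor the default CF; MergeFileRanges should reject such backup data.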
+ files[0].Name = "invalid.sst" + files[0].Cf = "invalid" + _, _, err := restore.MergeFileRanges( + files, restore.DefaultMergeRegionSizeBytes, restore.DefaultMergeRegionKeyCount) + c.Assert(err, NotNil) + c.Assert(errors.Cause(err), Equals, berrors.ErrRestoreInvalidBackup) +} + +// Benchmark results on Intel(R) Xeon(R) CPU E5-2630 v4 @ 2.20GHz +// +// BenchmarkMergeRanges100-40 9676 114344 ns/op +// BenchmarkMergeRanges1k-40 345 3700739 ns/op +// BenchmarkMergeRanges10k-40 3 414097277 ns/op +// BenchmarkMergeRanges50k-40 1 17258177908 ns/op +// BenchmarkMergeRanges100k-40 1 73403873161 ns/op + +func benchmarkMergeRanges(b *testing.B, filesCount int) { + files := make([]*kvproto.File, 0) + fb := fileBulder{} + for i := 0; i < filesCount; i++ { + files = append(files, fb.build(1, 0, 1, 1, 1)...) + } + var err error + for i := 0; i < b.N; i++ { + _, _, err = restore.MergeFileRanges(files, restore.DefaultMergeRegionSizeBytes, restore.DefaultMergeRegionKeyCount) + if err != nil { + b.Error(err) + } + } +} + +func BenchmarkMergeRanges100(b *testing.B) { + benchmarkMergeRanges(b, 100) +} + +func BenchmarkMergeRanges1k(b *testing.B) { + benchmarkMergeRanges(b, 1000) +} + +func BenchmarkMergeRanges10k(b *testing.B) { + benchmarkMergeRanges(b, 10000) +} + +func BenchmarkMergeRanges50k(b *testing.B) { + benchmarkMergeRanges(b, 50000) +} + +func BenchmarkMergeRanges100k(b *testing.B) { + benchmarkMergeRanges(b, 100000) +} diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 17f695104..c23471527 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -13,7 +13,7 @@ import ( _ "github.com/go-sql-driver/mysql" // mysql driver "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/backup" + kvproto "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -38,9 +38,7 @@ var ( // GetRewriteRules returns the rewrite rule of the new table and the old table. func GetRewriteRules( - newTable *model.TableInfo, - oldTable *model.TableInfo, - newTimeStamp uint64, + newTable, oldTable *model.TableInfo, newTimeStamp uint64, ) *RewriteRules { tableIDs := make(map[int64]int64) tableIDs[oldTable.ID] = newTable.ID @@ -94,16 +92,16 @@ func GetRewriteRules( // The range of the returned sst meta is [regionRule.NewKeyPrefix, append(regionRule.NewKeyPrefix, 0xff)]. func GetSSTMetaFromFile( id []byte, - file *backup.File, + file *kvproto.File, region *metapb.Region, regionRule *import_sstpb.RewriteRule, ) import_sstpb.SSTMeta { // Get the column family of the file by the file name. var cfName string - if strings.Contains(file.GetName(), "default") { - cfName = "default" - } else if strings.Contains(file.GetName(), "write") { - cfName = "write" + if strings.Contains(file.GetName(), defaultCFName) { + cfName = defaultCFName + } else if strings.Contains(file.GetName(), writeCFName) { + cfName = writeCFName } // Find the overlapped part between the file and the region. // Here we rewrites the keys to compare with the keys of the region. @@ -162,7 +160,7 @@ func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) { } // EstimateRangeSize estimates the total range count by file. 
-func EstimateRangeSize(files []*backup.File) int {
+func EstimateRangeSize(files []*kvproto.File) int {
 	result := 0
 	for _, f := range files {
 		if strings.HasSuffix(f.GetName(), "_write.sst") {
@@ -172,32 +170,10 @@ func EstimateRangeSize(files []*backup.File) int {
 	return result
 }
 
-// ValidateFileRanges checks and returns the ranges of the files.
-func ValidateFileRanges(
-	files []*backup.File,
-	rewriteRules *RewriteRules,
-) ([]rtree.Range, error) {
-	ranges := make([]rtree.Range, 0, len(files))
-	fileAppended := make(map[string]bool)
-
-	for _, file := range files {
-		// We skips all default cf files because we don't range overlap.
-		if !fileAppended[file.GetName()] && strings.Contains(file.GetName(), "write") {
-			rng, err := validateAndGetFileRange(file, rewriteRules)
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-			ranges = append(ranges, rng)
-			fileAppended[file.GetName()] = true
-		}
-	}
-	return ranges, nil
-}
-
 // MapTableToFiles makes a map that mapping table ID to its backup files.
 // aware that one file can and only can hold one table.
-func MapTableToFiles(files []*backup.File) map[int64][]*backup.File {
-	result := map[int64][]*backup.File{}
+func MapTableToFiles(files []*kvproto.File) map[int64][]*kvproto.File {
+	result := map[int64][]*kvproto.File{}
 	for _, file := range files {
 		tableID := tablecodec.DecodeTableID(file.GetStartKey())
 		tableEndID := tablecodec.DecodeTableID(file.GetEndKey())
@@ -218,11 +194,13 @@ func MapTableToFiles(files []*backup.File) map[int64][]*backup.File {
 	return result
 }
 
-// GoValidateFileRanges validate files by a stream of tables and yields tables with range.
+// GoValidateFileRanges validates files from a stream of tables and yields
+// tables with their key ranges.
 func GoValidateFileRanges(
 	ctx context.Context,
 	tableStream <-chan CreatedTable,
-	fileOfTable map[int64][]*backup.File,
+	fileOfTable map[int64][]*kvproto.File,
+	splitSizeBytes, splitKeyCount uint64,
 	errCh chan<- error,
 ) <-chan TableWithRange {
 	// Could we have a smaller outCh size?
@@ -250,14 +228,36 @@ func GoValidateFileRanges(
 				files = append(files, fileOfTable[partition.ID]...)
 			}
 		}
-		ranges, err := ValidateFileRanges(files, t.RewriteRule)
+		for _, file := range files {
+			err := ValidateFileRewriteRule(file, t.RewriteRule)
+			if err != nil {
+				errCh <- err
+				return
+			}
+		}
+		// Merge small ranges to reduce split and scatter regions.
+		ranges, stat, err := MergeFileRanges(
+			files, splitSizeBytes, splitKeyCount)
 		if err != nil {
 			errCh <- err
 			return
 		}
+		log.Info("merge and validate file",
+			zap.Stringer("database", t.OldTable.DB.Name),
+			zap.Stringer("table", t.Table.Name),
+			zap.Int("Files(total)", stat.TotalFiles),
+			zap.Int("File(write)", stat.TotalWriteCFFile),
+			zap.Int("File(default)", stat.TotalDefaultCFFile),
+			zap.Int("Region(total)", stat.TotalRegions),
+			zap.Int("Region(keys avg)", stat.RegionKeysAvg),
+			zap.Int("Region(bytes avg)", stat.RegionBytesAvg),
+			zap.Int("Merged(regions)", stat.MergedRegions),
+			zap.Int("Merged(keys avg)", stat.MergedRegionKeysAvg),
+			zap.Int("Merged(bytes avg)", stat.MergedRegionBytesAvg))
+
 		tableWithRange := TableWithRange{
 			CreatedTable: t,
-			Range:        AttachFilesToRanges(files, ranges),
+			Range:        ranges,
 		}
 		log.Debug("sending range info",
 			zap.Stringer("table", t.Table.Name),
@@ -271,60 +271,8 @@ func GoValidateFileRanges(
 	return outCh
 }
 
-// validateAndGetFileRange validates a file, if success, return the key range of this file.
-func validateAndGetFileRange(file *backup.File, rules *RewriteRules) (rtree.Range, error) { - err := ValidateFileRewriteRule(file, rules) - if err != nil { - return rtree.Range{}, errors.Trace(err) - } - startID := tablecodec.DecodeTableID(file.GetStartKey()) - endID := tablecodec.DecodeTableID(file.GetEndKey()) - if startID != endID { - log.Error("table ids mismatch", - zap.Int64("startID", startID), - zap.Int64("endID", endID), - logutil.File(file)) - return rtree.Range{}, errors.Annotate(berrors.ErrRestoreTableIDMismatch, "validateAndGetFileRange") - } - r := rtree.Range{StartKey: file.GetStartKey(), EndKey: file.GetEndKey()} - return r, nil -} - -// AttachFilesToRanges attach files to ranges. -// Panic if range is overlapped or no range for files. -// nolint:staticcheck -func AttachFilesToRanges( - files []*backup.File, - ranges []rtree.Range, -) []rtree.Range { - rangeTree := rtree.NewRangeTree() - for _, rg := range ranges { - rangeTree.Update(rg) - } - for _, f := range files { - rg := rangeTree.Find(&rtree.Range{ - StartKey: f.GetStartKey(), - EndKey: f.GetEndKey(), - }) - if rg == nil { - log.Panic("range not found", - zap.Stringer("startKey", logutil.WrapKey(f.GetStartKey())), - zap.Stringer("endKey", logutil.WrapKey(f.GetEndKey()))) - } - file := *f - rg.Files = append(rg.Files, &file) - } - if rangeTree.Len() != len(ranges) { - log.Panic("ranges overlapped", - zap.Int("ranges length", len(ranges)), - zap.Int("tree length", rangeTree.Len())) - } - sortedRanges := rangeTree.GetSortedRanges() - return sortedRanges -} - // ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file. -func ValidateFileRewriteRule(file *backup.File, rewriteRules *RewriteRules) error { +func ValidateFileRewriteRule(file *kvproto.File, rewriteRules *RewriteRules) error { // Check if the start key has a matched rewrite key _, startRule := rewriteRawKey(file.GetStartKey(), rewriteRules) if rewriteRules != nil && startRule == nil { @@ -361,6 +309,16 @@ func ValidateFileRewriteRule(file *backup.File, rewriteRules *RewriteRules) erro ) return errors.Annotate(berrors.ErrRestoreInvalidRewrite, "unexpected rewrite rules") } + + startID := tablecodec.DecodeTableID(file.GetStartKey()) + endID := tablecodec.DecodeTableID(file.GetEndKey()) + if startID != endID { + log.Error("table ids mismatch", + zap.Int64("startID", startID), + zap.Int64("endID", endID), + logutil.File(file)) + return errors.Annotate(berrors.ErrRestoreTableIDMismatch, "file start_key end_key table ids mismatch") + } return nil } @@ -436,7 +394,7 @@ func SplitRanges( }) } -func rewriteFileKeys(file *backup.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) { +func rewriteFileKeys(file *kvproto.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) { startID := tablecodec.DecodeTableID(file.GetStartKey()) endID := tablecodec.DecodeTableID(file.GetEndKey()) var rule *import_sstpb.RewriteRule @@ -546,18 +504,18 @@ func (rs rangesSliceObjectMixin) MarshalLogObject(encoder zapcore.ObjectEncoder) } // ParseQuoteName parse the quote `db`.`table` name, and split it. 
-func ParseQuoteName(name string) (string, string) { +func ParseQuoteName(name string) (db, table string) { names := quoteRegexp.FindAllStringSubmatch(name, -1) if len(names) != 2 { log.Panic("failed to parse schema name", zap.String("origin name", name), zap.Any("parsed names", names)) } - schema := names[0][0] - table := names[1][0] - schema = strings.ReplaceAll(unQuoteName(schema), "``", "`") + db = names[0][0] + table = names[1][0] + db = strings.ReplaceAll(unQuoteName(db), "``", "`") table = strings.ReplaceAll(unQuoteName(table), "``", "`") - return schema, table + return db, table } func unQuoteName(name string) string { diff --git a/pkg/restore/util_test.go b/pkg/restore/util_test.go index 8a41f111c..169aea319 100644 --- a/pkg/restore/util_test.go +++ b/pkg/restore/util_test.go @@ -95,7 +95,7 @@ func (s *testRestoreUtilSuite) TestMapTableToFiles(c *C) { c.Assert(result[2], DeepEquals, filesOfTable2) } -func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { +func (s *testRestoreUtilSuite) TestValidateFileRewriteRule(c *C) { rules := &restore.RewriteRules{ Table: []*import_sstpb.RewriteRule{&import_sstpb.RewriteRule{ OldKeyPrefix: []byte(tablecodec.EncodeTablePrefix(1)), @@ -104,34 +104,34 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { } // Empty start/end key is not allowed. - _, err := restore.ValidateFileRanges( - []*backup.File{&backup.File{ + err := restore.ValidateFileRewriteRule( + &backup.File{ Name: "file_write.sst", StartKey: []byte(""), EndKey: []byte(""), - }}, + }, rules, ) c.Assert(err, ErrorMatches, ".*cannot find rewrite rule.*") // Range is not overlap, no rule found. - _, err = restore.ValidateFileRanges( - []*backup.File{{ + err = restore.ValidateFileRewriteRule( + &backup.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(0), EndKey: tablecodec.EncodeTablePrefix(1), - }}, + }, rules, ) c.Assert(err, ErrorMatches, ".*cannot find rewrite rule.*") // No rule for end key. - _, err = restore.ValidateFileRanges( - []*backup.File{{ + err = restore.ValidateFileRewriteRule( + &backup.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(1), EndKey: tablecodec.EncodeTablePrefix(2), - }}, + }, rules, ) c.Assert(err, ErrorMatches, ".*cannot find rewrite rule.*") @@ -141,12 +141,12 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { OldKeyPrefix: tablecodec.EncodeTablePrefix(2), NewKeyPrefix: tablecodec.EncodeTablePrefix(3), }) - _, err = restore.ValidateFileRanges( - []*backup.File{{ + err = restore.ValidateFileRewriteRule( + &backup.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(1), EndKey: tablecodec.EncodeTablePrefix(2), - }}, + }, rules, ) c.Assert(err, ErrorMatches, ".*restore table ID mismatch") @@ -156,12 +156,12 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { OldKeyPrefix: tablecodec.EncodeTablePrefix(2), NewKeyPrefix: tablecodec.EncodeTablePrefix(1), }) - _, err = restore.ValidateFileRanges( - []*backup.File{{ + err = restore.ValidateFileRewriteRule( + &backup.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(1), EndKey: tablecodec.EncodeTablePrefix(2), - }}, + }, rules, ) c.Assert(err, ErrorMatches, ".*unexpected rewrite rules.*") diff --git a/pkg/rtree/rtree.go b/pkg/rtree/rtree.go index cc33e6700..43f60bfac 100644 --- a/pkg/rtree/rtree.go +++ b/pkg/rtree/rtree.go @@ -26,7 +26,20 @@ func (rg *Range) String() string { return fmt.Sprintf("[%x %x]", rg.StartKey, rg.EndKey) } +<<<<<<< HEAD // Intersect returns? 
+=======
+// BytesAndKeys returns total bytes and keys in a range.
+func (rg *Range) BytesAndKeys() (bytes, keys uint64) {
+	for _, f := range rg.Files {
+		bytes += f.TotalBytes
+		keys += f.TotalKvs
+	}
+	return
+}
+
+// Intersect returns intersect range in the tree.
+>>>>>>> 1a527fb... restore: merge small ranges (#578)
 func (rg *Range) Intersect(
 	start, end []byte,
 ) (subStart, subEnd []byte, isIntersect bool) {
diff --git a/pkg/task/backup_test.go b/pkg/task/backup_test.go
index 6bd60515b..d511b066d 100644
--- a/pkg/task/backup_test.go
+++ b/pkg/task/backup_test.go
@@ -1,3 +1,5 @@
+// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
+
 package task
 
 import (
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index cca8d80e0..a8dd93724 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -29,41 +29,98 @@ const (
 	flagOnline   = "online"
 	flagNoSchema = "no-schema"
 
+	// FlagMergeRegionSizeBytes is the name of the flag for merging small regions by size.
+	FlagMergeRegionSizeBytes = "merge-region-size-bytes"
+	// FlagMergeRegionKeyCount is the name of the flag for merging small regions by key count.
+	FlagMergeRegionKeyCount = "merge-region-key-count"
+
 	defaultRestoreConcurrency = 128
 	maxRestoreBatchSizeLimit  = 10240
 	defaultDDLConcurrency     = 16
 )
 
+// RestoreCommonConfig is the common configuration for all BR restore tasks.
+type RestoreCommonConfig struct {
+	Online bool `json:"online" toml:"online"`
+
+	// MergeSmallRegionSizeBytes is the threshold of merging small regions (Default 96MB, region split size).
+	// MergeSmallRegionKeyCount is the threshold of merging small regions (Default 960_000, region split key count).
+	// See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38
+	MergeSmallRegionSizeBytes uint64 `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"`
+	MergeSmallRegionKeyCount  uint64 `json:"merge-region-key-count" toml:"merge-region-key-count"`
+}
+
+// adjust adjusts abnormal config values in the current config.
+// It is useful when BR is not started from the CLI (e.g. from BRIE in SQL).
+func (cfg *RestoreCommonConfig) adjust() {
+	if cfg.MergeSmallRegionKeyCount == 0 {
+		cfg.MergeSmallRegionKeyCount = restore.DefaultMergeRegionKeyCount
+	}
+	if cfg.MergeSmallRegionSizeBytes == 0 {
+		cfg.MergeSmallRegionSizeBytes = restore.DefaultMergeRegionSizeBytes
+	}
+}
+
+// DefineRestoreCommonFlags defines common flags for the restore command.
+func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
+	// TODO remove experimental tag if it's stable
+	flags.Bool(flagOnline, false, "(experimental) Whether to do the restore online")
+
+	flags.Uint64(FlagMergeRegionSizeBytes, restore.DefaultMergeRegionSizeBytes,
+		"the threshold of merging small regions (Default 96MB, region split size)")
+	flags.Uint64(FlagMergeRegionKeyCount, restore.DefaultMergeRegionKeyCount,
+		"the threshold of merging small regions (Default 960_000, region split key count)")
+	_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
+	_ = flags.MarkHidden(FlagMergeRegionKeyCount)
+}
+
+// ParseFromFlags parses the config from the flag set.
+func (cfg *RestoreCommonConfig) ParseFromFlags(flags *pflag.FlagSet) error {
+	var err error
+	cfg.Online, err = flags.GetBool(flagOnline)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cfg.MergeSmallRegionKeyCount, err = flags.GetUint64(FlagMergeRegionKeyCount)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cfg.MergeSmallRegionSizeBytes, err = flags.GetUint64(FlagMergeRegionSizeBytes)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return errors.Trace(err)
+}
+
 // RestoreConfig is the configuration specific for restore tasks.
 type RestoreConfig struct {
 	Config
+	RestoreCommonConfig
 
-	Online   bool `json:"online" toml:"online"`
 	NoSchema bool `json:"no-schema" toml:"no-schema"`
 }
 
-// DefineRestoreFlags defines common flags for the restore command.
+// DefineRestoreFlags defines common flags for the restore tidb command.
 func DefineRestoreFlags(flags *pflag.FlagSet) {
-	// TODO remove experimental tag if it's stable
-	flags.Bool(flagOnline, false, "(experimental) Whether online when restore")
 	flags.Bool(flagNoSchema, false, "skip creating schemas and tables, reuse existing empty ones")
-	// Do not expose this flag
 	_ = flags.MarkHidden(flagNoSchema)
+
+	DefineRestoreCommonFlags(flags)
 }
 
 // ParseFromFlags parses the restore-related flags from the flag set.
 func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	var err error
-	cfg.Online, err = flags.GetBool(flagOnline)
+	cfg.NoSchema, err = flags.GetBool(flagNoSchema)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	cfg.NoSchema, err = flags.GetBool(flagNoSchema)
+	err = cfg.Config.ParseFromFlags(flags)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	err = cfg.Config.ParseFromFlags(flags)
+	err = cfg.RestoreCommonConfig.ParseFromFlags(flags)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -79,7 +136,8 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 // we should set proper value in this function.
 // so that both binary and TiDB will use same default value.
 func (cfg *RestoreConfig) adjustRestoreConfig() {
-	cfg.adjust()
+	cfg.Config.adjust()
+	cfg.RestoreCommonConfig.adjust()
 
 	if cfg.Config.Concurrency == 0 {
 		cfg.Config.Concurrency = defaultRestoreConcurrency
@@ -137,6 +195,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 		return errors.Trace(err)
 	}
 	g.Record("Size", utils.ArchiveSize(backupMeta))
+
 	if err = client.InitBackupMeta(backupMeta, u); err != nil {
 		return errors.Trace(err)
 	}
@@ -227,7 +286,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 	tableFileMap := restore.MapTableToFiles(files)
 	log.Debug("mapped table to files", zap.Any("result map", tableFileMap))
 
-	rangeStream := restore.GoValidateFileRanges(ctx, tableStream, tableFileMap, errCh)
+	rangeStream := restore.GoValidateFileRanges(
+		ctx, tableStream, tableFileMap, cfg.MergeSmallRegionSizeBytes, cfg.MergeSmallRegionKeyCount, errCh)
 
 	rangeSize := restore.EstimateRangeSize(files)
 	summary.CollectInt("restore ranges", rangeSize)
diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go
index b8f109ad9..67fbd0512 100644
--- a/pkg/task/restore_raw.go
+++ b/pkg/task/restore_raw.go
@@ -20,8 +20,7 @@ import (
 // RestoreRawConfig is the configuration specific for raw kv restore tasks.
 type RestoreRawConfig struct {
 	RawKvConfig
-
-	Online bool `json:"online" toml:"online"`
+	RestoreCommonConfig
 }
 
 // DefineRawRestoreFlags defines common flags for the backup command.
@@ -31,9 +30,7 @@ func DefineRawRestoreFlags(command *cobra.Command) {
 	command.Flags().StringP(flagStartKey, "", "", "restore raw kv start key, key is inclusive")
 	command.Flags().StringP(flagEndKey, "", "", "restore raw kv end key, key is exclusive")
 
-	command.Flags().Bool(flagOnline, false, "Whether online when restore")
-	// TODO remove hidden flag if it's stable
-	_ = command.Flags().MarkHidden(flagOnline)
+	DefineRestoreCommonFlags(command.PersistentFlags())
 }
 
 // ParseFromFlags parses the backup-related flags from the flag set.
@@ -43,11 +40,16 @@ func (cfg *RestoreRawConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	err = cfg.RestoreCommonConfig.ParseFromFlags(flags)
+	if err != nil {
+		return errors.Trace(err)
+	}
 	return cfg.RawKvConfig.ParseFromFlags(flags)
 }
 
 func (cfg *RestoreRawConfig) adjust() {
 	cfg.Config.adjust()
+	cfg.RestoreCommonConfig.adjust()
 
 	if cfg.Concurrency == 0 {
 		cfg.Concurrency = defaultRestoreConcurrency
@@ -108,7 +110,8 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
 	}
 	summary.CollectInt("restore files", len(files))
 
-	ranges, err := restore.ValidateFileRanges(files, nil)
+	ranges, _, err := restore.MergeFileRanges(
+		files, cfg.MergeSmallRegionSizeBytes, cfg.MergeSmallRegionKeyCount)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -122,7 +125,9 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
 		int64(len(ranges)+len(files)),
 		!cfg.LogProgress)
 
-	err = restore.SplitRanges(ctx, client, ranges, nil, updateCh)
+	// RawKV restore does not need to rewrite keys.
+	rewrite := &restore.RewriteRules{}
+	err = restore.SplitRanges(ctx, client, ranges, rewrite, updateCh)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/pkg/task/restore_test.go b/pkg/task/restore_test.go
new file mode 100644
index 000000000..48f49720b
--- /dev/null
+++ b/pkg/task/restore_test.go
@@ -0,0 +1,23 @@
+// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
+
+package task
+
+import (
+	. 
"github.com/pingcap/check" + + "github.com/pingcap/br/pkg/restore" +) + +type testRestoreSuite struct{} + +var _ = Suite(&testRestoreSuite{}) + +func (s *testRestoreSuite) TestRestoreConfigAdjust(c *C) { + cfg := &RestoreConfig{} + cfg.adjustRestoreConfig() + + c.Assert(cfg.Config.Concurrency, Equals, uint32(defaultRestoreConcurrency)) + c.Assert(cfg.Config.SwitchModeInterval, Equals, defaultSwitchInterval) + c.Assert(cfg.MergeSmallRegionKeyCount, Equals, restore.DefaultMergeRegionKeyCount) + c.Assert(cfg.MergeSmallRegionSizeBytes, Equals, restore.DefaultMergeRegionSizeBytes) +} diff --git a/tests/_utils/run_services b/tests/_utils/run_services index 3e9b71054..b03324f54 100644 --- a/tests/_utils/run_services +++ b/tests/_utils/run_services @@ -98,6 +98,7 @@ start_services_impl() { -A "$TIKV_ADDR$i" \ --status-addr "$TIKV_STATUS_ADDR$i" \ --log-file "$TEST_DIR/tikv${i}.log" \ + --log-level debug \ -C "$TIKV_CONFIG" \ -s "$TEST_DIR/tikv${i}" & done @@ -204,6 +205,7 @@ start_services_with_tls() { --pd "$PD_ADDR" \ -A "$TIKV_ADDR$i" \ --log-file "$TEST_DIR/tikv${i}.log" \ + --log-level debug \ -C "$TIKV_CONFIG" \ -s "$TEST_DIR/tikv${i}" & done diff --git a/tests/br_debug_meta/run.sh b/tests/br_debug_meta/run.sh index 8dc3ef5a3..181bf3e2d 100644 --- a/tests/br_debug_meta/run.sh +++ b/tests/br_debug_meta/run.sh @@ -15,24 +15,24 @@ set -eu DB="$TEST_NAME" -TABLE="usertable1" +TABLE="usertable" run_sql "CREATE DATABASE $DB;" +go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB -run_sql "CREATE TABLE $DB.$TABLE( \ - YCSB_KEY varchar(64) NOT NULL, \ - FIELD0 varchar(1) DEFAULT NULL, \ - PRIMARY KEY (YCSB_KEY) \ -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;" - -run_sql "INSERT INTO $DB.$TABLE VALUES (\"a\", \"b\");" -run_sql "INSERT INTO $DB.$TABLE VALUES (\"aa\", \"b\");" +table_region_sql="SELECT COUNT(*) FROM information_schema.tikv_region_status WHERE db_name = '$DB' AND table_name = '$TABLE';" +for i in $(seq 10); do + regioncount=$(run_sql "$table_region_sql" | awk '/COUNT/{print $2}') + [ $regioncount -ge 5 ] && break + sleep 3 +done +run_sql "$table_region_sql" row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') # backup table echo "backup start..." -run_br --pd $PD_ADDR backup table --db $DB --table usertable1 -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 +run_br --pd $PD_ADDR backup table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 run_sql "DROP DATABASE $DB;" @@ -59,7 +59,7 @@ mv "$TEST_DIR/$DB/backupmeta_from_json" "$TEST_DIR/$DB/backupmeta" # restore table echo "restore start..." 
-run_br --pd $PD_ADDR restore table --db $DB --table usertable1 -s "local://$TEST_DIR/$DB" +run_br --pd $PD_ADDR restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB" row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') diff --git a/tests/br_debug_meta/workload b/tests/br_debug_meta/workload new file mode 100644 index 000000000..bea666067 --- /dev/null +++ b/tests/br_debug_meta/workload @@ -0,0 +1,12 @@ +recordcount=10000 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform \ No newline at end of file diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index d9d444478..0dd2442f1 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -18,6 +18,8 @@ DB="$TEST_NAME" TABLE="usertable" DDL_COUNT=10 LOG=/$TEST_DIR/backup.log +BACKUP_STAT=/$TEST_DIR/backup_stat +RESOTRE_STAT=/$TEST_DIR/restore_stat run_sql "CREATE DATABASE $DB;" go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB @@ -61,7 +63,7 @@ run_sql "analyze table $DB.$TABLE;" # } # ] # } -curl $TIDB_IP:10080/stats/dump/$DB/$TABLE | jq '.columns.field0' | jq 'del(.last_update_version)' > backup_stats +curl $TIDB_IP:10080/stats/dump/$DB/$TABLE | jq '.columns.field0' | jq 'del(.last_update_version)' > $BACKUP_STAT # backup full echo "backup start..." @@ -92,7 +94,7 @@ fi # restore full echo "restore start..." export GO_FAILPOINTS="github.com/pingcap/br/pkg/pdutil/PDEnabledPauseConfig=return(true)" -run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --log-file $LOG +run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --log-file $LOG || cat $LOG export GO_FAILPOINTS="" pause_count=$(cat $LOG | grep "pause configs successful"| wc -l | xargs) @@ -115,15 +117,15 @@ if [ "${skip_count}" -gt "2" ];then exit 1 fi -curl $TIDB_IP:10080/stats/dump/$DB/$TABLE | jq '.columns.field0' | jq 'del(.last_update_version)' > restore_stats +curl $TIDB_IP:10080/stats/dump/$DB/$TABLE | jq '.columns.field0' | jq 'del(.last_update_version)' > $RESOTRE_STAT -if diff -q backup_stats restore_stats > /dev/null +if diff -q $BACKUP_STAT $RESOTRE_STAT > /dev/null then echo "stats are equal" else echo "TEST: [$TEST_NAME] fail due to stats are not equal" - cat $backup_stats | head 1000 - cat $restore_stats | head 1000 + cat $BACKUP_STAT | head 1000 + cat $RESOTRE_STAT | head 1000 exit 1 fi diff --git a/tests/br_other/run.sh b/tests/br_other/run.sh index ed3b26af1..e769e3cff 100644 --- a/tests/br_other/run.sh +++ b/tests/br_other/run.sh @@ -42,8 +42,8 @@ run_br -s "local://$TEST_DIR/$DB" debug decode --field "EndVersion" run_br -s "local://$TEST_DIR/$DB" validate decode --field "end-version" # Test validate backupmeta -run_br validate backupmeta -s "local://$TEST_DIR/$DB" -run_br validate backupmeta -s "local://$TEST_DIR/$DB" --offset 100 +run_br debug backupmeta validate -s "local://$TEST_DIR/$DB" +run_br debug backupmeta validate -s "local://$TEST_DIR/$DB" --offset 100 # Test validate checksum run_br validate checksum -s "local://$TEST_DIR/$DB" From b3a7e102718adbcfb6a62ddeb1efdb2896e286a9 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 25 Dec 2020 15:33:52 +0800 Subject: [PATCH 2/4] resolve conflict Signed-off-by: Neil Shen --- pkg/rtree/rtree.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/rtree/rtree.go b/pkg/rtree/rtree.go index 43f60bfac..eb675cc7a 100644 --- 
a/pkg/rtree/rtree.go +++ b/pkg/rtree/rtree.go @@ -26,9 +26,6 @@ func (rg *Range) String() string { return fmt.Sprintf("[%x %x]", rg.StartKey, rg.EndKey) } -<<<<<<< HEAD -// Intersect returns? -======= // BytesAndKeys returns total bytes and keys in a range. func (rg *Range) BytesAndKeys() (bytes, keys uint64) { for _, f := range rg.Files { @@ -39,7 +36,6 @@ func (rg *Range) BytesAndKeys() (bytes, keys uint64) { } // Intersect returns intersect range in the tree. ->>>>>>> 1a527fb... restore: merge small ranges (#578) func (rg *Range) Intersect( start, end []byte, ) (subStart, subEnd []byte, isIntersect bool) { From 46a36525def93f859681ede096555ae0ad39ec44 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 25 Dec 2020 16:37:26 +0800 Subject: [PATCH 3/4] integration_test: fix br_incremental_index test not stable (#468) (#658) * cherry pick #468 to release-4.0 Signed-off-by: ti-srebot --- pkg/backup/client.go | 29 ----------------------------- pkg/task/backup.go | 4 ---- tests/br_incremental_index/run.sh | 2 +- 3 files changed, 1 insertion(+), 34 deletions(-) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 052d19e98..4a3949a9e 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -985,35 +985,6 @@ func CollectChecksums(backupMeta *kvproto.BackupMeta) ([]Checksum, error) { return checksums, nil } -// FilterSchema filter in-place schemas that doesn't have backup files -// this is useful during incremental backup, no files in backup means no files to restore -// so we can skip some DDL in restore to speed up restoration. -func FilterSchema(backupMeta *kvproto.BackupMeta) error { - dbs, err := utils.LoadBackupTables(backupMeta) - if err != nil { - return errors.Trace(err) - } - schemas := make([]*kvproto.Schema, 0, len(backupMeta.Schemas)) - for _, schema := range backupMeta.Schemas { - dbInfo := &model.DBInfo{} - err := json.Unmarshal(schema.Db, dbInfo) - if err != nil { - return errors.Trace(err) - } - tblInfo := &model.TableInfo{} - err = json.Unmarshal(schema.Table, tblInfo) - if err != nil { - return errors.Trace(err) - } - tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String()) - if len(tbl.Files) > 0 { - schemas = append(schemas, schema) - } - } - backupMeta.Schemas = schemas - return nil -} - // isRetryableError represents whether we should retry reset grpc connection. func isRetryableError(err error) bool { return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 7c9ac7882..169580b4b 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -342,10 +342,6 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if isIncrementalBackup { // Since we don't support checksum for incremental data, fast checksum should be skipped. log.Info("Skip fast checksum in incremental backup") - err = backup.FilterSchema(&backupMeta) - if err != nil { - return errors.Trace(err) - } } else { // When user specified not to calculate checksum, don't calculate checksum. log.Info("Skip fast checksum because user requirement.") diff --git a/tests/br_incremental_index/run.sh b/tests/br_incremental_index/run.sh index f4b4b9de7..9955bf679 100755 --- a/tests/br_incremental_index/run.sh +++ b/tests/br_incremental_index/run.sh @@ -31,7 +31,7 @@ done # full backup echo "backup full start..." 
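+# Run the CREATE INDEX statement in the foreground (no trailing '&') so the
+# DDL finishes before the full backup starts; running it concurrently with
+# the backup is presumably what made this test unstable.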
-run_sql "CREATE INDEX idx_c1 ON ${DB}.${TABLE}(c1)" & +run_sql "CREATE INDEX idx_c1 ON ${DB}.${TABLE}(c1)" run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/full" --ratelimit 5 --concurrency 4 wait # run ddls From d99771d8fedf28281336c0c7a0b267c3bdbd84b5 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 5 Mar 2021 17:56:22 +0800 Subject: [PATCH 4/4] fix curl Signed-off-by: Neil Shen --- tests/br_full_ddl/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index 579b3be7c..f23548c8d 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -63,7 +63,7 @@ run_sql "analyze table $DB.$TABLE;" # } # ] # } -curl $TIDB_IP:10080/stats/dump/$DB/$TABLE | jq '.columns.field0' | jq 'del(.last_update_version)' > $BACKUP_STAT +run_curl https://$TIDB_STATUS_ADDR/stats/dump/$DB/$TABLE | jq '.columns.field0 | del(.last_update_version)' > $BACKUP_STAT # backup full echo "backup start with stats..." @@ -96,7 +96,7 @@ fi echo "restore full without stats..." run_br restore full -s "local://$TEST_DIR/${DB}_disable_stats" --pd $PD_ADDR -curl $TIDB_IP:10080/stats/dump/$DB/$TABLE | jq '.columns.field0' | jq 'del(.last_update_version)' > $RESOTRE_STAT +run_curl https://$TIDB_STATUS_ADDR/stats/dump/$DB/$TABLE | jq '.columns.field0' | jq 'del(.last_update_version)' > $RESOTRE_STAT # stats should not be equal because we disable stats by default. if diff -q $BACKUP_STAT $RESOTRE_STAT > /dev/null