Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: merge range in lightweight when snapshot restore #50648

Merged
merged 4 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ go_test(
"import_retry_test.go",
"log_client_test.go",
"main_test.go",
"merge_fuzz_test.go",
"merge_test.go",
"range_test.go",
"rawkv_client_test.go",
Expand Down
59 changes: 1 addition & 58 deletions br/pkg/restore/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ import (

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
)

const (
Expand All @@ -33,47 +29,6 @@ type MergeRangesStat struct {
MergedRegionBytesAvg int
}

// NeedsMerge checks whether two ranges needs to be merged.
func NeedsMerge(left, right *rtree.Range, splitSizeBytes, splitKeyCount uint64) bool {
leftBytes, leftKeys := left.BytesAndKeys()
rightBytes, rightKeys := right.BytesAndKeys()
if rightBytes == 0 {
return true
}
if leftBytes+rightBytes > splitSizeBytes {
return false
}
if leftKeys+rightKeys > splitKeyCount {
return false
}
tableID1, indexID1, isRecord1, err1 := tablecodec.DecodeKeyHead(kv.Key(left.StartKey))
tableID2, indexID2, isRecord2, err2 := tablecodec.DecodeKeyHead(kv.Key(right.StartKey))

// Failed to decode the file key head... can this happen?
if err1 != nil || err2 != nil {
log.Warn("Failed to parse the key head for merging files, skipping",
logutil.Key("left-start-key", left.StartKey),
logutil.Key("right-start-key", right.StartKey),
logutil.AShortError("left-err", err1),
logutil.AShortError("right-err", err2),
)
return false
}
// Merge if they are both record keys
if isRecord1 && isRecord2 {
// Do not merge ranges in different tables.
return tableID1 == tableID2
}
// If they are all index keys...
if !isRecord1 && !isRecord2 {
// Do not merge ranges in different indexes even if they are in the same
// table, as rewrite rule only supports rewriting one pattern.
// Merge left and right if they are in the same index.
return tableID1 == tableID2 && indexID1 == indexID2
}
return false
}

// MergeFileRanges returns ranges of the files are merged based on
// splitSizeBytes and splitKeyCount.
//
Expand Down Expand Up @@ -134,19 +89,7 @@ func MergeFileRanges(
}
}

sortedRanges := rangeTree.GetSortedRanges()
for i := 1; i < len(sortedRanges); {
if !NeedsMerge(&sortedRanges[i-1], &sortedRanges[i], splitSizeBytes, splitKeyCount) {
i++
continue
}
sortedRanges[i-1].EndKey = sortedRanges[i].EndKey
sortedRanges[i-1].Size += sortedRanges[i].Size
sortedRanges[i-1].Files = append(sortedRanges[i-1].Files, sortedRanges[i].Files...)
// TODO: this is slow when there are lots of ranges need to merge.
sortedRanges = append(sortedRanges[:i], sortedRanges[i+1:]...)
}

sortedRanges := rangeTree.MergedRanges(splitSizeBytes, splitKeyCount)
regionBytesAvg := totalBytes / uint64(totalRegions)
regionKeysAvg := totalKvs / uint64(totalRegions)
mergedRegionBytesAvg := totalBytes / uint64(len(sortedRanges))
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/rtree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
deps = [
"//br/pkg/logutil",
"//br/pkg/redact",
"//pkg/kv",
"//pkg/tablecodec",
"@com_github_google_btree//:btree",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
Expand All @@ -25,13 +27,16 @@ go_test(
srcs = [
"logging_test.go",
"main_test.go",
"merge_fuzz_test.go",
"rtree_test.go",
],
flaky = True,
race = "on",
shard_count = 3,
shard_count = 4,
deps = [
":rtree",
"//pkg/kv",
"//pkg/tablecodec",
"//pkg/testkit/testsetup",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
//go:build go1.18
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package restore_test
package rtree_test

import (
"testing"

backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/tablecodec"
)
Expand All @@ -19,6 +17,6 @@ func FuzzMerge(f *testing.F) {
f.Fuzz(func(t *testing.T, a, b []byte) {
left := rtree.Range{StartKey: a, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}
right := rtree.Range{StartKey: b, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}
restore.NeedsMerge(&left, &right, 42, 42)
rtree.NeedsMerge(&left, &right, 42, 42)
})
}
65 changes: 65 additions & 0 deletions br/pkg/rtree/rtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
)

// Range represents a backup response.
Expand Down Expand Up @@ -74,6 +76,47 @@ func (rg *Range) Less(than btree.Item) bool {
return bytes.Compare(rg.StartKey, ta.StartKey) < 0
}

// NeedsMerge checks whether two ranges needs to be merged.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

directly move from br/pkg/restore/merge.go

func NeedsMerge(left, right *Range, splitSizeBytes, splitKeyCount uint64) bool {
leftBytes, leftKeys := left.BytesAndKeys()
rightBytes, rightKeys := right.BytesAndKeys()
if rightBytes == 0 {
return true
}
if leftBytes+rightBytes > splitSizeBytes {
return false
}
if leftKeys+rightKeys > splitKeyCount {
return false
}
tableID1, indexID1, isRecord1, err1 := tablecodec.DecodeKeyHead(kv.Key(left.StartKey))
tableID2, indexID2, isRecord2, err2 := tablecodec.DecodeKeyHead(kv.Key(right.StartKey))

// Failed to decode the file key head... can this happen?
if err1 != nil || err2 != nil {
log.Warn("Failed to parse the key head for merging files, skipping",
logutil.Key("left-start-key", left.StartKey),
logutil.Key("right-start-key", right.StartKey),
logutil.AShortError("left-err", err1),
logutil.AShortError("right-err", err2),
)
return false
}
// Merge if they are both record keys
if isRecord1 && isRecord2 {
// Do not merge ranges in different tables.
return tableID1 == tableID2
}
// If they are all index keys...
if !isRecord1 && !isRecord2 {
// Do not merge ranges in different indexes even if they are in the same
// table, as rewrite rule only supports rewriting one pattern.
// Merge left and right if they are in the same index.
return tableID1 == tableID2 && indexID1 == indexID2
}
return false
}

var _ btree.Item = &Range{}

// RangeTree is sorted tree for Ranges.
Expand Down Expand Up @@ -165,6 +208,28 @@ func (rangeTree *RangeTree) InsertRange(rg Range) *Range {
return out.(*Range)
}

// MergedRanges output the sortedRanges having merged according to given `splitSizeBytes` and `splitKeyCount`.
func (rangeTree *RangeTree) MergedRanges(splitSizeBytes, splitKeyCount uint64) []Range {
var mergeTargetIndex int = -1
sortedRanges := make([]Range, 0, rangeTree.Len())
rangeTree.Ascend(func(item btree.Item) bool {
rg := item.(*Range)
if mergeTargetIndex < 0 || !NeedsMerge(&sortedRanges[mergeTargetIndex], rg, splitSizeBytes, splitKeyCount) {
// unintialized or the sortedRanges[mergeTargetIndex] does not need to merged
mergeTargetIndex += 1
sortedRanges = append(sortedRanges, *rg)
} else {
// need to merge from rg to sortedRages[mergeTargetIndex]
sortedRanges[mergeTargetIndex].EndKey = rg.EndKey
sortedRanges[mergeTargetIndex].Size += rg.Size
sortedRanges[mergeTargetIndex].Files = append(sortedRanges[mergeTargetIndex].Files, rg.Files...)
}

return true
})
return sortedRanges
}

// GetSortedRanges collects and returns sorted ranges.
func (rangeTree *RangeTree) GetSortedRanges() []Range {
sortedRanges := make([]Range, 0, rangeTree.Len())
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/rtree/rtree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"fmt"
"testing"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -180,3 +183,40 @@ func BenchmarkRangeTreeUpdate(b *testing.B) {
rangeTree.Update(item)
}
}

func encodeTableRecord(prefix kv.Key, rowID uint64) []byte {
return tablecodec.EncodeRecordKey(prefix, kv.IntHandle(rowID))
}

func TestRangeTreeMerge(t *testing.T) {
rangeTree := rtree.NewRangeTree()
tablePrefix := tablecodec.GenTableRecordPrefix(1)
for i := uint64(0); i < 10000; i += 1 {
item := rtree.Range{
StartKey: encodeTableRecord(tablePrefix, i),
EndKey: encodeTableRecord(tablePrefix, i+1),
Files: []*backuppb.File{
{
Name: fmt.Sprintf("%20d", i),
TotalKvs: 1,
TotalBytes: 1,
},
},
Size: i,
}
rangeTree.Update(item)
}
sortedRanges := rangeTree.MergedRanges(10, 10)
require.Equal(t, 1000, len(sortedRanges))
for i, rg := range sortedRanges {
require.Equal(t, encodeTableRecord(tablePrefix, uint64(i)*10), rg.StartKey)
require.Equal(t, encodeTableRecord(tablePrefix, uint64(i+1)*10), rg.EndKey)
require.Equal(t, uint64(i*10*10+45), rg.Size)
require.Equal(t, 10, len(rg.Files))
for j, file := range rg.Files {
require.Equal(t, fmt.Sprintf("%20d", i*10+j), file.Name)
require.Equal(t, uint64(1), file.TotalKvs)
require.Equal(t, uint64(1), file.TotalBytes)
}
}
}
Loading