Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: load region info from pd in batch #51333

Merged
merged 20 commits into from
Mar 1, 2024
2 changes: 1 addition & 1 deletion pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tb
netDataSize: e.dataAvgRowSize * float64(len(handles)),
}
tableReaderExec.buildVirtualColumnInfo()
// Reorder handles because SplitKeyRangesByLocations() requires startKey of kvRanges is ordered.
// Reorder handles because SplitKeyRangesByLocationsWith/WithoutBuckets() requires startKey of kvRanges is ordered.
// Also it's good for performance.
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 28,
shard_count = 29,
deps = [
"//pkg/kv",
"//pkg/store/driver/backoff",
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func buildBatchCopTasksConsistentHash(

for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -904,7 +904,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
rangesLen = 0
for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1430,7 +1430,7 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
splitKeyStart := time.Now()
for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
86 changes: 85 additions & 1 deletion pkg/store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,87 @@ func TestBuildTasksByBuckets(t *testing.T) {
}
}

func TestSplitRegionRanges(t *testing.T) {
// TestSplitKeyRangesByLocationsWithoutBuckets checks that
// RegionCache.SplitKeyRangesByLocationsWithoutBuckets groups key ranges per
// region, cuts ranges that cross a region boundary, and honours the limit
// argument. The mock cluster below is bootstrapped with split keys "g", "n"
// and "t".
func TestSplitKeyRangesByLocationsWithoutBuckets(t *testing.T) {
// Region layout used by every case below:
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()

bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

// A single range that lies entirely inside region 0.
locRanges, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, locRanges, 1)
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "c")

// A limit of 0 yields no locations at all.
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0)
require.NoError(t, err)
require.Len(t, locRanges, 0)

// One range crossing the 'n' and 't' boundaries is cut into three pieces.
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, locRanges, 3)
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n")
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "n", "t")
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "t", "y")

// A range whose end key equals the region boundary 'n' stays in one location.
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "n")), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, locRanges, 1)
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n")

// A range with identical start and end keys still produces one location.
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, locRanges, 1)
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "s", "s")

// One range touching all four regions (min --> max).
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, locRanges, 4)
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g")
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n")
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
rangeEqual(t, locRanges[3].Ranges.ToRanges(), "t", "z")

// Same range with limit 3: only the first three locations are returned.
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3)
require.NoError(t, err)
require.Len(t, locRanges, 3)
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g")
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n")
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")

// Many small ranges spread over all four regions.
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, locRanges, 4)
// NOTE(review): the trailing "f", "g" pair below matches no input range
// (the inputs pair up as a-b, c-d, e-f, g-h, ...). Presumably rangeEqual only
// compares as many key pairs as there are ranges, so the extra pair is
// ignored — confirm against rangeEqual and consider dropping it.
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g")
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "i", "j", "k", "l", "m", "n")
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t")
rangeEqual(t, locRanges[3].Ranges.ToRanges(), "u", "v", "w", "x", "y", "z")

// Adjacent ranges, one of which ("b"-"h") crosses the 'g' boundary.
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, locRanges, 4)
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "b", "g")
// NOTE(review): the key list below has an odd length; the trailing "n" has no
// matching end key. Presumably rangeEqual ignores keys beyond the actual
// ranges — confirm against rangeEqual.
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "h", "m", "n")
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
rangeEqual(t, locRanges[3].Ranges.ToRanges(), "v", "w")
}

func TestSplitKeyRanges(t *testing.T) {
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
Expand All @@ -386,6 +466,10 @@ func TestSplitRegionRanges(t *testing.T) {
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "a", "c")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), 0)
require.NoError(t, err)
require.Len(t, ranges, 0)

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 3)
Expand Down
9 changes: 9 additions & 0 deletions pkg/store/copr/key_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ func (r *KeyRanges) Split(key []byte) (*KeyRanges, *KeyRanges) {
return r.Slice(0, n), r.Slice(n, r.Len())
}

// ToRanges flattens the KeyRanges into a plain []kv.KeyRange.
// Each element visited by Do is copied by value into the returned slice, which
// is pre-sized to r.Len().
func (r *KeyRanges) ToRanges() []kv.KeyRange {
	out := make([]kv.KeyRange, 0, r.Len())
	// collect appends a copy of the visited range to the result.
	collect := func(kr *kv.KeyRange) {
		out = append(out, *kr)
	}
	r.Do(collect)
	return out
}

// ToPBRanges converts ranges to wire type.
func (r *KeyRanges) ToPBRanges() []*coprocessor.KeyRange {
ranges := make([]*coprocessor.KeyRange, 0, r.Len())
Expand Down
123 changes: 89 additions & 34 deletions pkg/store/copr/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand All @@ -46,7 +47,7 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache {
func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) {
ranges := NewKeyRanges(keyRanges)

locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit)
locations, err := c.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, limit)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
Expand Down Expand Up @@ -123,11 +124,49 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges {
return res
}

// splitKeyRangesByLocation assigns the leading part of ranges to loc and
// appends the resulting LocationKeyRanges to res.
//
// It returns the extended res, the ranges still to be assigned, and a flag
// that is true when every range in ranges was assigned to loc (in which case
// the returned ranges is the unchanged input and the caller should stop).
func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *KeyRanges, res []*LocationKeyRanges) ([]*LocationKeyRanges, *KeyRanges, bool) {
	total := ranges.Len()

	// Advance to the first range whose end key is neither contained in loc nor
	// equal to loc's end key.
	var cur kv.KeyRange
	idx := 0
	for idx < total {
		cur = ranges.At(idx)
		if !loc.Contains(cur.EndKey) && !bytes.Equal(loc.EndKey, cur.EndKey) {
			break
		}
		idx++
	}

	// Every range passed the check above: hand them all to loc.
	if idx == total {
		return append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}), ranges, true
	}

	head := ranges.Slice(0, idx)
	if !loc.Contains(cur.StartKey) {
		// cur starts outside loc, so it is left whole for a later location.
		res = append(res, &LocationKeyRanges{Location: loc, Ranges: head})
		return res, ranges.Slice(idx, total), false
	}

	// cur starts inside loc but ends beyond it: cut it at loc.EndKey. The
	// front piece closes the ranges given to loc, and the back piece opens the
	// remaining ranges.
	head.last = &kv.KeyRange{
		StartKey: cur.StartKey,
		EndKey:   loc.EndKey,
	}
	res = append(res, &LocationKeyRanges{Location: loc, Ranges: head})

	rest := ranges.Slice(idx+1, total)
	rest.first = &kv.KeyRange{
		StartKey: loc.EndKey,
		EndKey:   cur.EndKey,
	}
	return res, rest, false
}

// UnspecifiedLimit means no limit.
const UnspecifiedLimit = -1

// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache.
func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
// SplitKeyRangesByLocationsWithBuckets splits the KeyRanges by logical info in the cache.
// The buckets in the returned LocationKeyRanges are not empty if the region is split by bucket.
func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
res := make([]*LocationKeyRanges, 0)
for ranges.Len() > 0 {
if limit != UnspecifiedLimit && len(res) >= limit {
Expand All @@ -138,43 +177,59 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
return res, derr.ToTiDBErr(err)
}

// Iterate to the first range that is not complete in the region.
var r kv.KeyRange
var i int
for ; i < ranges.Len(); i++ {
r = ranges.At(i)
if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) {
break
}
isBreak := false
res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res)
if isBreak {
break
}
// All rest ranges belong to the same region.
if i == ranges.Len() {
res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
}

return res, nil
}

// SplitKeyRangesByLocationsWithoutBuckets splits the KeyRanges by logical info in the cache.
// The buckets in the returned LocationKeyRanges are empty, regardless of whether the region is split by bucket.
func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
if limit == 0 || ranges.Len() <= 0 {
return nil, nil
}
// Currently, LocationKeyRanges returned by `LocateKeyRange` doesn't contain buckets,
// because of https://github.com/tikv/client-go/blob/09ecb550d383c1b048119b586fb5cda658312262/internal/locate/region_cache.go#L1550-L1551.
locs, err := c.LocateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey)
if err != nil {
return nil, derr.ToTiDBErr(err)
}

resCap := len(locs)
if limit != UnspecifiedLimit {
resCap = min(resCap, limit)
}
res := make([]*LocationKeyRanges, 0, resCap)

for ranges.Len() > 0 {
if limit != UnspecifiedLimit && len(res) >= limit {
break
}

if loc.Contains(r.StartKey) {
// Part of r is not in the region. We need to split it.
taskRanges := ranges.Slice(0, i)
taskRanges.last = &kv.KeyRange{
StartKey: r.StartKey,
EndKey: loc.EndKey,
}
res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
nextLocIndex := len(res)
if nextLocIndex >= len(locs) {
err = errors.Errorf("Unexpected loc index %d, which should less than %d", nextLocIndex, len(locs))
return nil, err
}

ranges = ranges.Slice(i+1, ranges.Len())
ranges.first = &kv.KeyRange{
StartKey: loc.EndKey,
EndKey: r.EndKey,
}
} else {
// rs[i] is not in the region.
taskRanges := ranges.Slice(0, i)
res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
ranges = ranges.Slice(i, ranges.Len())
loc := locs[nextLocIndex]
// For the last loc.
if nextLocIndex == (len(locs) - 1) {
res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
break
}
}

isBreak := false
res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res)
if isBreak {
break
}
}
return res, nil
}

Expand All @@ -183,7 +238,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
//
// TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled.
func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
locs, err := c.SplitKeyRangesByLocationsWithBuckets(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
Expand Down