Skip to content

Commit 1d24b1c

Browse files
authored
executor: load region info from pd in batch (pingcap#51333)
close pingcap#51326
1 parent 718c7a3 commit 1d24b1c

6 files changed

+188
-40
lines changed

pkg/executor/index_merge_reader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,7 @@ func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tb
799799
netDataSize: e.dataAvgRowSize * float64(len(handles)),
800800
}
801801
tableReaderExec.buildVirtualColumnInfo()
802-
// Reorder handles because SplitKeyRangesByLocations() requires startKey of kvRanges is ordered.
802+
// Reorder handles because SplitKeyRangesByLocationsWith/WithoutBuckets() requires the startKey of kvRanges to be ordered.
803803
// Also it's good for performance.
804804
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
805805
if err != nil {

pkg/store/copr/BUILD.bazel

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ go_test(
8181
embed = [":copr"],
8282
flaky = True,
8383
race = "on",
84-
shard_count = 28,
84+
shard_count = 29,
8585
deps = [
8686
"//pkg/kv",
8787
"//pkg/store/driver/backoff",

pkg/store/copr/batch_coprocessor.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ func buildBatchCopTasksConsistentHash(
635635

636636
for i, ranges := range rangesForEachPhysicalTable {
637637
rangesLen += ranges.Len()
638-
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
638+
locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
639639
if err != nil {
640640
return nil, errors.Trace(err)
641641
}
@@ -904,7 +904,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
904904
rangesLen = 0
905905
for i, ranges := range rangesForEachPhysicalTable {
906906
rangesLen += ranges.Len()
907-
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
907+
locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
908908
if err != nil {
909909
return nil, errors.Trace(err)
910910
}
@@ -1430,7 +1430,7 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
14301430
splitKeyStart := time.Now()
14311431
for i, ranges := range rangesForEachPhysicalTable {
14321432
rangesLen += ranges.Len()
1433-
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
1433+
locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
14341434
if err != nil {
14351435
return nil, errors.Trace(err)
14361436
}

pkg/store/copr/coprocessor_test.go

+85-1
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,87 @@ func TestBuildTasksByBuckets(t *testing.T) {
361361
}
362362
}
363363

364-
func TestSplitRegionRanges(t *testing.T) {
364+
func TestSplitKeyRangesByLocationsWithoutBuckets(t *testing.T) {
365+
// nil --- 'g' --- 'n' --- 't' --- nil
366+
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
367+
mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
368+
require.NoError(t, err)
369+
defer func() {
370+
pdClient.Close()
371+
err = mockClient.Close()
372+
require.NoError(t, err)
373+
}()
374+
375+
testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
376+
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
377+
defer pdCli.Close()
378+
379+
cache := NewRegionCache(tikv.NewRegionCache(pdCli))
380+
defer cache.Close()
381+
382+
bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
383+
384+
locRanges, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit)
385+
require.NoError(t, err)
386+
require.Len(t, locRanges, 1)
387+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "c")
388+
389+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0)
390+
require.NoError(t, err)
391+
require.Len(t, locRanges, 0)
392+
393+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit)
394+
require.NoError(t, err)
395+
require.Len(t, locRanges, 3)
396+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n")
397+
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "n", "t")
398+
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "t", "y")
399+
400+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "n")), UnspecifiedLimit)
401+
require.NoError(t, err)
402+
require.Len(t, locRanges, 1)
403+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n")
404+
405+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit)
406+
require.NoError(t, err)
407+
require.Len(t, locRanges, 1)
408+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "s", "s")
409+
410+
// min --> max
411+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit)
412+
require.NoError(t, err)
413+
require.Len(t, locRanges, 4)
414+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g")
415+
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n")
416+
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
417+
rangeEqual(t, locRanges[3].Ranges.ToRanges(), "t", "z")
418+
419+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3)
420+
require.NoError(t, err)
421+
require.Len(t, locRanges, 3)
422+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g")
423+
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n")
424+
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
425+
426+
// many ranges
427+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit)
428+
require.NoError(t, err)
429+
require.Len(t, locRanges, 4)
430+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g")
431+
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "i", "j", "k", "l", "m", "n")
432+
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t")
433+
rangeEqual(t, locRanges[3].Ranges.ToRanges(), "u", "v", "w", "x", "y", "z")
434+
435+
locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit)
436+
require.NoError(t, err)
437+
require.Len(t, locRanges, 4)
438+
rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "b", "g")
439+
rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "h", "m", "n")
440+
rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
441+
rangeEqual(t, locRanges[3].Ranges.ToRanges(), "v", "w")
442+
}
443+
444+
func TestSplitKeyRanges(t *testing.T) {
365445
// nil --- 'g' --- 'n' --- 't' --- nil
366446
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
367447
mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
@@ -386,6 +466,10 @@ func TestSplitRegionRanges(t *testing.T) {
386466
require.Len(t, ranges, 1)
387467
rangeEqual(t, ranges, "a", "c")
388468

469+
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), 0)
470+
require.NoError(t, err)
471+
require.Len(t, ranges, 0)
472+
389473
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit)
390474
require.NoError(t, err)
391475
require.Len(t, ranges, 3)

pkg/store/copr/key_ranges.go

+9
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,15 @@ func (r *KeyRanges) Split(key []byte) (*KeyRanges, *KeyRanges) {
137137
return r.Slice(0, n), r.Slice(n, r.Len())
138138
}
139139

140+
// ToRanges converts ranges to []kv.KeyRange.
141+
func (r *KeyRanges) ToRanges() []kv.KeyRange {
142+
ranges := make([]kv.KeyRange, 0, r.Len())
143+
r.Do(func(ran *kv.KeyRange) {
144+
ranges = append(ranges, *ran)
145+
})
146+
return ranges
147+
}
148+
140149
// ToPBRanges converts ranges to wire type.
141150
func (r *KeyRanges) ToPBRanges() []*coprocessor.KeyRange {
142151
ranges := make([]*coprocessor.KeyRange, 0, r.Len())

pkg/store/copr/region_cache.go

+89-34
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strconv"
2121
"time"
2222

23+
"github.com/pingcap/errors"
2324
"github.com/pingcap/kvproto/pkg/coprocessor"
2425
"github.com/pingcap/kvproto/pkg/metapb"
2526
"github.com/pingcap/log"
@@ -46,7 +47,7 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache {
4647
func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) {
4748
ranges := NewKeyRanges(keyRanges)
4849

49-
locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit)
50+
locations, err := c.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, limit)
5051
if err != nil {
5152
return nil, derr.ToTiDBErr(err)
5253
}
@@ -123,11 +124,49 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges {
123124
return res
124125
}
125126

127+
func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *KeyRanges, res []*LocationKeyRanges) ([]*LocationKeyRanges, *KeyRanges, bool) {
128+
// Advance to the first range that is not completely contained in the region.
129+
var r kv.KeyRange
130+
var i int
131+
for ; i < ranges.Len(); i++ {
132+
r = ranges.At(i)
133+
if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) {
134+
break
135+
}
136+
}
137+
// All remaining ranges belong to the same region.
138+
if i == ranges.Len() {
139+
res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
140+
return res, ranges, true
141+
}
142+
if loc.Contains(r.StartKey) {
143+
// Part of r is not in the region. We need to split it.
144+
taskRanges := ranges.Slice(0, i)
145+
taskRanges.last = &kv.KeyRange{
146+
StartKey: r.StartKey,
147+
EndKey: loc.EndKey,
148+
}
149+
res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
150+
ranges = ranges.Slice(i+1, ranges.Len())
151+
ranges.first = &kv.KeyRange{
152+
StartKey: loc.EndKey,
153+
EndKey: r.EndKey,
154+
}
155+
} else {
156+
// r (ranges.At(i)) does not start in this region, so it stays in the remaining ranges.
157+
taskRanges := ranges.Slice(0, i)
158+
res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
159+
ranges = ranges.Slice(i, ranges.Len())
160+
}
161+
return res, ranges, false
162+
}
163+
126164
// UnspecifiedLimit means no limit.
127165
const UnspecifiedLimit = -1
128166

129-
// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache.
130-
func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
167+
// SplitKeyRangesByLocationsWithBuckets splits the KeyRanges by logical info in the cache.
168+
// The buckets in the returned LocationKeyRanges are not empty if the region is split by bucket.
169+
func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
131170
res := make([]*LocationKeyRanges, 0)
132171
for ranges.Len() > 0 {
133172
if limit != UnspecifiedLimit && len(res) >= limit {
@@ -138,43 +177,59 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
138177
return res, derr.ToTiDBErr(err)
139178
}
140179

141-
// Iterate to the first range that is not complete in the region.
142-
var r kv.KeyRange
143-
var i int
144-
for ; i < ranges.Len(); i++ {
145-
r = ranges.At(i)
146-
if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) {
147-
break
148-
}
180+
isBreak := false
181+
res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res)
182+
if isBreak {
183+
break
149184
}
150-
// All rest ranges belong to the same region.
151-
if i == ranges.Len() {
152-
res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
185+
}
186+
187+
return res, nil
188+
}
189+
190+
// SplitKeyRangesByLocationsWithoutBuckets splits the KeyRanges by logical info in the cache.
191+
// The buckets in the returned LocationKeyRanges are empty, regardless of whether the region is split by bucket.
192+
func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
193+
if limit == 0 || ranges.Len() <= 0 {
194+
return nil, nil
195+
}
196+
// Currently, the KeyLocations returned by `LocateKeyRange` don't contain buckets,
197+
// because of https://github.com/tikv/client-go/blob/09ecb550d383c1b048119b586fb5cda658312262/internal/locate/region_cache.go#L1550-L1551.
198+
locs, err := c.LocateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey)
199+
if err != nil {
200+
return nil, derr.ToTiDBErr(err)
201+
}
202+
203+
resCap := len(locs)
204+
if limit != UnspecifiedLimit {
205+
resCap = min(resCap, limit)
206+
}
207+
res := make([]*LocationKeyRanges, 0, resCap)
208+
209+
for ranges.Len() > 0 {
210+
if limit != UnspecifiedLimit && len(res) >= limit {
153211
break
154212
}
155213

156-
if loc.Contains(r.StartKey) {
157-
// Part of r is not in the region. We need to split it.
158-
taskRanges := ranges.Slice(0, i)
159-
taskRanges.last = &kv.KeyRange{
160-
StartKey: r.StartKey,
161-
EndKey: loc.EndKey,
162-
}
163-
res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
214+
nextLocIndex := len(res)
215+
if nextLocIndex >= len(locs) {
216+
err = errors.Errorf("Unexpected loc index %d, which should less than %d", nextLocIndex, len(locs))
217+
return nil, err
218+
}
164219

165-
ranges = ranges.Slice(i+1, ranges.Len())
166-
ranges.first = &kv.KeyRange{
167-
StartKey: loc.EndKey,
168-
EndKey: r.EndKey,
169-
}
170-
} else {
171-
// rs[i] is not in the region.
172-
taskRanges := ranges.Slice(0, i)
173-
res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
174-
ranges = ranges.Slice(i, ranges.Len())
220+
loc := locs[nextLocIndex]
221+
// For the last loc.
222+
if nextLocIndex == (len(locs) - 1) {
223+
res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
224+
break
175225
}
176-
}
177226

227+
isBreak := false
228+
res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res)
229+
if isBreak {
230+
break
231+
}
232+
}
178233
return res, nil
179234
}
180235

@@ -183,7 +238,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
183238
//
184239
// TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled.
185240
func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
186-
locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
241+
locs, err := c.SplitKeyRangesByLocationsWithBuckets(bo, ranges, UnspecifiedLimit)
187242
if err != nil {
188243
return nil, derr.ToTiDBErr(err)
189244
}

0 commit comments

Comments
 (0)