diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go
index 4fbfb6c0092da..cb9f7c8408331 100644
--- a/pkg/executor/index_merge_reader.go
+++ b/pkg/executor/index_merge_reader.go
@@ -797,7 +797,7 @@ func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tb
 		netDataSize: e.dataAvgRowSize * float64(len(handles)),
 	}
 	tableReaderExec.buildVirtualColumnInfo()
-	// Reorder handles because SplitKeyRangesByLocations() requires startKey of kvRanges is ordered.
+	// Reorder handles because SplitKeyRangesByLocationsWith/WithoutBuckets() requires the startKey of kvRanges to be ordered.
 	// Also it's good for performance.
 	tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
 	if err != nil {
diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel
index 673cbffce8ade..35a3ea4c8adab 100644
--- a/pkg/store/copr/BUILD.bazel
+++ b/pkg/store/copr/BUILD.bazel
@@ -81,7 +81,7 @@ go_test(
     embed = [":copr"],
     flaky = True,
     race = "on",
-    shard_count = 28,
+    shard_count = 29,
    deps = [
         "//pkg/kv",
         "//pkg/store/driver/backoff",
diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go
index 6246aa87b6eb9..752be5f16d9e4 100644
--- a/pkg/store/copr/batch_coprocessor.go
+++ b/pkg/store/copr/batch_coprocessor.go
@@ -635,7 +635,7 @@ func buildBatchCopTasksConsistentHash(
 
 		for i, ranges := range rangesForEachPhysicalTable {
 			rangesLen += ranges.Len()
-			locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
+			locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
@@ -904,7 +904,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 		rangesLen = 0
 		for i, ranges := range rangesForEachPhysicalTable {
 			rangesLen += ranges.Len()
-			locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
+			locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
@@ -1430,7 +1430,7 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
 		splitKeyStart := time.Now()
 		for i, ranges := range rangesForEachPhysicalTable {
 			rangesLen += ranges.Len()
-			locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
+			locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
diff --git a/pkg/store/copr/coprocessor_test.go b/pkg/store/copr/coprocessor_test.go
index 219a457f88420..fe061b5698660 100644
--- a/pkg/store/copr/coprocessor_test.go
+++ b/pkg/store/copr/coprocessor_test.go
@@ -361,7 +361,87 @@ func TestBuildTasksByBuckets(t *testing.T) {
 	}
 }
 
-func TestSplitRegionRanges(t *testing.T) {
+func TestSplitKeyRangesByLocationsWithoutBuckets(t *testing.T) {
+	// nil --- 'g' --- 'n' --- 't' --- nil
+	// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
+	mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
+	testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
+	pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
+	defer pdCli.Close()
+
+	cache := NewRegionCache(tikv.NewRegionCache(pdCli))
+	defer cache.Close()
+
+	bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
+
+	locRanges, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 1)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "c")
+
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 0)
+
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 3)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n")
+	rangeEqual(t, locRanges[1].Ranges.ToRanges(), "n", "t")
+	rangeEqual(t, locRanges[2].Ranges.ToRanges(), "t", "y")
+
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "n")), UnspecifiedLimit)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 1)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n")
+
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 1)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "s", "s")
+
+	// min --> max
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 4)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g")
+	rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n")
+	rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
+	rangeEqual(t, locRanges[3].Ranges.ToRanges(), "t", "z")
+
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 3)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g")
+	rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n")
+	rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
+
+	// many ranges
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 4)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g")
+	rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "i", "j", "k", "l", "m", "n")
+	rangeEqual(t, locRanges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t")
+	rangeEqual(t, locRanges[3].Ranges.ToRanges(), "u", "v", "w", "x", "y", "z")
+
+	locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit)
+	require.NoError(t, err)
+	require.Len(t, locRanges, 4)
+	rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "b", "g")
+	rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "h", "m", "n")
+	rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t")
+	rangeEqual(t, locRanges[3].Ranges.ToRanges(), "v", "w")
+}
+
+func TestSplitKeyRanges(t *testing.T) {
 	// nil --- 'g' --- 'n' --- 't' --- nil
 	// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
 	mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
@@ -386,6 +466,10 @@ func TestSplitRegionRanges(t *testing.T) {
 	require.Len(t, ranges, 1)
 	rangeEqual(t, ranges, "a", "c")
 
+	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), 0)
+	require.NoError(t, err)
+	require.Len(t, ranges, 0)
+
 	ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit)
 	require.NoError(t, err)
 	require.Len(t, ranges, 3)
diff --git a/pkg/store/copr/key_ranges.go b/pkg/store/copr/key_ranges.go
index 7324dde426958..ba2f57f3f0333 100644
--- a/pkg/store/copr/key_ranges.go
+++ b/pkg/store/copr/key_ranges.go
@@ -137,6 +137,15 @@ func (r *KeyRanges) Split(key []byte) (*KeyRanges, *KeyRanges) {
 	return r.Slice(0, n), r.Slice(n, r.Len())
 }
 
+// ToRanges converts ranges to []kv.KeyRange.
+func (r *KeyRanges) ToRanges() []kv.KeyRange {
+	ranges := make([]kv.KeyRange, 0, r.Len())
+	r.Do(func(ran *kv.KeyRange) {
+		ranges = append(ranges, *ran)
+	})
+	return ranges
+}
+
 // ToPBRanges converts ranges to wire type.
 func (r *KeyRanges) ToPBRanges() []*coprocessor.KeyRange {
 	ranges := make([]*coprocessor.KeyRange, 0, r.Len())
diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go
index cf962d9abcd96..7883dc13c0166 100644
--- a/pkg/store/copr/region_cache.go
+++ b/pkg/store/copr/region_cache.go
@@ -20,6 +20,7 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -46,7 +47,7 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache {
 func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) {
 	ranges := NewKeyRanges(keyRanges)
 
-	locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit)
+	locations, err := c.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, limit)
 	if err != nil {
 		return nil, derr.ToTiDBErr(err)
 	}
@@ -123,11 +124,49 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges {
 	return res
 }
 
+func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *KeyRanges, res []*LocationKeyRanges) ([]*LocationKeyRanges, *KeyRanges, bool) {
+	// Iterate to the first range that is not complete in the region.
+	var r kv.KeyRange
+	var i int
+	for ; i < ranges.Len(); i++ {
+		r = ranges.At(i)
+		if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) {
+			break
+		}
+	}
+	// All rest ranges belong to the same region.
+	if i == ranges.Len() {
+		res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
+		return res, ranges, true
+	}
+	if loc.Contains(r.StartKey) {
+		// Part of r is not in the region. We need to split it.
+		taskRanges := ranges.Slice(0, i)
+		taskRanges.last = &kv.KeyRange{
+			StartKey: r.StartKey,
+			EndKey:   loc.EndKey,
+		}
+		res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
+		ranges = ranges.Slice(i+1, ranges.Len())
+		ranges.first = &kv.KeyRange{
+			StartKey: loc.EndKey,
+			EndKey:   r.EndKey,
+		}
+	} else {
+		// rs[i] is not in the region.
+		taskRanges := ranges.Slice(0, i)
+		res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
+		ranges = ranges.Slice(i, ranges.Len())
+	}
+	return res, ranges, false
+}
+
 // UnspecifiedLimit means no limit.
 const UnspecifiedLimit = -1
 
-// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache.
-func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
+// SplitKeyRangesByLocationsWithBuckets splits the KeyRanges by logical info in the cache.
+// The buckets in the returned LocationKeyRanges are not empty if the region is split by bucket.
+func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
 	res := make([]*LocationKeyRanges, 0)
 	for ranges.Len() > 0 {
 		if limit != UnspecifiedLimit && len(res) >= limit {
@@ -138,43 +177,59 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
 			return res, derr.ToTiDBErr(err)
 		}
 
-		// Iterate to the first range that is not complete in the region.
-		var r kv.KeyRange
-		var i int
-		for ; i < ranges.Len(); i++ {
-			r = ranges.At(i)
-			if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) {
-				break
-			}
+		isBreak := false
+		res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res)
+		if isBreak {
+			break
 		}
-		// All rest ranges belong to the same region.
-		if i == ranges.Len() {
-			res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
+	}
+
+	return res, nil
+}
+
+// SplitKeyRangesByLocationsWithoutBuckets splits the KeyRanges by logical info in the cache.
+// The buckets in the returned LocationKeyRanges are empty, regardless of whether the region is split by bucket.
+func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
+	if limit == 0 || ranges.Len() <= 0 {
+		return nil, nil
+	}
+	// Currently, the LocationKeyRanges returned by `LocateKeyRange` don't contain buckets,
+	// because of https://github.com/tikv/client-go/blob/09ecb550d383c1b048119b586fb5cda658312262/internal/locate/region_cache.go#L1550-L1551.
+	locs, err := c.LocateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey)
+	if err != nil {
+		return nil, derr.ToTiDBErr(err)
+	}
+
+	resCap := len(locs)
+	if limit != UnspecifiedLimit {
+		resCap = min(resCap, limit)
+	}
+	res := make([]*LocationKeyRanges, 0, resCap)
+
+	for ranges.Len() > 0 {
+		if limit != UnspecifiedLimit && len(res) >= limit {
 			break
 		}
-		if loc.Contains(r.StartKey) {
-			// Part of r is not in the region. We need to split it.
-			taskRanges := ranges.Slice(0, i)
-			taskRanges.last = &kv.KeyRange{
-				StartKey: r.StartKey,
-				EndKey:   loc.EndKey,
-			}
-			res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
+
+		nextLocIndex := len(res)
+		if nextLocIndex >= len(locs) {
+			err = errors.Errorf("Unexpected loc index %d, which should be less than %d", nextLocIndex, len(locs))
+			return nil, err
+		}
 
-			ranges = ranges.Slice(i+1, ranges.Len())
-			ranges.first = &kv.KeyRange{
-				StartKey: loc.EndKey,
-				EndKey:   r.EndKey,
-			}
-		} else {
-			// rs[i] is not in the region.
-			taskRanges := ranges.Slice(0, i)
-			res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges})
-			ranges = ranges.Slice(i, ranges.Len())
+		loc := locs[nextLocIndex]
+		// For the last loc.
+		if nextLocIndex == (len(locs) - 1) {
+			res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges})
+			break
 		}
-	}
 
+		isBreak := false
+		res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res)
+		if isBreak {
+			break
+		}
+	}
 	return res, nil
 }
@@ -183,7 +238,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
 //
 // TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled.
 func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
-	locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
+	locs, err := c.SplitKeyRangesByLocationsWithBuckets(bo, ranges, UnspecifiedLimit)
 	if err != nil {
 		return nil, derr.ToTiDBErr(err)
 	}
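For reference, the per-region splitting step that this patch factors out into `splitKeyRangesByLocation` (and that both `SplitKeyRangesByLocationsWithBuckets` and `SplitKeyRangesByLocationsWithoutBuckets` now share) can be illustrated with a small standalone sketch. The snippet below is not TiDB code: `keyRange`, `splitByRegion`, and `regionEndCovers` are hypothetical names invented for illustration, and the region layout mirrors the mock cluster in the new test (regions ending at "g", "n", "t", and +inf). It only models the idea, under the patch's stated assumption that the input ranges are sorted by start key.

```go
// Standalone sketch (assumed names, not the TiDB implementation) of splitting
// sorted, non-overlapping key ranges against one region boundary at a time.
package main

import (
	"bytes"
	"fmt"
)

// keyRange is a hypothetical stand-in for kv.KeyRange: a half-open [start, end) interval.
type keyRange struct {
	start, end []byte
}

// regionEndCovers reports whether key is covered by a region ending at regionEnd,
// treating an empty regionEnd as +infinity (the last region in the keyspace).
func regionEndCovers(regionEnd, key []byte) bool {
	return len(regionEnd) == 0 || bytes.Compare(key, regionEnd) <= 0
}

// splitByRegion peels off the ranges (or range fragments) that fall inside the region
// ending at regionEnd and returns them together with the remainder for the next region.
// It assumes ranges are sorted by start key and do not overlap.
func splitByRegion(ranges []keyRange, regionEnd []byte) (inRegion, rest []keyRange) {
	i := 0
	for ; i < len(ranges); i++ {
		if !regionEndCovers(regionEnd, ranges[i].end) {
			break
		}
	}
	inRegion = append(inRegion, ranges[:i]...)
	if i < len(ranges) && bytes.Compare(ranges[i].start, regionEnd) < 0 {
		// The range straddles the region boundary: split it at regionEnd.
		inRegion = append(inRegion, keyRange{ranges[i].start, regionEnd})
		rest = append(rest, keyRange{regionEnd, ranges[i].end})
		rest = append(rest, ranges[i+1:]...)
	} else {
		rest = ranges[i:]
	}
	return inRegion, rest
}

func main() {
	// Region layout from the new test: (-inf,"g"), ["g","n"), ["n","t"), ["t",+inf).
	regionEnds := [][]byte{[]byte("g"), []byte("n"), []byte("t"), nil}
	ranges := []keyRange{{start: []byte("h"), end: []byte("y")}}
	for _, end := range regionEnds {
		var inRegion []keyRange
		inRegion, ranges = splitByRegion(ranges, end)
		for _, r := range inRegion {
			fmt.Printf("region(end=%q): [%q, %q)\n", end, r.start, r.end)
		}
		if len(ranges) == 0 {
			break
		}
	}
}
```

Running the sketch splits the input ["h", "y") into the h-n, n-t, and t-y fragments, one per region, which is the same partitioning the new `TestSplitKeyRangesByLocationsWithoutBuckets` expects for that range.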