From 5b463b1cf3bf32372c13c987af897a42cd6cabff Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Feb 2024 11:01:20 +0800 Subject: [PATCH 01/19] init --- pkg/store/copr/batch_coprocessor.go | 2 +- pkg/store/copr/region_cache.go | 81 +++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index 6246aa87b6eb9..bf749512815ec 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -904,7 +904,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach rangesLen = 0 for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) + locations, err := cache.SplitKeyRangesByLocationsNew(bo, ranges) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index cf962d9abcd96..4fd8002851713 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -178,6 +178,87 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges return res, nil } +// todo move to client.go +func (c *RegionCache) locateKeyRange(bo *Backoffer, startKey, endKey []byte) ([]*tikv.KeyLocation, error) { + regions, err := c.LoadRegionsInKeyRange(bo.TiKVBackoffer(), startKey, endKey) + if err != nil { + return nil, err + } + res := make([]*tikv.KeyLocation, 0, len(regions)) + for _, r := range regions { + res = append(res, &tikv.KeyLocation{ + Region: r.VerID(), + StartKey: r.StartKey(), + EndKey: r.EndKey(), + Buckets: nil, + }) + } + return res, nil +} + +// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. +func (c *RegionCache) SplitKeyRangesByLocationsNew(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { + locs, err := c.locateKeyRange(bo, ranges.first.StartKey, ranges.last.EndKey) + if err != nil { + return nil, derr.ToTiDBErr(err) + } + + res := make([]*LocationKeyRanges, 0, len(locs)) + // All ranges belong to the same region. + if len(locs) == 1 { + res = append(res, &LocationKeyRanges{Location: locs[0], Ranges: ranges}) + return res, nil + } + + curLocIndex := 0 + for ranges.Len() > 0 { + loc := locs[curLocIndex] + if curLocIndex == (len(locs) - 1) { + res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) + break + } + + // Iterate to the first range that is not complete in the region. + var r kv.KeyRange + var i int + for ; i < ranges.Len(); i++ { + r = ranges.At(i) + if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) { + break + } + } + // All rest ranges belong to the same region. + if i == ranges.Len() { + res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) + break + } + + if loc.Contains(r.StartKey) { + // Part of r is not in the region. We need to split it. + taskRanges := ranges.Slice(0, i) + taskRanges.last = &kv.KeyRange{ + StartKey: r.StartKey, + EndKey: loc.EndKey, + } + res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) + curLocIndex++ + + ranges = ranges.Slice(i+1, ranges.Len()) + ranges.first = &kv.KeyRange{ + StartKey: loc.EndKey, + EndKey: r.EndKey, + } + } else { + // rs[i] is not in the region. + taskRanges := ranges.Slice(0, i) + res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) + curLocIndex++ + ranges = ranges.Slice(i, ranges.Len()) + } + } + return res, nil +} + // SplitKeyRangesByBuckets splits the KeyRanges by buckets information in the cache. If regions don't have buckets, // it's equal to SplitKeyRangesByLocations. // From 4f7490e5049ba7874cc4594d72783e55413a397a Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Feb 2024 11:13:31 +0800 Subject: [PATCH 02/19] update --- pkg/store/copr/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 4fd8002851713..ceee8b7a1f9f2 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -196,7 +196,7 @@ func (c *RegionCache) locateKeyRange(bo *Backoffer, startKey, endKey []byte) ([] return res, nil } -// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. +// SplitKeyRangesByLocationsNew splits the KeyRanges by logical info in the cache. func (c *RegionCache) SplitKeyRangesByLocationsNew(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { locs, err := c.locateKeyRange(bo, ranges.first.StartKey, ranges.last.EndKey) if err != nil { From 0bc4a99326db44b7c71082c1d43db10ab286052f Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 27 Feb 2024 15:08:13 +0800 Subject: [PATCH 03/19] fix crash --- pkg/store/copr/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index ceee8b7a1f9f2..f6e45b0d30c11 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -198,7 +198,7 @@ func (c *RegionCache) locateKeyRange(bo *Backoffer, startKey, endKey []byte) ([] // SplitKeyRangesByLocationsNew splits the KeyRanges by logical info in the cache. func (c *RegionCache) SplitKeyRangesByLocationsNew(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { - locs, err := c.locateKeyRange(bo, ranges.first.StartKey, ranges.last.EndKey) + locs, err := c.locateKeyRange(bo, ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) if err != nil { return nil, derr.ToTiDBErr(err) } From 201253b4250e9bbf5ddcc7a8f9dbc3f9d04f9fd0 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 12:05:33 +0800 Subject: [PATCH 04/19] update --- pkg/store/copr/batch_coprocessor.go | 2 +- pkg/store/copr/region_cache.go | 80 ++++++++--------------------- 2 files changed, 21 insertions(+), 61 deletions(-) diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index bf749512815ec..6246aa87b6eb9 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -904,7 +904,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach rangesLen = 0 for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocationsNew(bo, ranges) + locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index f6e45b0d30c11..9771ad1e01ff8 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -20,6 +20,7 @@ import ( "strconv" "time" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -126,58 +127,6 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges { // UnspecifiedLimit means no limit. const UnspecifiedLimit = -1 -// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. -func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { - res := make([]*LocationKeyRanges, 0) - for ranges.Len() > 0 { - if limit != UnspecifiedLimit && len(res) >= limit { - break - } - loc, err := c.LocateKey(bo.TiKVBackoffer(), ranges.At(0).StartKey) - if err != nil { - return res, derr.ToTiDBErr(err) - } - - // Iterate to the first range that is not complete in the region. - var r kv.KeyRange - var i int - for ; i < ranges.Len(); i++ { - r = ranges.At(i) - if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) { - break - } - } - // All rest ranges belong to the same region. - if i == ranges.Len() { - res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) - break - } - - if loc.Contains(r.StartKey) { - // Part of r is not in the region. We need to split it. - taskRanges := ranges.Slice(0, i) - taskRanges.last = &kv.KeyRange{ - StartKey: r.StartKey, - EndKey: loc.EndKey, - } - res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - - ranges = ranges.Slice(i+1, ranges.Len()) - ranges.first = &kv.KeyRange{ - StartKey: loc.EndKey, - EndKey: r.EndKey, - } - } else { - // rs[i] is not in the region. - taskRanges := ranges.Slice(0, i) - res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - ranges = ranges.Slice(i, ranges.Len()) - } - } - - return res, nil -} - // todo move to client.go func (c *RegionCache) locateKeyRange(bo *Backoffer, startKey, endKey []byte) ([]*tikv.KeyLocation, error) { regions, err := c.LoadRegionsInKeyRange(bo.TiKVBackoffer(), startKey, endKey) @@ -196,24 +145,37 @@ func (c *RegionCache) locateKeyRange(bo *Backoffer, startKey, endKey []byte) ([] return res, nil } -// SplitKeyRangesByLocationsNew splits the KeyRanges by logical info in the cache. -func (c *RegionCache) SplitKeyRangesByLocationsNew(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { +// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. +func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { + if limit == 0 { + return nil, nil + } locs, err := c.locateKeyRange(bo, ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) if err != nil { return nil, derr.ToTiDBErr(err) } - res := make([]*LocationKeyRanges, 0, len(locs)) + res := make([]*LocationKeyRanges, 0, min(len(locs), limit)) // All ranges belong to the same region. if len(locs) == 1 { res = append(res, &LocationKeyRanges{Location: locs[0], Ranges: ranges}) return res, nil } - curLocIndex := 0 for ranges.Len() > 0 { - loc := locs[curLocIndex] - if curLocIndex == (len(locs) - 1) { + if limit != UnspecifiedLimit && len(res) >= limit { + break + } + + nextLocIndex := len(res) + if nextLocIndex >= len(locs) { + err = errors.Errorf("Unexpected loc index %d, which should less than %d", nextLocIndex, len(locs)) + return nil, err + } + + loc := locs[nextLocIndex] + // For the last loc. + if nextLocIndex == (len(locs) - 1) { res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) break } @@ -241,7 +203,6 @@ func (c *RegionCache) SplitKeyRangesByLocationsNew(bo *Backoffer, ranges *KeyRan EndKey: loc.EndKey, } res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - curLocIndex++ ranges = ranges.Slice(i+1, ranges.Len()) ranges.first = &kv.KeyRange{ @@ -252,7 +213,6 @@ func (c *RegionCache) SplitKeyRangesByLocationsNew(bo *Backoffer, ranges *KeyRan // rs[i] is not in the region. taskRanges := ranges.Slice(0, i) res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - curLocIndex++ ranges = ranges.Slice(i, ranges.Len()) } } From 5c8fb970fc1b24eb8cb633268b6c035b1c4beffc Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 14:27:12 +0800 Subject: [PATCH 05/19] fix --- pkg/store/copr/region_cache.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 9771ad1e01ff8..42dc03703070e 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -155,7 +155,12 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges return nil, derr.ToTiDBErr(err) } - res := make([]*LocationKeyRanges, 0, min(len(locs), limit)) + cap := len(locs) + if limit != UnspecifiedLimit { + cap = min(cap, limit) + } + res := make([]*LocationKeyRanges, 0, cap) + // All ranges belong to the same region. if len(locs) == 1 { res = append(res, &LocationKeyRanges{Location: locs[0], Ranges: ranges}) From 7d596ff202add43e4b25035e97fbafbf2f35a510 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 15:16:17 +0800 Subject: [PATCH 06/19] add ut --- pkg/store/copr/coprocessor_test.go | 81 +++++++++++++++++++++++++++++- pkg/store/copr/key_ranges.go | 9 ++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/pkg/store/copr/coprocessor_test.go b/pkg/store/copr/coprocessor_test.go index 219a457f88420..062dbbca594c0 100644 --- a/pkg/store/copr/coprocessor_test.go +++ b/pkg/store/copr/coprocessor_test.go @@ -361,7 +361,82 @@ func TestBuildTasksByBuckets(t *testing.T) { } } -func TestSplitRegionRanges(t *testing.T) { +func TestSplitKeyRangesByLocations(t *testing.T) { + // nil --- 'g' --- 'n' --- 't' --- nil + // <- 0 -> <- 1 -> <- 2 -> <- 3 -> + mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + + testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) + defer pdCli.Close() + + cache := NewRegionCache(tikv.NewRegionCache(pdCli)) + defer cache.Close() + + bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) + + loc_ranges, err := cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit) + require.NoError(t, err) + require.Len(t, loc_ranges, 1) + rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "c") + + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0) + require.NoError(t, err) + require.Len(t, loc_ranges, 0) + + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit) + require.NoError(t, err) + require.Len(t, loc_ranges, 3) + rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "h", "n") + rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "n", "t") + rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "t", "y") + + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit) + require.NoError(t, err) + require.Len(t, loc_ranges, 1) + rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "s", "s") + + // min --> max + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit) + require.NoError(t, err) + require.Len(t, loc_ranges, 4) + rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "g") + rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "n") + rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "n", "t") + rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "t", "z") + + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3) + require.NoError(t, err) + require.Len(t, loc_ranges, 3) + rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "g") + rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "n") + rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "n", "t") + + // many range + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "s", "y", "z")), UnspecifiedLimit) + require.NoError(t, err) + require.Len(t, loc_ranges, 4) + rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g") + rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "h", "i", "j", "k", "l", "m", "n") + rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t") + rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "u", "v", "w", "s", "y", "z") + + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit) + require.NoError(t, err) + require.Len(t, loc_ranges, 4) + rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "b", "b", "g") + rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "h", "h", "m", "n") + rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "n", "t") + rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "v", "w") +} + +func TestSplitKeyRanges(t *testing.T) { // nil --- 'g' --- 'n' --- 't' --- nil // <- 0 -> <- 1 -> <- 2 -> <- 3 -> mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil) @@ -386,6 +461,10 @@ func TestSplitRegionRanges(t *testing.T) { require.Len(t, ranges, 1) rangeEqual(t, ranges, "a", "c") + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), 0) + require.NoError(t, err) + require.Len(t, ranges, 0) + ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit) require.NoError(t, err) require.Len(t, ranges, 3) diff --git a/pkg/store/copr/key_ranges.go b/pkg/store/copr/key_ranges.go index 7324dde426958..7f98fa3f902e4 100644 --- a/pkg/store/copr/key_ranges.go +++ b/pkg/store/copr/key_ranges.go @@ -137,6 +137,15 @@ func (r *KeyRanges) Split(key []byte) (*KeyRanges, *KeyRanges) { return r.Slice(0, n), r.Slice(n, r.Len()) } +// ToPBRanges converts ranges to []kv.KeyRange. +func (r *KeyRanges) ToRanges() []kv.KeyRange { + ranges := make([]kv.KeyRange, 0, r.Len()) + r.Do(func(ran *kv.KeyRange) { + ranges = append(ranges, *ran) + }) + return ranges +} + // ToPBRanges converts ranges to wire type. func (r *KeyRanges) ToPBRanges() []*coprocessor.KeyRange { ranges := make([]*coprocessor.KeyRange, 0, r.Len()) From 538a5e8baa6053847a8fd505577e5e07c1d48231 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 15:20:45 +0800 Subject: [PATCH 07/19] fix ut --- pkg/store/copr/coprocessor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/copr/coprocessor_test.go b/pkg/store/copr/coprocessor_test.go index 062dbbca594c0..0e56a10bc5d00 100644 --- a/pkg/store/copr/coprocessor_test.go +++ b/pkg/store/copr/coprocessor_test.go @@ -419,13 +419,13 @@ func TestSplitKeyRangesByLocations(t *testing.T) { rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "n", "t") // many range - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "s", "y", "z")), UnspecifiedLimit) + loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, loc_ranges, 4) rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g") rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "h", "i", "j", "k", "l", "m", "n") rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t") - rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "u", "v", "w", "s", "y", "z") + rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "u", "v", "w", "x", "y", "z") loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit) require.NoError(t, err) From 4a8a2801e8bfbf3446ad1bb8c2553307d31b0e51 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 15:48:38 +0800 Subject: [PATCH 08/19] fix lint --- pkg/store/copr/region_cache.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 42dc03703070e..a48ce7f4728f3 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -155,11 +155,11 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges return nil, derr.ToTiDBErr(err) } - cap := len(locs) + resCap := len(locs) if limit != UnspecifiedLimit { - cap = min(cap, limit) + resCap = min(resCap, limit) } - res := make([]*LocationKeyRanges, 0, cap) + res := make([]*LocationKeyRanges, 0, resCap) // All ranges belong to the same region. if len(locs) == 1 { From 0a3f718a0c428984edb6a0b6439f2c83a489b544 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 15:54:11 +0800 Subject: [PATCH 09/19] Update pkg/store/copr/key_ranges.go --- pkg/store/copr/key_ranges.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/copr/key_ranges.go b/pkg/store/copr/key_ranges.go index 7f98fa3f902e4..ba2f57f3f0333 100644 --- a/pkg/store/copr/key_ranges.go +++ b/pkg/store/copr/key_ranges.go @@ -137,7 +137,7 @@ func (r *KeyRanges) Split(key []byte) (*KeyRanges, *KeyRanges) { return r.Slice(0, n), r.Slice(n, r.Len()) } -// ToPBRanges converts ranges to []kv.KeyRange. +// ToRanges converts ranges to []kv.KeyRange. func (r *KeyRanges) ToRanges() []kv.KeyRange { ranges := make([]kv.KeyRange, 0, r.Len()) r.Do(func(ran *kv.KeyRange) { From 4096ba6307eb6346838907d9b2ae9c1878eac447 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 16:03:25 +0800 Subject: [PATCH 10/19] lint --- pkg/store/copr/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 673cbffce8ade..35a3ea4c8adab 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -81,7 +81,7 @@ go_test( embed = [":copr"], flaky = True, race = "on", - shard_count = 28, + shard_count = 29, deps = [ "//pkg/kv", "//pkg/store/driver/backoff", From 638e450279e3e25d2a04183ad2515b1daa875c83 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 16:10:28 +0800 Subject: [PATCH 11/19] fix --- pkg/store/copr/coprocessor_test.go | 72 +++++++++++++++--------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/pkg/store/copr/coprocessor_test.go b/pkg/store/copr/coprocessor_test.go index 0e56a10bc5d00..022a2d983c523 100644 --- a/pkg/store/copr/coprocessor_test.go +++ b/pkg/store/copr/coprocessor_test.go @@ -381,59 +381,59 @@ func TestSplitKeyRangesByLocations(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) - loc_ranges, err := cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit) + locRanges, err := cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit) require.NoError(t, err) - require.Len(t, loc_ranges, 1) - rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "c") + require.Len(t, locRanges, 1) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "c") - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0) + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0) require.NoError(t, err) - require.Len(t, loc_ranges, 0) + require.Len(t, locRanges, 0) - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit) require.NoError(t, err) - require.Len(t, loc_ranges, 3) - rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "h", "n") - rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "n", "t") - rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "t", "y") + require.Len(t, locRanges, 3) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n") + rangeEqual(t, locRanges[1].Ranges.ToRanges(), "n", "t") + rangeEqual(t, locRanges[2].Ranges.ToRanges(), "t", "y") - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit) require.NoError(t, err) - require.Len(t, loc_ranges, 1) - rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "s", "s") + require.Len(t, locRanges, 1) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "s", "s") // min --> max - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit) require.NoError(t, err) - require.Len(t, loc_ranges, 4) - rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "g") - rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "n") - rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "n", "t") - rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "t", "z") + require.Len(t, locRanges, 4) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g") + rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n") + rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t") + rangeEqual(t, locRanges[3].Ranges.ToRanges(), "t", "z") - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3) + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3) require.NoError(t, err) - require.Len(t, loc_ranges, 3) - rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "g") - rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "n") - rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "n", "t") + require.Len(t, locRanges, 3) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g") + rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "n") + rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t") // many range - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit) require.NoError(t, err) - require.Len(t, loc_ranges, 4) - rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g") - rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "h", "i", "j", "k", "l", "m", "n") - rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t") - rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "u", "v", "w", "x", "y", "z") + require.Len(t, locRanges, 4) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g") + rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "i", "j", "k", "l", "m", "n") + rangeEqual(t, locRanges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t") + rangeEqual(t, locRanges[3].Ranges.ToRanges(), "u", "v", "w", "x", "y", "z") - loc_ranges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit) require.NoError(t, err) - require.Len(t, loc_ranges, 4) - rangeEqual(t, loc_ranges[0].Ranges.ToRanges(), "a", "b", "b", "g") - rangeEqual(t, loc_ranges[1].Ranges.ToRanges(), "g", "h", "h", "m", "n") - rangeEqual(t, loc_ranges[2].Ranges.ToRanges(), "n", "t") - rangeEqual(t, loc_ranges[3].Ranges.ToRanges(), "v", "w") + require.Len(t, locRanges, 4) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "b", "g") + rangeEqual(t, locRanges[1].Ranges.ToRanges(), "g", "h", "h", "m", "n") + rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t") + rangeEqual(t, locRanges[3].Ranges.ToRanges(), "v", "w") } func TestSplitKeyRanges(t *testing.T) { From be1739a547fd0c37086c86aecd0c9159acea8a44 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 16:28:40 +0800 Subject: [PATCH 12/19] add a case --- pkg/store/copr/coprocessor_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/store/copr/coprocessor_test.go b/pkg/store/copr/coprocessor_test.go index 022a2d983c523..1bcf99d564576 100644 --- a/pkg/store/copr/coprocessor_test.go +++ b/pkg/store/copr/coprocessor_test.go @@ -397,6 +397,11 @@ func TestSplitKeyRangesByLocations(t *testing.T) { rangeEqual(t, locRanges[1].Ranges.ToRanges(), "n", "t") rangeEqual(t, locRanges[2].Ranges.ToRanges(), "t", "y") + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("h", "n")), UnspecifiedLimit) + require.NoError(t, err) + require.Len(t, locRanges, 1) + rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n") + locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 1) From 70299b7dd66eb0f97765f43f2e0bdb48d70e8ff1 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 28 Feb 2024 16:56:02 +0800 Subject: [PATCH 13/19] refine --- pkg/store/copr/region_cache.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index a48ce7f4728f3..eefcf332b37d3 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -147,7 +147,7 @@ func (c *RegionCache) locateKeyRange(bo *Backoffer, startKey, endKey []byte) ([] // SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { - if limit == 0 { + if limit == 0 || ranges.Len() <= 0 { return nil, nil } locs, err := c.locateKeyRange(bo, ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) @@ -161,12 +161,6 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges } res := make([]*LocationKeyRanges, 0, resCap) - // All ranges belong to the same region. - if len(locs) == 1 { - res = append(res, &LocationKeyRanges{Location: locs[0], Ranges: ranges}) - return res, nil - } - for ranges.Len() > 0 { if limit != UnspecifiedLimit && len(res) >= limit { break From 4c3948a4a09a50486f4c0236f304dde54d8614e2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 29 Feb 2024 13:22:19 +0800 Subject: [PATCH 14/19] todo update client-go version from https://github.com/tikv/client-go/pull/1189 --- pkg/store/copr/region_cache.go | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index eefcf332b37d3..19d3c3c3ba02d 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -127,30 +127,12 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges { // UnspecifiedLimit means no limit. const UnspecifiedLimit = -1 -// todo move to client.go -func (c *RegionCache) locateKeyRange(bo *Backoffer, startKey, endKey []byte) ([]*tikv.KeyLocation, error) { - regions, err := c.LoadRegionsInKeyRange(bo.TiKVBackoffer(), startKey, endKey) - if err != nil { - return nil, err - } - res := make([]*tikv.KeyLocation, 0, len(regions)) - for _, r := range regions { - res = append(res, &tikv.KeyLocation{ - Region: r.VerID(), - StartKey: r.StartKey(), - EndKey: r.EndKey(), - Buckets: nil, - }) - } - return res, nil -} - // SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { if limit == 0 || ranges.Len() <= 0 { return nil, nil } - locs, err := c.locateKeyRange(bo, ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) + locs, err := c.locateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) if err != nil { return nil, derr.ToTiDBErr(err) } From 4d862051ec7fe2196b16fde19721e923dd935533 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 29 Feb 2024 15:46:15 +0800 Subject: [PATCH 15/19] merge master --- pkg/store/copr/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 19d3c3c3ba02d..75c8ca471504a 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -132,7 +132,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges if limit == 0 || ranges.Len() <= 0 { return nil, nil } - locs, err := c.locateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) + locs, err := c.LocateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) if err != nil { return nil, derr.ToTiDBErr(err) } From 881a9aa31deb2a4d601545077598a24c27546b57 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 29 Feb 2024 16:15:13 +0800 Subject: [PATCH 16/19] update --- pkg/executor/index_merge_reader.go | 2 +- pkg/store/copr/batch_coprocessor.go | 6 +-- pkg/store/copr/coprocessor_test.go | 20 ++++----- pkg/store/copr/region_cache.go | 64 +++++++++++++++++++++++++++-- 4 files changed, 74 insertions(+), 18 deletions(-) diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go index 4fbfb6c0092da..cb9f7c8408331 100644 --- a/pkg/executor/index_merge_reader.go +++ b/pkg/executor/index_merge_reader.go @@ -797,7 +797,7 @@ func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tb netDataSize: e.dataAvgRowSize * float64(len(handles)), } tableReaderExec.buildVirtualColumnInfo() - // Reorder handles because SplitKeyRangesByLocations() requires startKey of kvRanges is ordered. + // Reorder handles because SplitKeyRangesByLocationsWith/WithoutBuckets() requires startKey of kvRanges is ordered. // Also it's good for performance. tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true) if err != nil { diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index 6246aa87b6eb9..752be5f16d9e4 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -635,7 +635,7 @@ func buildBatchCopTasksConsistentHash( for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) + locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit) if err != nil { return nil, errors.Trace(err) } @@ -904,7 +904,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach rangesLen = 0 for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) + locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit) if err != nil { return nil, errors.Trace(err) } @@ -1430,7 +1430,7 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer, splitKeyStart := time.Now() for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) + locations, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, UnspecifiedLimit) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/store/copr/coprocessor_test.go b/pkg/store/copr/coprocessor_test.go index 1bcf99d564576..fe061b5698660 100644 --- a/pkg/store/copr/coprocessor_test.go +++ b/pkg/store/copr/coprocessor_test.go @@ -361,7 +361,7 @@ func TestBuildTasksByBuckets(t *testing.T) { } } -func TestSplitKeyRangesByLocations(t *testing.T) { +func TestSplitKeyRangesByLocationsWithoutBuckets(t *testing.T) { // nil --- 'g' --- 'n' --- 't' --- nil // <- 0 -> <- 1 -> <- 2 -> <- 3 -> mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil) @@ -381,34 +381,34 @@ func TestSplitKeyRangesByLocations(t *testing.T) { bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) - locRanges, err := cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit) + locRanges, err := cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 1) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "c") - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "c")), 0) require.NoError(t, err) require.Len(t, locRanges, 0) - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "y")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 3) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n") rangeEqual(t, locRanges[1].Ranges.ToRanges(), "n", "t") rangeEqual(t, locRanges[2].Ranges.ToRanges(), "t", "y") - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("h", "n")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("h", "n")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 1) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "h", "n") - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("s", "s")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 1) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "s", "s") // min --> max - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 4) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g") @@ -416,7 +416,7 @@ func TestSplitKeyRangesByLocations(t *testing.T) { rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t") rangeEqual(t, locRanges[3].Ranges.ToRanges(), "t", "z") - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "z")), 3) require.NoError(t, err) require.Len(t, locRanges, 3) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "g") @@ -424,7 +424,7 @@ func TestSplitKeyRangesByLocations(t *testing.T) { rangeEqual(t, locRanges[2].Ranges.ToRanges(), "n", "t") // many range - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 4) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "c", "d", "e", "f", "f", "g") @@ -432,7 +432,7 @@ func TestSplitKeyRangesByLocations(t *testing.T) { rangeEqual(t, locRanges[2].Ranges.ToRanges(), "o", "p", "q", "r", "s", "t") rangeEqual(t, locRanges[3].Ranges.ToRanges(), "u", "v", "w", "x", "y", "z") - locRanges, err = cache.SplitKeyRangesByLocations(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit) + locRanges, err = cache.SplitKeyRangesByLocationsWithoutBuckets(bo, NewKeyRanges(BuildKeyRanges("a", "b", "b", "h", "h", "m", "n", "t", "v", "w")), UnspecifiedLimit) require.NoError(t, err) require.Len(t, locRanges, 4) rangeEqual(t, locRanges[0].Ranges.ToRanges(), "a", "b", "b", "g") diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 75c8ca471504a..321b5643cfe50 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -47,7 +47,7 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache { func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) { ranges := NewKeyRanges(keyRanges) - locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit) + locations, err := c.SplitKeyRangesByLocationsWithoutBuckets(bo, ranges, limit) if err != nil { return nil, derr.ToTiDBErr(err) } @@ -127,11 +127,67 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges { // UnspecifiedLimit means no limit. const UnspecifiedLimit = -1 -// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache. -func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { +// SplitKeyRangesByLocationsWithBuckets splits the KeyRanges by logical info in the cache. +// The buckets in the returned LocationKeyRanges are not empty if the region is split by bucket. +func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { + res := make([]*LocationKeyRanges, 0) + for ranges.Len() > 0 { + if limit != UnspecifiedLimit && len(res) >= limit { + break + } + loc, err := c.LocateKey(bo.TiKVBackoffer(), ranges.At(0).StartKey) + if err != nil { + return res, derr.ToTiDBErr(err) + } + + // Iterate to the first range that is not complete in the region. + var r kv.KeyRange + var i int + for ; i < ranges.Len(); i++ { + r = ranges.At(i) + if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) { + break + } + } + // All rest ranges belong to the same region. + if i == ranges.Len() { + res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) + break + } + + if loc.Contains(r.StartKey) { + // Part of r is not in the region. We need to split it. + taskRanges := ranges.Slice(0, i) + taskRanges.last = &kv.KeyRange{ + StartKey: r.StartKey, + EndKey: loc.EndKey, + } + res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) + + ranges = ranges.Slice(i+1, ranges.Len()) + ranges.first = &kv.KeyRange{ + StartKey: loc.EndKey, + EndKey: r.EndKey, + } + } else { + // rs[i] is not in the region. + taskRanges := ranges.Slice(0, i) + res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) + ranges = ranges.Slice(i, ranges.Len()) + } + } + + return res, nil +} + +// SplitKeyRangesByLocationsWithoutBuckets splits the KeyRanges by logical info in the cache. +// The buckets in the returned LocationKeyRanges are empty, regardless of whether the region is split by bucket. +func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) { if limit == 0 || ranges.Len() <= 0 { return nil, nil } + // Currently, LocationKeyRanges returned by `LocateKeyRange`` doesn't contains buckets, + // because of https://github.com/tikv/client-go/blob/09ecb550d383c1b048119b586fb5cda658312262/internal/locate/region_cache.go#L1550-L1551. locs, err := c.LocateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) if err != nil { return nil, derr.ToTiDBErr(err) @@ -205,7 +261,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges // // TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled. func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) { - locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) + locs, err := c.SplitKeyRangesByLocationsWithBuckets(bo, ranges, UnspecifiedLimit) if err != nil { return nil, derr.ToTiDBErr(err) } From c210f5feb187523f042d837b651a0589089ba8b3 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 29 Feb 2024 16:23:25 +0800 Subject: [PATCH 17/19] update --- pkg/store/copr/region_cache.go | 109 +++++++++++++-------------------- 1 file changed, 43 insertions(+), 66 deletions(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 321b5643cfe50..a5ae69e5ace96 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -124,6 +124,43 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges { return res } +func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *KeyRanges, res []*LocationKeyRanges) ([]*LocationKeyRanges, bool) { + // Iterate to the first range that is not complete in the region. + var r kv.KeyRange + var i int + for ; i < ranges.Len(); i++ { + r = ranges.At(i) + if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) { + break + } + } + // All rest ranges belong to the same region. + if i == ranges.Len() { + res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) + return res, true + } + if loc.Contains(r.StartKey) { + // Part of r is not in the region. We need to split it. + taskRanges := ranges.Slice(0, i) + taskRanges.last = &kv.KeyRange{ + StartKey: r.StartKey, + EndKey: loc.EndKey, + } + res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) + *ranges = *ranges.Slice(i+1, ranges.Len()) + ranges.first = &kv.KeyRange{ + StartKey: loc.EndKey, + EndKey: r.EndKey, + } + } else { + // rs[i] is not in the region. + taskRanges := ranges.Slice(0, i) + res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) + *ranges = *ranges.Slice(i, ranges.Len()) + } + return res, false +} + // UnspecifiedLimit means no limit. const UnspecifiedLimit = -1 @@ -140,41 +177,11 @@ func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges return res, derr.ToTiDBErr(err) } - // Iterate to the first range that is not complete in the region. - var r kv.KeyRange - var i int - for ; i < ranges.Len(); i++ { - r = ranges.At(i) - if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) { - break - } - } - // All rest ranges belong to the same region. - if i == ranges.Len() { - res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) + isBreak := false + res, isBreak = c.splitKeyRangesByLocation(loc, ranges, res) + if isBreak { break } - - if loc.Contains(r.StartKey) { - // Part of r is not in the region. We need to split it. - taskRanges := ranges.Slice(0, i) - taskRanges.last = &kv.KeyRange{ - StartKey: r.StartKey, - EndKey: loc.EndKey, - } - res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - - ranges = ranges.Slice(i+1, ranges.Len()) - ranges.first = &kv.KeyRange{ - StartKey: loc.EndKey, - EndKey: r.EndKey, - } - } else { - // rs[i] is not in the region. - taskRanges := ranges.Slice(0, i) - res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - ranges = ranges.Slice(i, ranges.Len()) - } } return res, nil @@ -217,41 +224,11 @@ func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ran break } - // Iterate to the first range that is not complete in the region. - var r kv.KeyRange - var i int - for ; i < ranges.Len(); i++ { - r = ranges.At(i) - if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) { - break - } - } - // All rest ranges belong to the same region. - if i == ranges.Len() { - res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) + isBreak := false + res, isBreak = c.splitKeyRangesByLocation(loc, ranges, res) + if isBreak { break } - - if loc.Contains(r.StartKey) { - // Part of r is not in the region. We need to split it. - taskRanges := ranges.Slice(0, i) - taskRanges.last = &kv.KeyRange{ - StartKey: r.StartKey, - EndKey: loc.EndKey, - } - res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - - ranges = ranges.Slice(i+1, ranges.Len()) - ranges.first = &kv.KeyRange{ - StartKey: loc.EndKey, - EndKey: r.EndKey, - } - } else { - // rs[i] is not in the region. - taskRanges := ranges.Slice(0, i) - res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - ranges = ranges.Slice(i, ranges.Len()) - } } return res, nil } From 27781a213a469ddae509a9255eee12f397391744 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 29 Feb 2024 16:31:03 +0800 Subject: [PATCH 18/19] more refine --- pkg/store/copr/region_cache.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index a5ae69e5ace96..3f102cbaab609 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -124,7 +124,7 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges { return res } -func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *KeyRanges, res []*LocationKeyRanges) ([]*LocationKeyRanges, bool) { +func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *KeyRanges, res []*LocationKeyRanges) ([]*LocationKeyRanges, *KeyRanges, bool) { // Iterate to the first range that is not complete in the region. var r kv.KeyRange var i int @@ -137,7 +137,7 @@ func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *Ke // All rest ranges belong to the same region. if i == ranges.Len() { res = append(res, &LocationKeyRanges{Location: loc, Ranges: ranges}) - return res, true + return res, ranges, true } if loc.Contains(r.StartKey) { // Part of r is not in the region. We need to split it. @@ -147,7 +147,7 @@ func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *Ke EndKey: loc.EndKey, } res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - *ranges = *ranges.Slice(i+1, ranges.Len()) + ranges = ranges.Slice(i+1, ranges.Len()) ranges.first = &kv.KeyRange{ StartKey: loc.EndKey, EndKey: r.EndKey, @@ -156,9 +156,9 @@ func (c *RegionCache) splitKeyRangesByLocation(loc *tikv.KeyLocation, ranges *Ke // rs[i] is not in the region. taskRanges := ranges.Slice(0, i) res = append(res, &LocationKeyRanges{Location: loc, Ranges: taskRanges}) - *ranges = *ranges.Slice(i, ranges.Len()) + ranges = ranges.Slice(i, ranges.Len()) } - return res, false + return res, ranges, false } // UnspecifiedLimit means no limit. @@ -178,7 +178,7 @@ func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges } isBreak := false - res, isBreak = c.splitKeyRangesByLocation(loc, ranges, res) + res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res) if isBreak { break } @@ -225,7 +225,7 @@ func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ran } isBreak := false - res, isBreak = c.splitKeyRangesByLocation(loc, ranges, res) + res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res) if isBreak { break } From 3aa36f42f11945010f98591e09ff7ab6d480ab54 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 29 Feb 2024 16:55:31 +0800 Subject: [PATCH 19/19] Update pkg/store/copr/region_cache.go Co-authored-by: guo-shaoge --- pkg/store/copr/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/copr/region_cache.go b/pkg/store/copr/region_cache.go index 3f102cbaab609..7883dc13c0166 100644 --- a/pkg/store/copr/region_cache.go +++ b/pkg/store/copr/region_cache.go @@ -193,7 +193,7 @@ func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ran if limit == 0 || ranges.Len() <= 0 { return nil, nil } - // Currently, LocationKeyRanges returned by `LocateKeyRange`` doesn't contains buckets, + // Currently, LocationKeyRanges returned by `LocateKeyRange` doesn't contains buckets, // because of https://github.com/tikv/client-go/blob/09ecb550d383c1b048119b586fb5cda658312262/internal/locate/region_cache.go#L1550-L1551. locs, err := c.LocateKeyRange(bo.TiKVBackoffer(), ranges.RefAt(0).StartKey, ranges.RefAt(ranges.Len()-1).EndKey) if err != nil {