Skip to content

Commit

Permalink
client: report error when scan region encounter hole region (#8375)
Browse files Browse the repository at this point in the history
close #8358

client: report error when scan region encounter hole region
- add a input parameter(OutputMustContainAllKeyRange) in the BatchScanRegionsRequest
- when the new param enable and find the result doesn't contain all key range(input), it will return an error to user
- add Merge() method to merge the continuous KeyRanges
- pull out the scanRegion function and add unit tests for it

Signed-off-by: okJiang <819421878@qq.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Jul 17, 2024
1 parent 1ad446e commit 0b5ed0f
Show file tree
Hide file tree
Showing 15 changed files with 417 additions and 81 deletions.
19 changes: 13 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ func WithSkipStoreLimit() RegionsOption {

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
allowFollowerHandle bool
needBuckets bool
allowFollowerHandle bool
outputMustContainAllKeyRange bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -231,6 +232,11 @@ func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// WithOutputMustContainAllKeyRange means the output must contain all key ranges.
func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true }
}

var (
// errUnmatchedClusterID is returned when found a PD with a different cluster ID.
errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
Expand Down Expand Up @@ -1193,10 +1199,11 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey})
}
req := &pdpb.BatchScanRegionsRequest{
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
ContainAllKeyRange: options.outputMustContainAllKeyRange,
}
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20240612100141-91f6c281e441
Expand Down Expand Up @@ -173,7 +173,7 @@ require (
go.etcd.io/bbolt v1.3.9 // indirect
go.uber.org/dig v1.9.0 // indirect
go.uber.org/fx v1.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/multierr v1.11.0
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/image v0.10.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
55 changes: 53 additions & 2 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package core

import "bytes"

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
*StoresInfo
Expand Down Expand Up @@ -97,7 +99,29 @@ type RegionSetInformer interface {
GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo)
ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo
GetRegionByKey(regionKey []byte) *RegionInfo
BatchScanRegions(keyRanges *KeyRanges, limit int) []*RegionInfo
BatchScanRegions(keyRanges *KeyRanges, opts ...BatchScanRegionsOptionFunc) ([]*RegionInfo, error)
}

type batchScanRegionsOptions struct {
limit int
outputMustContainAllKeyRange bool
}

// BatchScanRegionsOptionFunc is the option function for BatchScanRegions.
type BatchScanRegionsOptionFunc func(*batchScanRegionsOptions)

// WithLimit is an option for batchScanRegionsOptions.
func WithLimit(limit int) BatchScanRegionsOptionFunc {
return func(opt *batchScanRegionsOptions) {
opt.limit = limit
}
}

// WithOutputMustContainAllKeyRange is an option for batchScanRegionsOptions.
func WithOutputMustContainAllKeyRange() BatchScanRegionsOptionFunc {
return func(opt *batchScanRegionsOptions) {
opt.outputMustContainAllKeyRange = true
}
}

// StoreSetInformer provides access to a shared informer of stores.
Expand Down Expand Up @@ -136,7 +160,7 @@ func NewKeyRange(startKey, endKey string) KeyRange {
}
}

// KeyRanges is a slice of KeyRange.
// KeyRanges is a slice of monotonically increasing KeyRange.
type KeyRanges struct {
krs []*KeyRange
}
Expand All @@ -163,3 +187,30 @@ func (rs *KeyRanges) Ranges() []*KeyRange {
}
return rs.krs
}

// Merge merges the continuous KeyRanges.
func (rs *KeyRanges) Merge() {
if len(rs.krs) == 0 {
return
}
merged := make([]*KeyRange, 0, len(rs.krs))
start := rs.krs[0].StartKey
end := rs.krs[0].EndKey
for _, kr := range rs.krs[1:] {
if bytes.Equal(end, kr.StartKey) {
end = kr.EndKey
} else {
merged = append(merged, &KeyRange{
StartKey: start,
EndKey: end,
})
start = kr.StartKey
end = kr.EndKey
}
}
merged = append(merged, &KeyRange{
StartKey: start,
EndKey: end,
})
rs.krs = merged
}
93 changes: 93 additions & 0 deletions pkg/core/basic_cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMergeKeyRanges(t *testing.T) {
re := require.New(t)

testCases := []struct {
name string
input []*KeyRange
expect []*KeyRange
}{
{
name: "empty",
input: []*KeyRange{},
expect: []*KeyRange{},
},
{
name: "single",
input: []*KeyRange{
{StartKey: []byte("a"), EndKey: []byte("b")},
},
expect: []*KeyRange{
{StartKey: []byte("a"), EndKey: []byte("b")},
},
},
{
name: "non-overlapping",
input: []*KeyRange{
{StartKey: []byte("a"), EndKey: []byte("b")},
{StartKey: []byte("c"), EndKey: []byte("d")},
},
expect: []*KeyRange{
{StartKey: []byte("a"), EndKey: []byte("b")},
{StartKey: []byte("c"), EndKey: []byte("d")},
},
},
{
name: "continuous",
input: []*KeyRange{
{StartKey: []byte("a"), EndKey: []byte("b")},
{StartKey: []byte("b"), EndKey: []byte("c")},
},
expect: []*KeyRange{
{StartKey: []byte("a"), EndKey: []byte("c")},
},
},
{
name: "boundless 1",
input: []*KeyRange{
{StartKey: nil, EndKey: []byte("b")},
{StartKey: []byte("b"), EndKey: []byte("c")},
},
expect: []*KeyRange{
{StartKey: nil, EndKey: []byte("c")},
},
},
{
name: "boundless 2",
input: []*KeyRange{
{StartKey: []byte("a"), EndKey: []byte("b")},
{StartKey: []byte("b"), EndKey: nil},
},
expect: []*KeyRange{
{StartKey: []byte("a"), EndKey: nil},
},
},
}

for _, tc := range testCases {
rs := &KeyRanges{krs: tc.input}
rs.Merge()
re.Equal(tc.expect, rs.Ranges(), tc.name)
}
}
100 changes: 77 additions & 23 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1826,37 +1826,91 @@ func (r *RegionsInfo) ScanRegions(startKey, endKey []byte, limit int) []*RegionI
// BatchScanRegions scans regions in given key pairs, returns at most `limit` regions.
// limit <= 0 means no limit.
// The given key pairs should be non-overlapping.
func (r *RegionsInfo) BatchScanRegions(keyRanges *KeyRanges, limit int) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()

func (r *RegionsInfo) BatchScanRegions(keyRanges *KeyRanges, opts ...BatchScanRegionsOptionFunc) ([]*RegionInfo, error) {
keyRanges.Merge()
krs := keyRanges.Ranges()
res := make([]*RegionInfo, 0, len(krs))
var lastRegion *RegionInfo

scanOptions := &batchScanRegionsOptions{}
for _, opt := range opts {
opt(scanOptions)
}

r.t.RLock()
defer r.t.RUnlock()
for _, keyRange := range krs {
if limit > 0 && len(res) >= limit {
return res
if scanOptions.limit > 0 && len(res) >= scanOptions.limit {
res = res[:scanOptions.limit]
return res, nil
}
if lastRegion != nil {
if lastRegion.Contains(keyRange.EndKey) {
continue
} else if lastRegion.Contains(keyRange.StartKey) {
keyRange.StartKey = lastRegion.GetEndKey()
}

regions, err := scanRegion(r.tree, keyRange, scanOptions.limit, scanOptions.outputMustContainAllKeyRange)
if err != nil {
return nil, err
}
r.tree.scanRange(keyRange.StartKey, func(region *RegionInfo) bool {
if len(keyRange.EndKey) > 0 && bytes.Compare(region.GetStartKey(), keyRange.EndKey) >= 0 {
return false
}
if limit > 0 && len(res) >= limit {
if len(res) > 0 && len(regions) > 0 && res[len(res)-1].meta.Id == regions[0].meta.Id {
// skip the region that has been scanned
regions = regions[1:]
}
res = append(res, regions...)
}
return res, nil
}

func scanRegion(regionTree *regionTree, keyRange *KeyRange, limit int, outputMustContainAllKeyRange bool) ([]*RegionInfo, error) {
var (
res []*RegionInfo
lastRegion = &RegionInfo{
meta: &metapb.Region{EndKey: keyRange.StartKey},
}
exceedLimit = func() bool { return limit > 0 && len(res) >= limit }
err error
)
regionTree.scanRange(keyRange.StartKey, func(region *RegionInfo) bool {
if len(keyRange.EndKey) > 0 && len(region.GetStartKey()) > 0 &&
bytes.Compare(region.GetStartKey(), keyRange.EndKey) >= 0 {
return false
}
if exceedLimit() {
return false
}
if len(lastRegion.GetEndKey()) > 0 && len(region.GetStartKey()) > 0 &&
bytes.Compare(region.GetStartKey(), lastRegion.GetEndKey()) > 0 {
err = errs.ErrRegionNotAdjacent.FastGen(
"key range[%x, %x) found a hole region between region[%x, %x) and region[%x, %x)",
keyRange.StartKey, keyRange.EndKey,
lastRegion.GetStartKey(), lastRegion.GetEndKey(),
region.GetStartKey(), region.GetEndKey())
log.Warn("scan regions failed", zap.Bool("outputMustContainAllKeyRange",
outputMustContainAllKeyRange), zap.Error(err))
if outputMustContainAllKeyRange {
return false
}
lastRegion = region
res = append(res, region)
return true
})
}

lastRegion = region
res = append(res, region)
return true
})
if outputMustContainAllKeyRange && err != nil {
return nil, err
}
return res

if !(exceedLimit()) && len(keyRange.EndKey) > 0 && len(lastRegion.GetEndKey()) > 0 &&
bytes.Compare(lastRegion.GetEndKey(), keyRange.EndKey) < 0 {
err = errs.ErrRegionNotAdjacent.FastGen(
"key range[%x, %x) found a hole region in the last, the last scanned region is [%x, %x), [%x, %x) is missing",
keyRange.StartKey, keyRange.EndKey,
lastRegion.GetStartKey(), lastRegion.GetEndKey(),
lastRegion.GetEndKey(), keyRange.EndKey)
log.Warn("scan regions failed", zap.Bool("outputMustContainAllKeyRange",
outputMustContainAllKeyRange), zap.Error(err))
if outputMustContainAllKeyRange {
return nil, err
}
}

return res, nil
}

// ScanRegionWithIterator scans from the first region containing or behind start key,
Expand Down
Loading

0 comments on commit 0b5ed0f

Please sign in to comment.