*: add skip store limit option for scatter region (tikv#6593)
close tikv#6592

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored Jul 28, 2023
1 parent 4db1735 commit 31343e0
Showing 22 changed files with 127 additions and 71 deletions.
19 changes: 13 additions & 6 deletions client/client.go
```diff
@@ -174,8 +174,9 @@ func WithExcludeTombstone() GetStoreOption {
 
 // RegionsOp represents available options when operate regions
 type RegionsOp struct {
-	group      string
-	retryLimit uint64
+	group          string
+	retryLimit     uint64
+	skipStoreLimit bool
 }
 
 // RegionsOption configures RegionsOp
@@ -191,6 +192,11 @@ func WithRetry(retry uint64) RegionsOption {
 	return func(op *RegionsOp) { op.retryLimit = retry }
 }
 
+// WithSkipStoreLimit specifies whether to skip the store limit check during Scatter/Split Regions
+func WithSkipStoreLimit() RegionsOption {
+	return func(op *RegionsOp) { op.skipStoreLimit = true }
+}
+
 // GetRegionOp represents available options when getting regions.
 type GetRegionOp struct {
 	needBuckets bool
@@ -1393,10 +1399,11 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
 	}
 	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
 	req := &pdpb.ScatterRegionRequest{
-		Header:     c.requestHeader(),
-		Group:      options.group,
-		RegionsId:  regionsID,
-		RetryLimit: options.retryLimit,
+		Header:         c.requestHeader(),
+		Group:          options.group,
+		RegionsId:      regionsID,
+		RetryLimit:     options.retryLimit,
+		SkipStoreLimit: options.skipStoreLimit,
 	}
 
 	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
```
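From the caller's side, the new option composes with the existing ones. Below is a minimal sketch of scattering a batch of regions with the store limit check skipped; the client constructor, address, and region IDs are illustrative assumptions, only the three options come from this diff:

```go
package main

import (
	"context"
	"log"
	"time"

	pd "github.com/tikv/pd/client"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Assumed client setup; not part of this commit.
	cli, err := pd.NewClientWithContext(ctx, []string{"127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	regionsID := []uint64{101, 102, 103} // example region IDs
	resp, err := cli.ScatterRegions(ctx, regionsID,
		pd.WithGroup("example-group"), // scatter within a named group
		pd.WithRetry(5),               // retry each region up to 5 times
		pd.WithSkipStoreLimit(),       // new: bypass the store limit check
	)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("finished percentage: %d", resp.GetFinishedPercentage())
}
```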
2 changes: 1 addition & 1 deletion client/go.mod
```diff
@@ -8,7 +8,7 @@ require (
 	github.com/opentracing/opentracing-go v1.2.0
 	github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
-	github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
+	github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
 	github.com/prometheus/client_golang v1.11.1
 	github.com/stretchr/testify v1.8.2
```
4 changes: 2 additions & 2 deletions client/go.sum
```diff
@@ -86,8 +86,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
+github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
```
2 changes: 1 addition & 1 deletion go.mod
```diff
@@ -30,7 +30,7 @@ require (
 	github.com/pingcap/errcode v0.3.0
 	github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
-	github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
+	github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
 	github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
 	github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
 	github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
```
4 changes: 2 additions & 2 deletions go.sum
```diff
@@ -424,8 +424,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
-github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
+github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
+github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
```
25 changes: 14 additions & 11 deletions pkg/schedule/operator/builder.go
```diff
@@ -63,7 +63,8 @@ type Builder struct {
 
 	// build flags
 	useJointConsensus bool
-	lightWeight       bool
+	removeLightPeer   bool
+	addLightPeer      bool
 	forceTargetLeader bool
 
 	// intermediate states
@@ -370,9 +371,15 @@ func (b *Builder) SetExpectedRoles(roles map[uint64]placement.PeerRoleType) *Bui
 	return b
 }
 
-// EnableLightWeight marks the region as light weight. It is used for scatter regions.
-func (b *Builder) EnableLightWeight() *Builder {
-	b.lightWeight = true
+// SetAddLightPeer marks the add peer as light weight. It is used for scatter regions.
+func (b *Builder) SetAddLightPeer() *Builder {
+	b.addLightPeer = true
+	return b
+}
+
+// SetRemoveLightPeer marks the remove peer as light weight. It is used for scatter regions.
+func (b *Builder) SetRemoveLightPeer() *Builder {
+	b.removeLightPeer = true
 	return b
 }
 
@@ -532,7 +539,7 @@ func (b *Builder) brief() string
 	switch {
 	case len(b.toAdd) > 0 && len(b.toRemove) > 0:
 		op := "mv peer"
-		if b.lightWeight {
+		if b.addLightPeer && b.removeLightPeer {
 			op = "mv light peer"
 		}
 		return fmt.Sprintf("%s: store %s to %s", op, b.toRemove, b.toAdd)
@@ -775,11 +782,7 @@ func (b *Builder) execPromoteNonWitness(peer *metapb.Peer) {
 }
 
 func (b *Builder) execAddPeer(peer *metapb.Peer) {
-	if b.lightWeight {
-		b.steps = append(b.steps, AddLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsLightWeight: b.lightWeight, IsWitness: peer.GetIsWitness(), SendStore: b.originLeaderStoreID})
-	} else {
-		b.steps = append(b.steps, AddLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsWitness: peer.GetIsWitness(), SendStore: b.originLeaderStoreID})
-	}
+	b.steps = append(b.steps, AddLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsLightWeight: b.addLightPeer, IsWitness: peer.GetIsWitness(), SendStore: b.originLeaderStoreID})
 	if !core.IsLearner(peer) {
 		b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsWitness: peer.GetIsWitness()})
 	}
@@ -795,7 +798,7 @@ func (b *Builder) execRemovePeer(peer *metapb.Peer) {
 	if store != nil {
 		isDownStore = store.DownTime() > b.GetSharedConfig().GetMaxStoreDownTime()
 	}
-	b.steps = append(b.steps, RemovePeer{FromStore: removeStoreID, PeerID: peer.GetId(), IsDownStore: isDownStore})
+	b.steps = append(b.steps, RemovePeer{FromStore: removeStoreID, PeerID: peer.GetId(), IsDownStore: isDownStore, IsLightWeight: b.removeLightPeer})
 	delete(b.currentPeers, removeStoreID)
 	delete(b.toRemove, removeStoreID)
 }
```
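The flag split matters because the two sides are no longer toggled together: scatter always marks the added learner as light weight, while the removed peer becomes light only on request (see create_operator.go below). Here is a self-contained toy model of how each flag lands on its own step; the types are illustrative stand-ins, not PD's:

```go
package main

import "fmt"

// step is an illustrative stand-in for PD's operator steps (AddLearner,
// RemovePeer); only the light-weight flag matters for store limits.
type step struct {
	name          string
	isLightWeight bool
}

// builder models just the two build flags introduced by this commit.
type builder struct {
	addLightPeer    bool
	removeLightPeer bool
}

func (b *builder) SetAddLightPeer() *builder    { b.addLightPeer = true; return b }
func (b *builder) SetRemoveLightPeer() *builder { b.removeLightPeer = true; return b }

// buildMovePeer mirrors execAddPeer/execRemovePeer: each side reads its own
// flag, so add and remove can be light weight independently.
func (b *builder) buildMovePeer() []step {
	return []step{
		{"AddLearner", b.addLightPeer},
		{"RemovePeer", b.removeLightPeer},
	}
}

func main() {
	// Scatter without skip-store-limit: only the add side is light.
	b := (&builder{}).SetAddLightPeer()
	fmt.Println(b.buildMovePeer()) // [{AddLearner true} {RemovePeer false}]

	// Scatter with skip-store-limit: both sides are light.
	b.SetRemoveLightPeer()
	fmt.Println(b.buildMovePeer()) // [{AddLearner true} {RemovePeer true}]
}
```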
4 changes: 2 additions & 2 deletions pkg/schedule/operator/builder_test.go
```diff
@@ -108,13 +108,13 @@ func (suite *operatorBuilderTestSuite) TestRecord() {
 		3: {StoreId: 3, Role: metapb.PeerRole_Learner},
 		4: {StoreId: 4},
 	}
-	builder := suite.newBuilder().SetPeers(m).EnableLightWeight()
+	builder := suite.newBuilder().SetPeers(m).SetAddLightPeer()
 	suite.Len(builder.targetPeers, 3)
 	suite.Equal(m[2], builder.targetPeers[2])
 	suite.Equal(m[3], builder.targetPeers[3])
 	suite.Equal(m[4], builder.targetPeers[4])
 	suite.Equal(uint64(0), builder.targetLeaderStoreID)
-	suite.True(builder.lightWeight)
+	suite.True(builder.addLightPeer)
 }
 
 func (suite *operatorBuilderTestSuite) TestPrepareBuild() {
```
12 changes: 9 additions & 3 deletions pkg/schedule/operator/create_operator.go
```diff
@@ -234,7 +234,7 @@ func isRegionMatch(a, b *core.RegionInfo) bool {
 }
 
 // CreateScatterRegionOperator creates an operator that scatters the specified region.
-func CreateScatterRegionOperator(desc string, ci sche.SharedCluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) (*Operator, error) {
+func CreateScatterRegionOperator(desc string, ci sche.SharedCluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64, skipLimitCheck bool) (*Operator, error) {
 	// randomly pick a leader.
 	var ids []uint64
 	for id, peer := range targetPeers {
@@ -249,10 +249,16 @@
 	if targetLeader != 0 {
 		leader = targetLeader
 	}
-	return NewBuilder(desc, ci, origin).
+
+	builder := NewBuilder(desc, ci, origin)
+	if skipLimitCheck {
+		builder.SetRemoveLightPeer()
+	}
+
+	return builder.
 		SetPeers(targetPeers).
 		SetLeader(leader).
-		EnableLightWeight().
+		SetAddLightPeer().
 		// EnableForceTargetLeader in order to ignore the leader schedule limit
 		EnableForceTargetLeader().
 		Build(OpAdmin)
```
5 changes: 5 additions & 0 deletions pkg/schedule/operator/step.go
```diff
@@ -588,6 +588,7 @@ func (pl PromoteLearner) GetCmd(_ *core.RegionInfo, useConfChangeV2 bool) *pdpb.
 // RemovePeer is an OpStep that removes a region peer.
 type RemovePeer struct {
 	FromStore, PeerID uint64
+	IsLightWeight     bool
 	IsDownStore       bool
 }
 
@@ -635,6 +636,10 @@ func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo)
 		return
 	}
 
+	if rp.IsLightWeight {
+		return
+	}
+
 	if rp.IsDownStore && regionSize > storelimit.SmallRegionThreshold {
 		regionSize = storelimit.SmallRegionThreshold
 	}
```
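Store limits are enforced by charging each step's cost against the store's budget via Influence; the early return added here means a light-weight RemovePeer charges nothing, so the store limit can never throttle it. A toy model of that accounting, with illustrative types and an assumed threshold value in place of PD's:

```go
package main

import "fmt"

// storeInfluence models the per-store cost ledger used to enforce store
// limits; the real OpInfluence/StoreInfluence types are more involved.
type storeInfluence struct {
	stepCost int64
}

type removePeer struct {
	isLightWeight bool
	isDownStore   bool
}

// Illustrative stand-in for storelimit.SmallRegionThreshold.
const smallRegionThreshold = 16

// influence mirrors the early-return added by this commit: a light-weight
// remove contributes zero cost; a remove from a down store is capped.
func (rp removePeer) influence(inf *storeInfluence, regionSize int64) {
	if rp.isLightWeight {
		return
	}
	if rp.isDownStore && regionSize > smallRegionThreshold {
		regionSize = smallRegionThreshold
	}
	inf.stepCost += regionSize
}

func main() {
	var normal, light storeInfluence
	removePeer{}.influence(&normal, 96)
	removePeer{isLightWeight: true}.influence(&light, 96)
	fmt.Println(normal.stepCost, light.stepCost) // 96 0
}
```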
18 changes: 9 additions & 9 deletions pkg/schedule/scatter/region_scatterer.go
```diff
@@ -190,15 +190,15 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s
 		regionMap[region.GetID()] = region
 	}
 	// If there existed any region failed to relocated after retry, add it into unProcessedRegions
-	opsCount, err := r.scatterRegions(regionMap, failures, group, retryLimit)
+	opsCount, err := r.scatterRegions(regionMap, failures, group, retryLimit, false)
 	if err != nil {
 		return 0, nil, err
 	}
 	return opsCount, failures, nil
 }
 
 // ScatterRegionsByID directly scatter regions by ScatterRegions
-func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, retryLimit int) (int, map[uint64]error, error) {
+func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, map[uint64]error, error) {
 	if len(regionsID) < 1 {
 		scatterSkipEmptyRegionCounter.Inc()
 		return 0, nil, errors.New("empty region")
@@ -220,7 +220,7 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r
 		regionMap[region.GetID()] = region
 	}
 	// If there existed any region failed to relocated after retry, add it into unProcessedRegions
-	opsCount, err := r.scatterRegions(regionMap, failures, group, retryLimit)
+	opsCount, err := r.scatterRegions(regionMap, failures, group, retryLimit, skipStoreLimit)
 	if err != nil {
 		return 0, nil, err
 	}
@@ -233,7 +233,7 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r
 // time.Sleep between each retry.
 // Failures indicates the regions which are failed to be relocated, the key of the failures indicates the regionID
 // and the value of the failures indicates the failure error.
-func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, failures map[uint64]error, group string, retryLimit int) (int, error) {
+func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, failures map[uint64]error, group string, retryLimit int, skipStoreLimit bool) (int, error) {
 	if len(regions) < 1 {
 		scatterSkipEmptyRegionCounter.Inc()
 		return 0, errors.New("empty region")
@@ -244,7 +244,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa
 	opsCount := 0
 	for currentRetry := 0; currentRetry <= retryLimit; currentRetry++ {
 		for _, region := range regions {
-			op, err := r.Scatter(region, group)
+			op, err := r.Scatter(region, group, skipStoreLimit)
 			failpoint.Inject("scatterFail", func() {
 				if region.GetID() == 1 {
 					err = errors.New("mock error")
@@ -281,7 +281,7 @@
 
 // Scatter relocates the region. If the group is defined, the regions' leader with the same group would be scattered
 // in a group level instead of cluster level.
-func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*operator.Operator, error) {
+func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipStoreLimit bool) (*operator.Operator, error) {
 	if !filter.IsRegionReplicated(r.cluster, region) {
 		r.cluster.AddSuspectRegions(region.GetID())
 		scatterSkipNotReplicatedCounter.Inc()
@@ -301,10 +301,10 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera
 		return nil, errors.Errorf("region %d is hot", region.GetID())
 	}
 
-	return r.scatterRegion(region, group), nil
+	return r.scatterRegion(region, group, skipStoreLimit), nil
 }
 
-func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
+func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, skipStoreLimit bool) *operator.Operator {
 	engineFilter := filter.NewEngineFilter(r.name, filter.NotSpecialEngines)
 	ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
 	specialPeers := make(map[string]map[uint64]*metapb.Peer)
@@ -383,7 +383,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 		r.Put(targetPeers, targetLeader, group)
 		return nil
 	}
-	op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader)
+	op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader, skipStoreLimit)
 	if err != nil {
 		scatterFailCounter.Inc()
 		for _, peer := range region.GetPeers() {
```
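The gRPC handler that consumes the new ScatterRegionsByID signature is in one of the changed files not shown on this page. A hypothetical sketch of that wiring; the handler shape and package name are assumptions, and only the scatterer's signature comes from this diff:

```go
// Hypothetical wiring from the gRPC request to the scatterer.
package handler

import (
	"github.com/pingcap/kvproto/pkg/pdpb"

	"github.com/tikv/pd/pkg/schedule/scatter"
)

// scatterFromRequest threads the request's new SkipStoreLimit field through
// to RegionScatterer.ScatterRegionsByID, whose signature gained the matching
// skipStoreLimit parameter in this commit.
func scatterFromRequest(s *scatter.RegionScatterer, req *pdpb.ScatterRegionRequest) (int, map[uint64]error, error) {
	return s.ScatterRegionsByID(
		req.GetRegionsId(),       // regions to scatter
		req.GetGroup(),           // scatter group
		int(req.GetRetryLimit()), // per-region retry budget
		req.GetSkipStoreLimit(),  // new: bypass the store limit check
	)
}
```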
(The remaining 12 changed files are not shown.)