Skip to content

Commit

Permalink
schedulers: evict leader support multiple targets (#4401)
Browse files Browse the repository at this point in the history
* upgrate pingcap/check dep

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* add multi-target support in TransferLeader

Add a new field `Peers` in `TransferLeader` to support
multi-target evict leader. During building steps,
`targetLeaderStoreIDs` is only used by multi-target
evict leader, so the field will not be filtered by
joint/not-joint rules.

Close #4229 .

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* add multi target transfer leader test

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix SetLeaders validation

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix bugs

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix build

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor code

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix build

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Jan 10, 2022
1 parent ebe6161 commit b140104
Show file tree
Hide file tree
Showing 22 changed files with 136 additions and 52 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
Expand All @@ -48,7 +48,7 @@ require (
go.etcd.io/bbolt v1.3.5 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.19.0
go.uber.org/zap v1.19.1
golang.org/x/tools v0.1.5
google.golang.org/grpc v1.26.0
gotest.tools/gotestsum v1.7.0
Expand Down
10 changes: 7 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ github.com/aws/aws-sdk-go v1.35.3/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+
github.com/axw/gocov v1.0.0 h1:YsqYR66hUmilVr23tu8USgnJIJvnwh3n7j5zRn7x4LU=
github.com/axw/gocov v1.0.0/go.mod h1:LvQpEYiwwIb2nYkXY2fDWhg9/AsYqkhmrCshjlUJECE=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -388,8 +390,8 @@ github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390=
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 h1:HVl5539r48eA+uDuX/ziBmQCxzT1pGrzWbKuXT46Bq0=
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/errcode v0.3.0 h1:IF6LC/4+b1KNwrMlr2rBTUrojFPMexXBcDWZSpNwxjg=
github.com/pingcap/errcode v0.3.0/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
Expand Down Expand Up @@ -569,6 +571,7 @@ go.uber.org/fx v1.12.0 h1:+1+3Cz9M0dFMPy9SW9XUIUHye8bnPUm7q7DroNGWYG4=
go.uber.org/fx v1.12.0/go.mod h1:egT3Kyg1JFYQkvKLZ3EsykxkNrZxgXS+gKoKo7abERY=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand All @@ -584,8 +587,9 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.19.0 h1:mZQZefskPPCMIBCSEH0v2/iUqqLrYtaeqwD6FUGUnFE=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
15 changes: 14 additions & 1 deletion pkg/testutil/operator_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func CheckRemovePeer(c *C, op *operator.Operator, storeID uint64) {
func CheckTransferLeader(c *C, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
c.Assert(op, NotNil)
c.Assert(op.Len(), Equals, 1)
c.Assert(op.Step(0), Equals, operator.TransferLeader{FromStore: sourceID, ToStore: targetID})
c.Assert(op.Step(0), DeepEquals, operator.TransferLeader{FromStore: sourceID, ToStore: targetID})
kind |= operator.OpLeader
c.Assert(op.Kind()&kind, Equals, kind)
}
Expand All @@ -60,6 +60,19 @@ func CheckTransferLeaderFrom(c *C, op *operator.Operator, kind operator.OpKind,
c.Assert(op.Kind()&kind, Equals, kind)
}

// CheckMultiTargetTransferLeader checks if the operator is to transfer leader from the specified source to one of the target stores.
func CheckMultiTargetTransferLeader(c *C, op *operator.Operator, kind operator.OpKind, sourceID uint64, targetIDs []uint64) {
c.Assert(op, NotNil)
c.Assert(op.Len(), Equals, 1)
expectedOps := make([]interface{}, 0, len(targetIDs))
for _, targetID := range targetIDs {
expectedOps = append(expectedOps, operator.TransferLeader{FromStore: sourceID, ToStore: targetID, ToStores: targetIDs})
}
c.Assert(op.Step(0), DeepEqualsIn, expectedOps)
kind |= operator.OpLeader
c.Assert(op.Kind()&kind, Equals, kind)
}

func trimTransferLeaders(op *operator.Operator) (steps []operator.OpStep, lastLeader uint64) {
for i := 0; i < op.Len(); i++ {
step := op.Step(i)
Expand Down
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operato
if target == nil {
continue
}
op, err := operator.CreateTransferLeaderOperator(EvictLeaderType, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), operator.OpLeader)
op, err := operator.CreateTransferLeaderOperator(EvictLeaderType, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), []uint64{}, operator.OpLeader)
if err != nil {
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue
Expand Down
10 changes: 8 additions & 2 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,12 +1258,18 @@ func waitTransferLeader(c *C, stream mockhbstream.HeartbeatStream, region *core.
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
if res = stream.Recv(); res != nil {
return res.GetRegionId() == region.GetID() && res.GetTransferLeader().GetPeer().GetStoreId() == storeID
if res.GetRegionId() == region.GetID() {
for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) {
if peer.GetStoreId() == storeID {
return true
}
}
}
}
return false
})
return region.Clone(
core.WithLeader(res.GetTransferLeader().GetPeer()),
core.WithLeader(region.GetStorePeer(storeID)),
)
}

Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err
return errors.Errorf("region has no voter in store %v", storeID)
}

op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), operator.OpAdmin)
op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, region.GetLeader().GetStoreId(), newLeader.GetStoreId(), []uint64{}, operator.OpAdmin)
if err != nil {
log.Debug("fail to create transfer leader operator", errs.ZapError(err))
return err
Expand Down
4 changes: 2 additions & 2 deletions server/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
if region.GetLeader().GetId() != peer.GetId() && rf.Rule.Role == placement.Leader {
checkerCounter.WithLabelValues("rule_checker", "fix-leader-role").Inc()
if c.allowLeader(fit, peer) {
return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, region.GetLeader().StoreId, peer.GetStoreId(), 0)
return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, region.GetLeader().StoreId, peer.GetStoreId(), []uint64{}, 0)
}
checkerCounter.WithLabelValues("rule_checker", "not-allow-leader")
return nil, errors.New("peer cannot be leader")
Expand All @@ -235,7 +235,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
checkerCounter.WithLabelValues("rule_checker", "fix-follower-role").Inc()
for _, p := range region.GetPeers() {
if c.allowLeader(fit, p) {
return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, peer.GetStoreId(), p.GetStoreId(), 0)
return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, peer.GetStoreId(), p.GetStoreId(), []uint64{}, 0)
}
}
checkerCounter.WithLabelValues("rule_checker", "no-new-leader").Inc()
Expand Down
5 changes: 5 additions & 0 deletions server/schedule/filter/candidates.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,8 @@ func (c *StoreCandidates) RandomPick() *core.StoreInfo {
}
return c.Stores[rand.Intn(len(c.Stores))]
}

// PickAll return all stores in candidate list.
func (c *StoreCandidates) PickAll() []*core.StoreInfo {
return c.Stores
}
53 changes: 37 additions & 16 deletions server/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ type Builder struct {
expectedRoles map[uint64]placement.PeerRoleType

// operation record
originPeers peersMap
unhealthyPeers peersMap
originLeaderStoreID uint64
targetPeers peersMap
targetLeaderStoreID uint64
err error
originPeers peersMap
unhealthyPeers peersMap
originLeaderStoreID uint64
targetPeers peersMap
targetLeaderStoreID uint64
targetLeaderStoreIDs []uint64 // This field is only used during multi-target evict leader, and will not be filtered during `Build`.
err error

// skip origin check flags
skipOriginJointStateCheck bool
Expand Down Expand Up @@ -241,6 +242,24 @@ func (b *Builder) SetLeader(storeID uint64) *Builder {
return b
}

// SetLeaders records all valid target leaders in Builder.
func (b *Builder) SetLeaders(storeIDs []uint64) *Builder {
if b.err != nil {
return b
}
for _, storeID := range storeIDs {
peer := b.targetPeers[storeID]
if peer == nil || core.IsLearner(peer) || b.unhealthyPeers[storeID] != nil {
continue
}
b.targetLeaderStoreIDs = append(b.targetLeaderStoreIDs, storeID)
}
// Don't need to check if there's valid target, because `targetLeaderStoreIDs`
// can be empty if this is not a multi-target evict leader operation. Besides,
// `targetLeaderStoreID` must be valid and there must be at least one valid target.
return b
}

// SetPeers resets the target peer list.
//
// If peer's ID is 0, the builder will allocate a new ID later. If current
Expand Down Expand Up @@ -447,6 +466,8 @@ func (b *Builder) brief() string {
return fmt.Sprintf("promote peer: store %s", b.toPromote)
case len(b.toDemote) > 0:
return fmt.Sprintf("demote peer: store %s", b.toDemote)
case len(b.targetLeaderStoreIDs) != 0:
return fmt.Sprintf("evict leader: from store %d to one in %v, or to %d (for compatibility)", b.originLeaderStoreID, b.targetLeaderStoreIDs, b.targetLeaderStoreID)
case b.originLeaderStoreID != b.targetLeaderStoreID:
return fmt.Sprintf("transfer leader: store %d to %d", b.originLeaderStoreID, b.targetLeaderStoreID)
default:
Expand Down Expand Up @@ -492,7 +513,7 @@ func (b *Builder) buildStepsWithJointConsensus(kind OpKind) (OpKind, error) {
if targetLeaderBefore, ok := b.originPeers[b.targetLeaderStoreID]; ok && !core.IsLearner(targetLeaderBefore) {
// target leader is a voter in `originPeers`, transfer leader first.
if b.originLeaderStoreID != b.targetLeaderStoreID {
b.execTransferLeader(b.targetLeaderStoreID)
b.execTransferLeader(b.targetLeaderStoreID, b.targetLeaderStoreIDs)
kind |= OpLeader
}
b.execChangePeerV2(true, false)
Expand All @@ -501,7 +522,7 @@ func (b *Builder) buildStepsWithJointConsensus(kind OpKind) (OpKind, error) {
// origin leader is none or a voter in `targetPeers`, change peers first.
b.execChangePeerV2(true, false)
if b.originLeaderStoreID != b.targetLeaderStoreID {
b.execTransferLeader(b.targetLeaderStoreID)
b.execTransferLeader(b.targetLeaderStoreID, b.targetLeaderStoreIDs)
kind |= OpLeader
}
} else {
Expand Down Expand Up @@ -589,7 +610,7 @@ func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
return kind, errors.New("fail to build operator: plan is empty, maybe no valid leader")
}
if plan.leaderBeforeAdd != 0 && plan.leaderBeforeAdd != b.currentLeaderStoreID {
b.execTransferLeader(plan.leaderBeforeAdd)
b.execTransferLeader(plan.leaderBeforeAdd, b.targetLeaderStoreIDs)
kind |= OpLeader
}
if plan.add != nil {
Expand All @@ -600,7 +621,7 @@ func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
b.execPromoteLearner(plan.promote)
}
if plan.leaderBeforeRemove != 0 && plan.leaderBeforeRemove != b.currentLeaderStoreID {
b.execTransferLeader(plan.leaderBeforeRemove)
b.execTransferLeader(plan.leaderBeforeRemove, b.targetLeaderStoreIDs)
kind |= OpLeader
}
if plan.remove != nil {
Expand All @@ -615,7 +636,7 @@ func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
b.currentLeaderStoreID != b.targetLeaderStoreID &&
b.currentPeers[b.targetLeaderStoreID] != nil {
// Transfer only when target leader is legal.
b.execTransferLeader(b.targetLeaderStoreID)
b.execTransferLeader(b.targetLeaderStoreID, b.targetLeaderStoreIDs)
kind |= OpLeader
}

Expand All @@ -625,9 +646,9 @@ func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
return kind, nil
}

func (b *Builder) execTransferLeader(id uint64) {
b.steps = append(b.steps, TransferLeader{FromStore: b.currentLeaderStoreID, ToStore: id})
b.currentLeaderStoreID = id
func (b *Builder) execTransferLeader(targetStoreID uint64, targetStoreIDs []uint64) {
b.steps = append(b.steps, TransferLeader{FromStore: b.currentLeaderStoreID, ToStore: targetStoreID, ToStores: targetStoreIDs})
b.currentLeaderStoreID = targetStoreID
}

func (b *Builder) execPromoteLearner(peer *metapb.Peer) {
Expand Down Expand Up @@ -668,7 +689,7 @@ func (b *Builder) execChangePeerV2(needEnter bool, needTransferLeader bool) {

// Transfer Leader
if needTransferLeader && b.originLeaderStoreID != b.targetLeaderStoreID {
b.execTransferLeader(b.targetLeaderStoreID)
b.execTransferLeader(b.targetLeaderStoreID, b.targetLeaderStoreIDs)
}

return
Expand Down Expand Up @@ -700,7 +721,7 @@ func (b *Builder) execChangePeerV2(needEnter bool, needTransferLeader bool) {

// Transfer Leader
if needTransferLeader && b.originLeaderStoreID != b.targetLeaderStoreID {
b.execTransferLeader(b.targetLeaderStoreID)
b.execTransferLeader(b.targetLeaderStoreID, b.targetLeaderStoreIDs)
}

// TiKV will handle leave step if only single peer change in promote and demote when enter step is bypassed
Expand Down
3 changes: 2 additions & 1 deletion server/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func CreateRemovePeerOperator(desc string, cluster opt.Cluster, kind OpKind, reg
}

// CreateTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store.
func CreateTransferLeaderOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, kind OpKind) (*Operator, error) {
func CreateTransferLeaderOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) {
return NewBuilder(desc, cluster, region, SkipOriginJointStateCheck).
SetLeader(targetStoreID).
SetLeaders(targetStoreIDs).
Build(kind)
}

Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator/create_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (s *testCreateOperatorSuite) TestCreateTransferLeaderOperator(c *C) {
}
for _, tc := range cases {
region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: tc.originPeers}, tc.originPeers[0])
op, err := CreateTransferLeaderOperator("test", s.cluster, region, tc.originPeers[0].StoreId, tc.targetLeaderStoreID, 0)
op, err := CreateTransferLeaderOperator("test", s.cluster, region, tc.originPeers[0].StoreId, tc.targetLeaderStoreID, []uint64{}, 0)

if tc.isErr {
c.Assert(err, NotNil)
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *testOperatorSuite) newTestOperator(regionID uint64, kind OpKind, steps
func (s *testOperatorSuite) checkSteps(c *C, op *Operator, steps []OpStep) {
c.Assert(op.Len(), Equals, len(steps))
for i := range steps {
c.Assert(op.Step(i), Equals, steps[i])
c.Assert(op.Step(i), DeepEquals, steps[i])
}
}

Expand Down
32 changes: 25 additions & 7 deletions server/schedule/operator/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ type OpStep interface {

// TransferLeader is an OpStep that transfers a region's leader.
type TransferLeader struct {
// Compatible with old TiKV's TransferLeader.
FromStore, ToStore uint64
// Multi-target transfer leader.
ToStores []uint64
}

// ConfVerChanged returns the delta value for version increased by this step.
Expand All @@ -56,19 +59,34 @@ func (tl TransferLeader) String() string {

// IsFinish checks if current step is finished.
func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool {
for _, storeID := range tl.ToStores {
if region.GetLeader().GetStoreId() == storeID {
return true
}
}
return region.GetLeader().GetStoreId() == tl.ToStore
}

// CheckInProgress checks if the step is in the progress of advancing.
func (tl TransferLeader) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(tl.ToStore)
if peer == nil {
return errors.New("peer does not existed")
}
if core.IsLearner(peer) {
return errors.New("peer already is a learner")
errList := make([]error, 0, len(tl.ToStores)+1)
for _, storeID := range append(tl.ToStores, tl.ToStore) {
peer := region.GetStorePeer(tl.ToStore)
if peer == nil {
errList = append(errList, errors.New("peer does not existed"))
continue
}
if core.IsLearner(peer) {
errList = append(errList, errors.New("peer already is a learner"))
continue
}
if err := validateStore(cluster, storeID); err != nil {
errList = append(errList, err)
continue
}
return nil
}
return validateStore(cluster, tl.ToStore)
return errors.Errorf("%v", errList)
}

// Influence calculates the store difference that current step makes.
Expand Down
7 changes: 6 additions & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,14 @@ func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step
var cmd *pdpb.RegionHeartbeatResponse
switch st := step.(type) {
case operator.TransferLeader:
peers := make([]*metapb.Peer, 0, len(st.ToStores))
for _, storeID := range st.ToStores {
peers = append(peers, region.GetStorePeer(storeID))
}
cmd = &pdpb.RegionHeartbeatResponse{
TransferLeader: &pdpb.TransferLeader{
Peer: region.GetStorePeer(st.ToStore),
Peer: region.GetStorePeer(st.ToStore),
Peers: peers,
},
}
case operator.AddPeer:
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) []*operator.O
return nil
}

op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, plan.cluster, plan.region, plan.region.GetLeader().GetStoreId(), plan.TargetStoreID(), operator.OpLeader)
op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, plan.cluster, plan.region, plan.region.GetLeader().GetStoreId(), plan.TargetStoreID(), []uint64{}, operator.OpLeader)
if err != nil {
log.Debug("fail to create balance leader operator", errs.ZapError(err))
return nil
Expand Down
Loading

0 comments on commit b140104

Please sign in to comment.