Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: one operator only occupy one limit (#3820) #3858

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,19 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {

op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
oc.AddWaitingOperator(op1)
c.Assert(oc.OperatorCount(op1.Kind()), Equals, uint64(1))
c.Assert(oc.OperatorCount(operator.OpLeader), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op1.RegionID())

// Region 1 already has an operator, cannot add another one.
op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0))
c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(0))

// Remove the operator manually, then we can add a new operator.
c.Assert(oc.RemoveOperator(op1), IsTrue)
op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion)
oc.AddWaitingOperator(op3)
c.Assert(oc.OperatorCount(op3.Kind()), Equals, uint64(1))
c.Assert(oc.OperatorCount(operator.OpRegion), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op3.RegionID())
}

Expand Down
29 changes: 19 additions & 10 deletions server/schedule/operator/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,25 @@ type OpKind uint32

// Flags for operators.
const (
OpLeader OpKind = 1 << iota // Include leader transfer.
OpRegion // Include peer movement.
OpSplit // Include region split.
OpAdmin // Initiated by admin.
OpHotRegion // Initiated by hot region scheduler.
OpAdjacent // Initiated by adjacent region scheduler.
OpReplica // Initiated by replica checkers.
OpBalance // Initiated by balancers.
OpMerge // Initiated by merge checkers or merge schedulers.
OpRange // Initiated by range scheduler.
// Initiated by admin.
OpAdmin OpKind = 1 << iota
// Initiated by merge checker or merge scheduler. Note that it may not include region merge.
// The declaration order describes the operator's producer and helps decouple the scheduler and checker limits.
OpMerge
// Initiated by range scheduler.
OpRange
// Initiated by replica checker.
OpReplica
// Include region split. Initiated by rule checker if `kind & OpAdmin == 0`.
OpSplit
// Initiated by hot region scheduler.
OpHotRegion
// Include peer addition or removal. This means that this operator may take a long time.
OpRegion
// Include leader transfer.
OpLeader
OpBalance
OpAdjacent
opMax
)

Expand Down
10 changes: 10 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ func (o *Operator) Kind() OpKind {
return o.kind
}

// SchedulerKind returns a single OpKind flag for the operator even when
// several flags are set on it: the lowest set bit of o.kind, i.e. the flag
// with the smallest numeric value. A zero kind yields zero.
// Added as the fix for #3778.
func (o *Operator) SchedulerKind() OpKind {
	// Lowest-set-bit isolation, ref: https://en.wikipedia.org/wiki/Find_first_set
	//   6 (110) ==> 2 (010)
	//   5 (101) ==> 1 (001)
	//   4 (100) ==> 4 (100)
	// k-1 flips the lowest set bit and every bit below it, so clearing those
	// bits from k leaves exactly that lowest bit.
	k := o.kind
	return k &^ (k - 1)
}

// Status returns operator status.
func (o *Operator) Status() OpStatus {
return o.status.Status()
Expand Down
43 changes: 40 additions & 3 deletions server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *testOperatorSuite) TestOperatorStep(c *C) {
}

func (s *testOperatorSuite) newTestOperator(regionID uint64, kind OpKind, steps ...OpStep) *Operator {
return NewOperator("test", "test", regionID, &metapb.RegionEpoch{}, OpAdmin|kind, steps...)
return NewOperator("test", "test", regionID, &metapb.RegionEpoch{}, kind, steps...)
}

func (s *testOperatorSuite) checkSteps(c *C, op *Operator, steps []OpStep) {
Expand All @@ -105,7 +105,7 @@ func (s *testOperatorSuite) TestOperator(c *C) {
TransferLeader{FromStore: 3, ToStore: 1},
RemovePeer{FromStore: 3},
}
op := s.newTestOperator(1, OpLeader|OpRegion, steps...)
op := s.newTestOperator(1, OpAdmin|OpLeader|OpRegion, steps...)
c.Assert(op.GetPriorityLevel(), Equals, core.HighPriority)
s.checkSteps(c, op, steps)
op.Start()
Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
}

func (s *testOperatorSuite) TestOperatorKind(c *C) {
c.Assert((OpLeader | OpReplica).String(), Equals, "leader,replica")
c.Assert((OpLeader | OpReplica).String(), Equals, "replica,leader")
c.Assert(OpKind(0).String(), Equals, "unknown")
k, err := ParseOperatorKind("balance,region,leader")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -374,3 +374,40 @@ func (s *testOperatorSuite) TestCheck(c *C) {
c.Assert(op.Status(), Equals, SUCCESS)
}
}

// TestSchedulerKind builds operators carrying different combinations of
// OpKind flags and asserts which single flag SchedulerKind reports for each.
func (s *testOperatorSuite) TestSchedulerKind(c *C) {
	type kindCase struct {
		op     *Operator
		expect OpKind
	}
	cases := []kindCase{
		{op: s.newTestOperator(1, OpAdmin|OpMerge|OpRegion), expect: OpAdmin},
		{op: s.newTestOperator(1, OpMerge|OpLeader|OpRegion), expect: OpMerge},
		{op: s.newTestOperator(1, OpReplica|OpRegion), expect: OpReplica},
		{op: s.newTestOperator(1, OpSplit|OpRegion), expect: OpSplit},
		{op: s.newTestOperator(1, OpRange|OpRegion), expect: OpRange},
		{op: s.newTestOperator(1, OpHotRegion|OpLeader|OpRegion), expect: OpHotRegion},
		{op: s.newTestOperator(1, OpRegion|OpLeader), expect: OpRegion},
		{op: s.newTestOperator(1, OpLeader), expect: OpLeader},
	}
	for i := range cases {
		c.Assert(cases[i].op.SchedulerKind(), Equals, cases[i].expect)
	}
}
15 changes: 5 additions & 10 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,21 +775,16 @@ func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operat
delete(oc.counts, k)
}
for _, op := range operators {
oc.counts[op.Kind()]++
oc.counts[op.SchedulerKind()]++
}
}

// OperatorCount gets the count of operators filtered by mask.
func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64 {
// OperatorCount gets the count of operators filtered by kind.
// kind must be a single OpKind flag, not a combination of flags.
func (oc *OperatorController) OperatorCount(kind operator.OpKind) uint64 {
oc.RLock()
defer oc.RUnlock()
var total uint64
for k, count := range oc.counts {
if k&mask != 0 {
total += count
}
}
return total
return oc.counts[kind]
}

// GetOpInfluence gets OpInfluence.
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
}

func (s *balanceRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
allowed := s.opController.OperatorCount(operator.OpRegion)-s.opController.OperatorCount(operator.OpMerge) < cluster.GetRegionScheduleLimit()
allowed := s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
}
Expand Down
5 changes: 4 additions & 1 deletion server/schedulers/random_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,14 @@ func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operato
return nil
}

ops, err := operator.CreateMergeRegionOperator(RandomMergeType, cluster, region, target, operator.OpAdmin)
ops, err := operator.CreateMergeRegionOperator(RandomMergeType, cluster, region, target, operator.OpMerge)
if err != nil {
log.Debug("fail to create merge region operator", errs.ZapError(err))
return nil
}
for _, op := range ops {
op.SetPriorityLevel(core.HighPriority)
}
ops[0].Counters = append(ops[0].Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
return ops
}
Expand Down
4 changes: 2 additions & 2 deletions server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *testShuffleLeaderSuite) TestShuffle(c *C) {
for i := 0; i < 4; i++ {
op := sl.Schedule(tc)
c.Assert(op, NotNil)
c.Assert(op[0].Kind(), Equals, operator.OpLeader|operator.OpAdmin)
c.Assert(op[0].Kind(), Equals, operator.OpLeader)
}
}

Expand Down Expand Up @@ -388,7 +388,7 @@ func (s *testShuffleRegionSuite) TestShuffle(c *C) {
for i := 0; i < 4; i++ {
op := sl.Schedule(tc)
c.Assert(op, NotNil)
c.Assert(op[0].Kind(), Equals, operator.OpRegion|operator.OpAdmin)
c.Assert(op[0].Kind(), Equals, operator.OpRegion)
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc()
return nil
}
op, err := operator.CreateTransferLeaderOperator(ShuffleLeaderType, cluster, region, region.GetLeader().GetId(), targetStore.GetID(), operator.OpAdmin)
op, err := operator.CreateTransferLeaderOperator(ShuffleLeaderType, cluster, region, region.GetLeader().GetId(), targetStore.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create shuffle leader operator", errs.ZapError(err))
return nil
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/shuffle_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
return nil
}

op, err := operator.CreateMovePeerOperator(ShuffleRegionType, cluster, region, operator.OpAdmin, oldPeer.GetStoreId(), newPeer)
op, err := operator.CreateMovePeerOperator(ShuffleRegionType, cluster, region, operator.OpRegion, oldPeer.GetStoreId(), newPeer)
if err != nil {
schedulerCounter.WithLabelValues(s.GetName(), "create-operator-fail").Inc()
return nil
Expand Down