Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: actively push operator #1536

Merged
merged 11 commits into from
May 19, 2019
3 changes: 2 additions & 1 deletion server/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pkg/errors"
"go.uber.org/zap"
)
Expand All @@ -39,7 +40,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return errors.Errorf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta()))
}

c.coordinator.opController.Dispatch(region)
c.coordinator.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
return nil
}

Expand Down
22 changes: 21 additions & 1 deletion server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ func (c *coordinator) patrolRegions() {
}
}

// drivePushOperator pushes the unfinished operators to the executor on every
// tick of schedule.PushOperatorTickInterval. It loops until the coordinator's
// context is done and signals the coordinator's wait group when it returns.
func (c *coordinator) drivePushOperator() {
	// Deferred calls run in reverse order: the wait group is released first,
	// logutil.LogPanic runs last.
	defer logutil.LogPanic()
	defer c.wg.Done()

	log.Info("coordinator begins to actively drive push operator")
	pushTicker := time.NewTicker(schedule.PushOperatorTickInterval)
	defer pushTicker.Stop()

	for {
		select {
		case <-pushTicker.C:
			c.opController.PushOperators()
		case <-c.ctx.Done():
			log.Info("drive push operator has been stopped")
			return
		}
	}
}

func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
// If PD has restarted, it needs to check learners added before and promote them.
// Don't check isRaftLearnerEnabled, because the learner feature may have been disabled while there are still some learners to promote.
Expand Down Expand Up @@ -228,9 +247,10 @@ func (c *coordinator) run() {
log.Error("cannot persist schedule config", zap.Error(err))
}

c.wg.Add(1)
c.wg.Add(2)
// Starts to patrol regions.
go c.patrolRegions()
go c.drivePushOperator()
nolouch marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *coordinator) stop() {
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *m
if err := co.cluster.putRegion(region.Clone()); err != nil {
return err
}
co.opController.Dispatch(region)
co.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
return nil
}

Expand Down
116 changes: 96 additions & 20 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package schedule

import (
"container/heap"
"container/list"
"fmt"
"sync"
Expand All @@ -28,7 +29,20 @@ import (
"go.uber.org/zap"
)

var historyKeepTime = 5 * time.Minute
// The source of a dispatched region. The value is passed to Dispatch and
// SendScheduleCommand, which logs it in the "source" field.
const (
// DispatchFromHeartBeat marks a dispatch triggered by a region heartbeat.
DispatchFromHeartBeat = "heartbeat"
// DispatchFromNotifierQueue marks a dispatch triggered by PushOperators
// polling the operator notifier queue.
DispatchFromNotifierQueue = "active push"
// DispatchFromCreate marks a command sent right when an operator is added.
DispatchFromCreate = "create"
)

var (
// NOTE(review): historyKeepTime is presumably how long finished operators
// are kept in the history list; its use is not visible here, confirm.
historyKeepTime = 5 * time.Minute
// slowNotifyInterval is the default delay before an operator is pushed
// again (see getNextPushOperatorTime).
slowNotifyInterval = 5 * time.Second
// fastNotifyInterval is the shorter delay used when the pending step is a
// TransferLeader or PromoteLearner.
fastNotifyInterval = 2 * time.Second
// PushOperatorTickInterval is the interval at which the coordinator tries
// to push operators.
PushOperatorTickInterval = 500 * time.Millisecond
)

// HeartbeatStreams is an interface of async region heartbeat.
type HeartbeatStreams interface {
Expand All @@ -38,34 +52,36 @@ type HeartbeatStreams interface {
// OperatorController is used to limit the speed of scheduling.
type OperatorController struct {
sync.RWMutex
cluster Cluster
operators map[uint64]*Operator
hbStreams HeartbeatStreams
histories *list.List
counts map[OperatorKind]uint64
opRecords *OperatorRecords
cluster Cluster
operators map[uint64]*Operator
hbStreams HeartbeatStreams
histories *list.List
counts map[OperatorKind]uint64
opRecords *OperatorRecords
opNotifierQueue operatorQueue
}

// NewOperatorController creates a OperatorController.
func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *OperatorController {
return &OperatorController{
cluster: cluster,
operators: make(map[uint64]*Operator),
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[OperatorKind]uint64),
opRecords: NewOperatorRecords(),
cluster: cluster,
operators: make(map[uint64]*Operator),
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[OperatorKind]uint64),
opRecords: NewOperatorRecords(),
opNotifierQueue: make(operatorQueue, 0),
}
}

// Dispatch is used to dispatch the operator of a region.
func (oc *OperatorController) Dispatch(region *core.RegionInfo) {
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := oc.GetOperator(region.GetID()); op != nil {
timeout := op.IsTimeout()
if step := op.Check(region); step != nil && !timeout {
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
oc.SendScheduleCommand(region, step)
oc.SendScheduleCommand(region, step, source)
return
}
if op.IsFinish() {
Expand All @@ -83,6 +99,65 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo) {
}
}

// getNextPushOperatorTime returns the time at which an operator whose pending
// step is "step" should be pushed again, counted from "now".
// TransferLeader and PromoteLearner steps use fastNotifyInterval; every other
// step, including a nil one, uses slowNotifyInterval.
func (oc *OperatorController) getNextPushOperatorTime(step OperatorStep, now time.Time) time.Time {
	switch step.(type) {
	case TransferLeader, PromoteLearner:
		return now.Add(fastNotifyInterval)
	default:
		return now.Add(slowNotifyInterval)
	}
}

// pollNeedDispatchRegion pops the earliest item from the notifier queue and
// returns the region that needs to be dispatched, if any.
// "next" is true when the caller may poll again right away, and false when
// polling should stop: the queue is empty or its earliest item is not due yet.
// A nil region together with next == true means the popped item was dropped.
func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
oc.Lock()
defer oc.Unlock()
if oc.opNotifierQueue.Len() == 0 {
return nil, false
}
item := heap.Pop(&oc.opNotifierQueue).(*operatorWithTime)
regionID := item.op.regionID
// The operator checked below is the one currently registered for the region,
// which is not necessarily item.op.
op, ok := oc.operators[regionID]
// Drop the item: no operator is registered for this region any more.
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
// Drop the item: the cluster does not return the region.
if r == nil {
return nil, true
}
step := op.Check(r)
// Drop the item: Check reports no step to send.
if step == nil {
return nil, true
}
now := time.Now()
// The earliest item is not due yet, so nothing else in the queue is either:
// put it back untouched and end this round of polling.
if now.Before(item.time) {
heap.Push(&oc.opNotifierQueue, item)
return nil, false
}

// The item is due: re-queue it with its next notify time and hand the
// region to the caller for dispatching.
item.time = oc.getNextPushOperatorTime(step, now)
heap.Push(&oc.opNotifierQueue, item)
return r, true
}

// PushOperators periodically pushes the unfinished operator to the executor(TiKV).
func (oc *OperatorController) PushOperators() {
for {
r, next := oc.pollNeedDispatchRegion()
if !next {
break
}
if r == nil {
shafreeck marked this conversation as resolved.
Show resolved Hide resolved
continue
}

oc.Dispatch(r, DispatchFromNotifierQueue)
}
}

// AddOperator adds operators to the running operators.
func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
oc.Lock()
Expand All @@ -98,7 +173,6 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
for _, op := range ops {
oc.addOperatorLocked(op)
}

return true
}

Expand Down Expand Up @@ -145,12 +219,14 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
oc.operators[regionID] = op
oc.updateCounts(oc.operators)

var step OperatorStep
if region := oc.cluster.GetRegion(op.RegionID()); region != nil {
if step := op.Check(region); step != nil {
oc.SendScheduleCommand(region, step)
if step = op.Check(region); step != nil {
oc.SendScheduleCommand(region, step, DispatchFromCreate)
}
}

heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op, time: oc.getNextPushOperatorTime(step, time.Now())})
operatorCounter.WithLabelValues(op.Desc(), "create").Inc()
return true
}
Expand Down Expand Up @@ -203,8 +279,8 @@ func (oc *OperatorController) GetOperators() []*Operator {
}

// SendScheduleCommand sends a command to the region.
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step OperatorStep) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step))
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step OperatorStep, source string) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source))
switch st := step.(type) {
case TransferLeader:
cmd := &pdpb.RegionHeartbeatResponse{
Expand Down
54 changes: 51 additions & 3 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package schedule

import (
"container/heap"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -84,11 +85,58 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
op1.createTime = time.Now().Add(-10 * time.Minute)
region2 = tc.ApplyOperatorStep(region2, op2)
tc.PutRegion(region2)
oc.Dispatch(region1)
oc.Dispatch(region2)
oc.Dispatch(region1, "test")
oc.Dispatch(region2, "test")
c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_TIMEOUT)
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING)
tc.ApplyOperator(op2)
oc.Dispatch(region2)
oc.Dispatch(region2, "test")
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}

// TestPollDispatchRegion checks that pollNeedDispatchRegion only returns a
// region once the notify time of its queue item has been reached.
func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) {
opt := NewMockSchedulerOptions()
tc := NewMockCluster(opt)
oc := NewOperatorController(tc, nil)
oc.hbStreams = mockHeadbeatStream{}
tc.AddLeaderStore(1, 2)
tc.AddLeaderStore(2, 0)
tc.AddLeaderRegion(1, 1, 2)
tc.AddLeaderRegion(2, 1, 2)
steps := []OperatorStep{
RemovePeer{FromStore: 2},
AddPeer{ToStore: 2, PeerID: 4},
}
// op1 is a single TransferLeader step on region 1; op2 is a RemovePeer
// followed by an AddPeer on region 2.
op1 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
op2 := NewOperator("test", 2, &metapb.RegionEpoch{}, OpRegion, steps...)
region1 := tc.GetRegion(1)
region2 := tc.GetRegion(2)
// Registers the operators and pushes them to the notifier queue by hand, so
// that the test controls the notify times: op1 is due in 100ms, op2 in 500ms.
{
oc.SetOperator(op1)
oc.SetOperator(op2)
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)})
}
// The first poll gets nil: the earliest item (op1) is not due yet.
r, next := oc.pollNeedDispatchRegion()
c.Assert(r, IsNil)
c.Assert(next, IsFalse)

// After waiting 100 milliseconds, region1 needs to be dispatched, but not region2.
time.Sleep(100 * time.Millisecond)
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region1.GetID())
// region1's item has been re-queued with a later notify time, so the earliest
// item is now region2's, which is still not due.
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, IsNil)
c.Assert(next, IsFalse)

// After waiting 500 milliseconds in total, region2 needs to be dispatched.
time.Sleep(400 * time.Millisecond)
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region2.GetID())
}
49 changes: 49 additions & 0 deletions server/schedule/operator_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule

import "time"

// operatorWithTime pairs an operator with a time; operatorQueue orders its
// items by this time, earliest first.
type operatorWithTime struct {
op *Operator
time time.Time
}

type operatorQueue []*operatorWithTime

func (opn operatorQueue) Len() int { return len(opn) }

func (opn operatorQueue) Less(i, j int) bool {
return opn[i].time.Before(opn[j].time)
}

func (opn operatorQueue) Swap(i, j int) {
opn[i], opn[j] = opn[j], opn[i]
}

func (opn *operatorQueue) Push(x interface{}) {
item := x.(*operatorWithTime)
*opn = append(*opn, item)
}

func (opn *operatorQueue) Pop() interface{} {
old := *opn
n := len(old)
if n == 0 {
return nil
}
item := old[n-1]
shafreeck marked this conversation as resolved.
Show resolved Hide resolved
*opn = old[0 : n-1]
return item
}