Skip to content

Commit

Permalink
schedule: actively push operator (#1536)
Browse files Browse the repository at this point in the history
* schedule: actively push operator

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Aug 16, 2019
1 parent a9f5b81 commit 559000c
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 8 deletions.
2 changes: 1 addition & 1 deletion server/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return errors.Errorf("invalid region, zero region peer count: %v", core.RegionToHexMeta(region.GetMeta()))
}

c.coordinator.dispatch(region)
c.coordinator.dispatch(region, DispatchFromHeartBeat)
return nil
}

Expand Down
107 changes: 100 additions & 7 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"container/heap"
"container/list"
"context"
"fmt"
Expand Down Expand Up @@ -44,6 +45,14 @@ const (
hotRegionScheduleName = "balance-hot-region-scheduler"

patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions.

DispatchFromHeartBeat = "heartbeat"
DispatchFromNotifierQueue = "active push"
DispatchFromCreate = "create"
slowNotifyInterval = 5 * time.Second
fastNotifyInterval = 2 * time.Second
// PushOperatorTickInterval is the interval try to push the operator.
PushOperatorTickInterval = 500 * time.Millisecond
)

var (
Expand All @@ -70,6 +79,7 @@ type coordinator struct {
histories *list.List
hbStreams *heartbeatStreams
opRecords *OperatorRecords
opNotifierQueue operatorQueue
}

func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifier namespace.Classifier) *coordinator {
Expand All @@ -89,16 +99,17 @@ func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifie
histories: list.New(),
hbStreams: hbStreams,
opRecords: NewOperatorRecords(),
opNotifierQueue: make(operatorQueue, 0),
}
}

func (c *coordinator) dispatch(region *core.RegionInfo) {
func (c *coordinator) dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := c.getOperator(region.GetID()); op != nil {
timeout := op.IsTimeout()
if step := op.Check(region); step != nil && !timeout {
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
c.sendScheduleCommand(region, step)
c.sendScheduleCommand(region, step, source)
return
}
if op.IsFinish() {
Expand Down Expand Up @@ -164,6 +175,25 @@ func (c *coordinator) patrolRegions() {
}
}

// drivePushOperator is used to push the unfinished operator to the excutor.
func (c *coordinator) drivePushOperator() {
defer logutil.LogPanic()

defer c.wg.Done()
log.Info("coordinator begins to actively drive push operator")
ticker := time.NewTicker(PushOperatorTickInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("drive push operator has been stopped")
return
case <-ticker.C:
c.PushOperators()
}
}
}

func (c *coordinator) checkRegion(region *core.RegionInfo) bool {
// If PD has restarted, it need to check learners added before and promote them.
// Don't check isRaftLearnerEnabled cause it may be disable learner feature but still some learners to promote.
Expand Down Expand Up @@ -258,8 +288,10 @@ func (c *coordinator) run() {
log.Error("cannot persist schedule config", zap.Error(err))
}

c.wg.Add(1)
c.wg.Add(2)
// Starts to patrol regions.
go c.patrolRegions()
go c.drivePushOperator()
}

func (c *coordinator) stop() {
Expand Down Expand Up @@ -465,12 +497,14 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
c.operators[regionID] = op
c.limiter.UpdateCounts(c.operators)

var step schedule.OperatorStep
if region := c.cluster.GetRegion(op.RegionID()); region != nil {
if step := op.Check(region); step != nil {
c.sendScheduleCommand(region, step)
if step = op.Check(region); step != nil {
c.sendScheduleCommand(region, step, DispatchFromCreate)
}
}

heap.Push(&c.opNotifierQueue, &operatorWithTime{op, c.getNextPushOperatorTime(step, time.Now())})
operatorCounter.WithLabelValues(op.Desc(), "create").Inc()
return true
}
Expand Down Expand Up @@ -578,8 +612,8 @@ func (c *coordinator) getHistory(start time.Time) []schedule.OperatorHistory {
return histories
}

func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step))
func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep, source string) {
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source))
switch s := step.(type) {
case schedule.TransferLeader:
cmd := &pdpb.RegionHeartbeatResponse{
Expand Down Expand Up @@ -674,6 +708,65 @@ func (c *coordinator) GetOperatorStatus(id uint64) *OperatorWithStatus {
return c.opRecords.Get(id)
}

func (oc *coordinator) getNextPushOperatorTime(step schedule.OperatorStep, now time.Time) time.Time {
nextTime := slowNotifyInterval
switch step.(type) {
case schedule.TransferLeader, schedule.PromoteLearner:
nextTime = fastNotifyInterval
}
return now.Add(nextTime)
}

// pollNeedDispatchRegion returns the region need to dispatch,
// "next" is true to indicate that it may exist in next attempt,
// and false is the end for the poll.
func (oc *coordinator) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
oc.Lock()
defer oc.Unlock()
if oc.opNotifierQueue.Len() == 0 {
return nil, false
}
item := heap.Pop(&oc.opNotifierQueue).(*operatorWithTime)
regionID := item.op.RegionID()
op, ok := oc.operators[regionID]
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
return nil, true
}
step := op.Check(r)
if step == nil {
return nil, true
}
now := time.Now()
if now.Before(item.time) {
heap.Push(&oc.opNotifierQueue, item)
return nil, false
}

// pushes with new notify time.
item.time = oc.getNextPushOperatorTime(step, now)
heap.Push(&oc.opNotifierQueue, item)
return r, true
}

// PushOperators periodically pushes the unfinished operator to the executor(TiKV).
func (oc *coordinator) PushOperators() {
for {
r, next := oc.pollNeedDispatchRegion()
if !next {
break
}
if r == nil {
continue
}

oc.dispatch(r, DispatchFromNotifierQueue)
}
}

type scheduleController struct {
schedule.Scheduler
cluster *clusterInfo
Expand Down
8 changes: 8 additions & 0 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,16 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {

func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) {
co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream)
<<<<<<< HEAD
co.cluster.putRegion(region.Clone())
co.dispatch(region)
=======
if err := co.cluster.putRegion(region.Clone()); err != nil {
return err
}
co.opController.Dispatch(region, schedule.DispatchFromHeartBeat)
return nil
>>>>>>> b6150ca1... schedule: actively push operator (#1536)
}

func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
Expand Down
52 changes: 52 additions & 0 deletions server/operator_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"github.com/pingcap/pd/server/schedule"
"time"
)

type operatorWithTime struct {
op *schedule.Operator
time time.Time
}

type operatorQueue []*operatorWithTime

func (opn operatorQueue) Len() int { return len(opn) }

func (opn operatorQueue) Less(i, j int) bool {
return opn[i].time.Before(opn[j].time)
}

func (opn operatorQueue) Swap(i, j int) {
opn[i], opn[j] = opn[j], opn[i]
}

func (opn *operatorQueue) Push(x interface{}) {
item := x.(*operatorWithTime)
*opn = append(*opn, item)
}

func (opn *operatorQueue) Pop() interface{} {
old := *opn
n := len(old)
if n == 0 {
return nil
}
item := old[n-1]
*opn = old[0 : n-1]
return item
}

0 comments on commit 559000c

Please sign in to comment.