From 559000c10a6fef7a84ab870f89ec601d5ce38c2a Mon Sep 17 00:00:00 2001 From: ShuNing Date: Sun, 19 May 2019 15:16:47 +0800 Subject: [PATCH] schedule: actively push operator (#1536) * schedule: actively push operator Signed-off-by: nolouch --- server/cluster_worker.go | 2 +- server/coordinator.go | 107 ++++++++++++++++++++++++++++++++++--- server/coordinator_test.go | 8 +++ server/operator_queue.go | 52 ++++++++++++++++++ 4 files changed, 161 insertions(+), 8 deletions(-) create mode 100644 server/operator_queue.go diff --git a/server/cluster_worker.go b/server/cluster_worker.go index 1d381d711f5..75246696d28 100644 --- a/server/cluster_worker.go +++ b/server/cluster_worker.go @@ -37,7 +37,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return errors.Errorf("invalid region, zero region peer count: %v", core.RegionToHexMeta(region.GetMeta())) } - c.coordinator.dispatch(region) + c.coordinator.dispatch(region, DispatchFromHeartBeat) return nil } diff --git a/server/coordinator.go b/server/coordinator.go index 8cd15dfe306..2d5ff46f46a 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -14,6 +14,7 @@ package server import ( + "container/heap" "container/list" "context" "fmt" @@ -44,6 +45,14 @@ const ( hotRegionScheduleName = "balance-hot-region-scheduler" patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions. + + DispatchFromHeartBeat = "heartbeat" + DispatchFromNotifierQueue = "active push" + DispatchFromCreate = "create" + slowNotifyInterval = 5 * time.Second + fastNotifyInterval = 2 * time.Second + // PushOperatorTickInterval is the interval try to push the operator. + PushOperatorTickInterval = 500 * time.Millisecond ) var ( @@ -70,6 +79,7 @@ type coordinator struct { histories *list.List hbStreams *heartbeatStreams opRecords *OperatorRecords + opNotifierQueue operatorQueue } func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifier namespace.Classifier) *coordinator { @@ -89,16 +99,17 @@ func newCoordinator(cluster *clusterInfo, hbStreams *heartbeatStreams, classifie histories: list.New(), hbStreams: hbStreams, opRecords: NewOperatorRecords(), + opNotifierQueue: make(operatorQueue, 0), } } -func (c *coordinator) dispatch(region *core.RegionInfo) { +func (c *coordinator) dispatch(region *core.RegionInfo, source string) { // Check existed operator. if op := c.getOperator(region.GetID()); op != nil { timeout := op.IsTimeout() if step := op.Check(region); step != nil && !timeout { operatorCounter.WithLabelValues(op.Desc(), "check").Inc() - c.sendScheduleCommand(region, step) + c.sendScheduleCommand(region, step, source) return } if op.IsFinish() { @@ -164,6 +175,25 @@ func (c *coordinator) patrolRegions() { } } +// drivePushOperator is used to push the unfinished operator to the excutor. +func (c *coordinator) drivePushOperator() { + defer logutil.LogPanic() + + defer c.wg.Done() + log.Info("coordinator begins to actively drive push operator") + ticker := time.NewTicker(PushOperatorTickInterval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("drive push operator has been stopped") + return + case <-ticker.C: + c.PushOperators() + } + } +} + func (c *coordinator) checkRegion(region *core.RegionInfo) bool { // If PD has restarted, it need to check learners added before and promote them. // Don't check isRaftLearnerEnabled cause it may be disable learner feature but still some learners to promote. @@ -258,8 +288,10 @@ func (c *coordinator) run() { log.Error("cannot persist schedule config", zap.Error(err)) } - c.wg.Add(1) + c.wg.Add(2) + // Starts to patrol regions. go c.patrolRegions() + go c.drivePushOperator() } func (c *coordinator) stop() { @@ -465,12 +497,14 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool { c.operators[regionID] = op c.limiter.UpdateCounts(c.operators) + var step schedule.OperatorStep if region := c.cluster.GetRegion(op.RegionID()); region != nil { - if step := op.Check(region); step != nil { - c.sendScheduleCommand(region, step) + if step = op.Check(region); step != nil { + c.sendScheduleCommand(region, step, DispatchFromCreate) } } + heap.Push(&c.opNotifierQueue, &operatorWithTime{op, c.getNextPushOperatorTime(step, time.Now())}) operatorCounter.WithLabelValues(op.Desc(), "create").Inc() return true } @@ -578,8 +612,8 @@ func (c *coordinator) getHistory(start time.Time) []schedule.OperatorHistory { return histories } -func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep) { - log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step)) +func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep, source string) { + log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source)) switch s := step.(type) { case schedule.TransferLeader: cmd := &pdpb.RegionHeartbeatResponse{ @@ -674,6 +708,65 @@ func (c *coordinator) GetOperatorStatus(id uint64) *OperatorWithStatus { return c.opRecords.Get(id) } +func (oc *coordinator) getNextPushOperatorTime(step schedule.OperatorStep, now time.Time) time.Time { + nextTime := slowNotifyInterval + switch step.(type) { + case schedule.TransferLeader, schedule.PromoteLearner: + nextTime = fastNotifyInterval + } + return now.Add(nextTime) +} + +// pollNeedDispatchRegion returns the region need to dispatch, +// "next" is true to indicate that it may exist in next attempt, +// and false is the end for the poll. +func (oc *coordinator) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { + oc.Lock() + defer oc.Unlock() + if oc.opNotifierQueue.Len() == 0 { + return nil, false + } + item := heap.Pop(&oc.opNotifierQueue).(*operatorWithTime) + regionID := item.op.RegionID() + op, ok := oc.operators[regionID] + if !ok || op == nil { + return nil, true + } + r = oc.cluster.GetRegion(regionID) + if r == nil { + return nil, true + } + step := op.Check(r) + if step == nil { + return nil, true + } + now := time.Now() + if now.Before(item.time) { + heap.Push(&oc.opNotifierQueue, item) + return nil, false + } + + // pushes with new notify time. + item.time = oc.getNextPushOperatorTime(step, now) + heap.Push(&oc.opNotifierQueue, item) + return r, true +} + +// PushOperators periodically pushes the unfinished operator to the executor(TiKV). +func (oc *coordinator) PushOperators() { + for { + r, next := oc.pollNeedDispatchRegion() + if !next { + break + } + if r == nil { + continue + } + + oc.dispatch(r, DispatchFromNotifierQueue) + } +} + type scheduleController struct { schedule.Scheduler cluster *clusterInfo diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 5aec1fafb80..405a3315dab 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -243,8 +243,16 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) { func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) { co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream) +<<<<<<< HEAD co.cluster.putRegion(region.Clone()) co.dispatch(region) +======= + if err := co.cluster.putRegion(region.Clone()); err != nil { + return err + } + co.opController.Dispatch(region, schedule.DispatchFromHeartBeat) + return nil +>>>>>>> b6150ca1... schedule: actively push operator (#1536) } func (s *testCoordinatorSuite) TestCollectMetrics(c *C) { diff --git a/server/operator_queue.go b/server/operator_queue.go new file mode 100644 index 00000000000..1c1fd552c96 --- /dev/null +++ b/server/operator_queue.go @@ -0,0 +1,52 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "github.com/pingcap/pd/server/schedule" + "time" +) + +type operatorWithTime struct { + op *schedule.Operator + time time.Time +} + +type operatorQueue []*operatorWithTime + +func (opn operatorQueue) Len() int { return len(opn) } + +func (opn operatorQueue) Less(i, j int) bool { + return opn[i].time.Before(opn[j].time) +} + +func (opn operatorQueue) Swap(i, j int) { + opn[i], opn[j] = opn[j], opn[i] +} + +func (opn *operatorQueue) Push(x interface{}) { + item := x.(*operatorWithTime) + *opn = append(*opn, item) +} + +func (opn *operatorQueue) Pop() interface{} { + old := *opn + n := len(old) + if n == 0 { + return nil + } + item := old[n-1] + *opn = old[0 : n-1] + return item +}