From e17881170e22231d8b9a80274bf187ccc1d3b8a8 Mon Sep 17 00:00:00 2001 From: rleungx Date: Fri, 4 Jan 2019 16:34:29 +0800 Subject: [PATCH 1/3] make heartbeat more reasonable Signed-off-by: rleungx --- tools/pd-simulator/simulator/drive.go | 6 ++- tools/pd-simulator/simulator/node.go | 8 +-- tools/pd-simulator/simulator/raft.go | 22 +++++++- tools/pd-simulator/simulator/statistics.go | 62 ++++++++++++++++++++++ tools/pd-simulator/simulator/task.go | 23 +++----- 5 files changed, 101 insertions(+), 20 deletions(-) diff --git a/tools/pd-simulator/simulator/drive.go b/tools/pd-simulator/simulator/drive.go index eec218c97d2..ebb3c8656fd 100644 --- a/tools/pd-simulator/simulator/drive.go +++ b/tools/pd-simulator/simulator/drive.go @@ -15,6 +15,7 @@ package simulator import ( "context" + "sync" "go.uber.org/zap" @@ -27,6 +28,7 @@ import ( // Driver promotes the cluster status change. type Driver struct { + wg sync.WaitGroup pdAddr string simCase *cases.Case client Client @@ -106,8 +108,10 @@ func (d *Driver) Tick() { d.eventRunner.Tick(d.tickCount) for _, n := range d.conn.Nodes { n.reportRegionChange() - n.Tick() + d.wg.Add(1) + go n.Tick(d) } + d.wg.Wait() } // Check checks if the simulation is completed. diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 040a1cdffdc..2f8d9f2e824 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -112,7 +112,8 @@ func (n *Node) receiveRegionHeartbeat() { } // Tick steps node status change. 
-func (n *Node) Tick() { +func (n *Node) Tick(d *Driver) { + defer d.wg.Done() if n.GetState() != metapb.StoreState_Up { return } @@ -185,7 +186,8 @@ func (n *Node) regionHeartBeat() { } func (n *Node) reportRegionChange() { - for _, regionID := range n.raftEngine.regionChange[n.Id] { + regionIDs := n.raftEngine.GetRegionChange(n.Id) + for _, regionID := range regionIDs { region := n.raftEngine.GetRegion(regionID) ctx, cancel := context.WithTimeout(n.ctx, pdTimeout) err := n.client.RegionHeartbeat(ctx, region) @@ -195,9 +197,9 @@ func (n *Node) reportRegionChange() { zap.Uint64("region-id", region.GetID()), zap.Error(err)) } + n.raftEngine.ResetRegionChange(n.Id, regionID) cancel() } - delete(n.raftEngine.regionChange, n.Id) } // AddTask adds task in this node. diff --git a/tools/pd-simulator/simulator/raft.go b/tools/pd-simulator/simulator/raft.go index 1d2c76c5aee..eeb98b5ada7 100644 --- a/tools/pd-simulator/simulator/raft.go +++ b/tools/pd-simulator/simulator/raft.go @@ -237,13 +237,33 @@ func (r *RaftEngine) electNewLeader(region *core.RegionInfo) *metapb.Peer { return nil } -// GetRegion returns the RegionInfo with regionID +// GetRegion returns the RegionInfo with regionID. func (r *RaftEngine) GetRegion(regionID uint64) *core.RegionInfo { r.RLock() defer r.RUnlock() return r.regionsInfo.GetRegion(regionID) } +// GetRegionChange returns a list of RegionID for a given store. +func (r *RaftEngine) GetRegionChange(storeID uint64) []uint64 { + r.RLock() + defer r.RUnlock() + return r.regionChange[storeID] +} + +// ResetRegionChange removes the first occurrence of the given region ID from the region-change list recorded for the given store. +func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) { + r.Lock() + defer r.Unlock() + regionIDs := r.regionChange[storeID] + for i, id := range regionIDs { + if id == regionID { + r.regionChange[storeID] = append(r.regionChange[storeID][:i], r.regionChange[storeID][i+1:]...) 
+ return + } + } +} + // GetRegions gets all RegionInfo from regionMap func (r *RaftEngine) GetRegions() []*core.RegionInfo { r.RLock() diff --git a/tools/pd-simulator/simulator/statistics.go b/tools/pd-simulator/simulator/statistics.go index 49977a08e8a..c353197c748 100644 --- a/tools/pd-simulator/simulator/statistics.go +++ b/tools/pd-simulator/simulator/statistics.go @@ -16,9 +16,11 @@ package simulator import ( "fmt" "math" + "sync" ) type taskStatistics struct { + sync.RWMutex addPeer map[uint64]int removePeer map[uint64]int addLearner map[uint64]int @@ -38,6 +40,8 @@ func newTaskStatistics() *taskStatistics { } func (t *taskStatistics) getStatistics() map[string]int { + t.RLock() + defer t.RUnlock() stats := make(map[string]int) addpeer := getSum(t.addPeer) removePeer := getSum(t.removePeer) @@ -61,7 +65,51 @@ func (t *taskStatistics) getStatistics() map[string]int { return stats } +func (t *taskStatistics) incAddPeer(regionID uint64) { + t.Lock() + defer t.Unlock() + t.addPeer[regionID]++ +} + +func (t *taskStatistics) incAddLeaner(regionID uint64) { + t.Lock() + defer t.Unlock() + t.addLearner[regionID]++ +} + +func (t *taskStatistics) incPromoteLeaner(regionID uint64) { + t.Lock() + defer t.Unlock() + t.promoteLeaner[regionID]++ +} + +func (t *taskStatistics) incRemovePeer(regionID uint64) { + t.Lock() + defer t.Unlock() + t.removePeer[regionID]++ +} + +func (t *taskStatistics) incMergeRegion() { + t.Lock() + defer t.Unlock() + t.mergeRegion++ +} + +func (t *taskStatistics) incTransferLeader(fromPeerID, toPeerID uint64) { + t.Lock() + defer t.Unlock() + _, ok := t.transferLeader[fromPeerID] + if ok { + t.transferLeader[fromPeerID][toPeerID]++ + } else { + m := make(map[uint64]int) + m[toPeerID]++ + t.transferLeader[fromPeerID] = m + } +} + type snapshotStatistics struct { + sync.RWMutex receive map[uint64]int send map[uint64]int } @@ -86,6 +134,8 @@ func newSchedulerStatistics() *schedulerStatistics { } func (s *snapshotStatistics) getStatistics() 
map[string]int { + s.RLock() + defer s.RUnlock() maxSend := getMax(s.send) maxReceive := getMax(s.receive) minSend := getMin(s.send) @@ -104,6 +154,18 @@ func (s *snapshotStatistics) getStatistics() map[string]int { return stats } +func (s *snapshotStatistics) incSendSnapshot(storeID uint64) { + s.Lock() + defer s.Unlock() + s.send[storeID]++ +} + +func (s *snapshotStatistics) incReceiveSnapshot(storeID uint64) { + s.Lock() + defer s.Unlock() + s.receive[storeID]++ +} + // PrintStatistics prints the statistics of the scheduler. func (s *schedulerStatistics) PrintStatistics() { task := s.taskStats.getStatistics() diff --git a/tools/pd-simulator/simulator/task.go b/tools/pd-simulator/simulator/task.go index e5436de94df..cb9f81cc65c 100644 --- a/tools/pd-simulator/simulator/task.go +++ b/tools/pd-simulator/simulator/task.go @@ -152,7 +152,7 @@ func (m *mergeRegion) Step(r *RaftEngine) { ) r.SetRegion(mergeRegion) r.recordRegionChange(mergeRegion) - r.schedulerStats.taskStats.mergeRegion++ + r.schedulerStats.taskStats.incMergeRegion() m.finished = true } @@ -194,14 +194,7 @@ func (t *transferLeader) Step(r *RaftEngine) { r.recordRegionChange(newRegion) fromPeerID := t.fromPeer.GetId() toPeerID := t.peer.GetId() - _, ok := r.schedulerStats.taskStats.transferLeader[fromPeerID] - if ok { - r.schedulerStats.taskStats.transferLeader[fromPeerID][toPeerID]++ - } else { - m := make(map[uint64]int) - m[toPeerID]++ - r.schedulerStats.taskStats.transferLeader[fromPeerID] = m - } + r.schedulerStats.taskStats.incTransferLeader(fromPeerID, toPeerID) } func (t *transferLeader) RegionID() uint64 { @@ -248,7 +241,7 @@ func (a *addPeer) Step(r *RaftEngine) { if !processSnapshot(sendNode, a.sendingStat, snapshotSize) { return } - r.schedulerStats.snapshotStats.send[sendNode.Id]++ + r.schedulerStats.snapshotStats.incSendSnapshot(sendNode.Id) recvNode := r.conn.Nodes[a.peer.GetStoreId()] if recvNode == nil { @@ -259,17 +252,17 @@ func (a *addPeer) Step(r *RaftEngine) { if 
!processSnapshot(recvNode, a.receivingStat, snapshotSize) { return } - r.schedulerStats.snapshotStats.receive[recvNode.Id]++ + r.schedulerStats.snapshotStats.incReceiveSnapshot(recvNode.Id) a.size -= a.speed if a.size < 0 { var opts []core.RegionCreateOption if region.GetPeer(a.peer.GetId()) == nil { opts = append(opts, core.WithAddPeer(a.peer)) - r.schedulerStats.taskStats.addPeer[region.GetID()]++ + r.schedulerStats.taskStats.incAddPeer(region.GetID()) } else { opts = append(opts, core.WithPromoteLearner(a.peer.GetId())) - r.schedulerStats.taskStats.promoteLeaner[region.GetID()]++ + r.schedulerStats.taskStats.incPromoteLeaner(region.GetID()) } opts = append(opts, core.WithIncConfVer()) newRegion := region.Clone(opts...) @@ -333,7 +326,7 @@ func (a *removePeer) Step(r *RaftEngine) { ) r.SetRegion(newRegion) r.recordRegionChange(newRegion) - r.schedulerStats.taskStats.removePeer[region.GetID()]++ + r.schedulerStats.taskStats.incRemovePeer(region.GetID()) if r.conn.Nodes[storeID] == nil { a.finished = true return @@ -387,7 +380,7 @@ func (a *addLearner) Step(r *RaftEngine) { ) r.SetRegion(newRegion) r.recordRegionChange(newRegion) - r.schedulerStats.taskStats.addLearner[region.GetID()]++ + r.schedulerStats.taskStats.incAddLeaner(region.GetID()) } a.finished = true } From 3da0220ab6000dab4bf350ba585980dd891cb1d9 Mon Sep 17 00:00:00 2001 From: rleungx Date: Fri, 25 Jan 2019 13:21:27 +0800 Subject: [PATCH 2/3] start heartbeat randomly Signed-off-by: rleungx --- tools/pd-simulator/simulator/node.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 2f8d9f2e824..8ad4330f16e 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -16,6 +16,7 @@ package simulator import ( "context" "fmt" + "math/rand" "sync" "time" @@ -79,6 +80,7 @@ func NewNode(s *cases.Store, pdAddr string, ioRate int64) (*Node, error) { tasks: make(map[uint64]Task), 
receiveRegionHeartbeatCh: receiveRegionHeartbeatCh, ioRate: ioRate * cases.MB, + tick: uint64(rand.Intn(10)), }, nil } From 4ae77b19be8b569dd441bcfcf5853a8b4869a9f3 Mon Sep 17 00:00:00 2001 From: rleungx Date: Tue, 19 Feb 2019 11:00:57 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: rleungx --- tools/pd-simulator/simulator/drive.go | 2 +- tools/pd-simulator/simulator/node.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/pd-simulator/simulator/drive.go b/tools/pd-simulator/simulator/drive.go index ebb3c8656fd..2fefbc6e17c 100644 --- a/tools/pd-simulator/simulator/drive.go +++ b/tools/pd-simulator/simulator/drive.go @@ -109,7 +109,7 @@ func (d *Driver) Tick() { for _, n := range d.conn.Nodes { n.reportRegionChange() d.wg.Add(1) - go n.Tick(d) + go n.Tick(&d.wg) } d.wg.Wait() } diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 8ad4330f16e..dabc140383b 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -80,7 +80,7 @@ func NewNode(s *cases.Store, pdAddr string, ioRate int64) (*Node, error) { tasks: make(map[uint64]Task), receiveRegionHeartbeatCh: receiveRegionHeartbeatCh, ioRate: ioRate * cases.MB, - tick: uint64(rand.Intn(10)), + tick: uint64(rand.Intn(storeHeartBeatPeriod)), }, nil } @@ -114,8 +114,8 @@ func (n *Node) receiveRegionHeartbeat() { } // Tick steps node status change. -func (n *Node) Tick(d *Driver) { - defer d.wg.Done() +func (n *Node) Tick(wg *sync.WaitGroup) { + defer wg.Done() if n.GetState() != metapb.StoreState_Up { return }