From 457bb90a46d88905ba721136229f739e75741470 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Sat, 14 Jan 2017 13:40:06 +0200 Subject: [PATCH] FAB-1166 Gossip leader election Gossip leader election module This commit adds a leader election module which is an autonomous logical unit. It will be connected into the gossip layer via an implementation of LeaderElectionAdapter Which will be implemented after this commit. Change log: 1) Initial commit 2) Rebase 3) Changed comment in test 4) Rephrased a log message 5) Changed comments in algorithm outline Signed-off-by: Yacov Manevich Change-Id: I36f523b4a9fa358b6e4254d04fd1f47569246600 --- gossip/election/election.go | 356 ++++++++++++++++++++++++++++++- gossip/election/election_test.go | 305 ++++++++++++++++++++++++++ 2 files changed, 651 insertions(+), 10 deletions(-) create mode 100644 gossip/election/election_test.go diff --git a/gossip/election/election.go b/gossip/election/election.go index 5210a73f443..9111ceef2fd 100644 --- a/gossip/election/election.go +++ b/gossip/election/election.go @@ -17,29 +17,365 @@ limitations under the License. package election import ( - "github.com/hyperledger/fabric/gossip/common" - "github.com/hyperledger/fabric/gossip/proto" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/hyperledger/fabric/gossip/util" + "github.com/op/go-logging" +) + +var ( + startupGracePeriod = time.Second * 15 + membershipSampleInterval = time.Second + leaderAliveThreshold = time.Second * 10 + leadershipDeclarationInterval = leaderAliveThreshold / 2 + leaderElectionDuration = time.Second * 5 ) +// Gossip leader election module +// Algorithm properties: +// - Peers break symmetry by comparing IDs +// - Each peer is either a leader or a follower, +// and the aim is to have exactly 1 leader if the membership view +// is the same for all peers +// - If the network is partitioned into 2 or more sets, the number of leaders +// is the number of network partitions, but when the partition heals, +// only 1 leader should be left eventually +// - Peers communicate by gossiping leadership proposal or declaration messages + + +// The Algorithm, in pseudo code: +// +// +// variables: +// leaderKnown = false +// +// Invariant: +// Peer listens for messages from remote peers +// and whenever it receives a leadership declaration, +// leaderKnown is set to true +// +// Startup(): +// wait for membership view to stabilize, or for a leadership declaration is received +// or the startup timeout expires. +// goto SteadyState() +// +// SteadyState(): +// while true: +// If leaderKnown is false: +// LeaderElection() +// If you are the leader: +// Broadcast leadership declaration +// If a leadership declaration was received from +// a peer with a lower ID, +// become a follower +// Else, you're a follower: +// If haven't received a leadership declaration within +// a time threshold: +// set leaderKnown to false +// +// LeaderElection(): +// Gossip leadership proposal message +// Collect messages from other peers sent within a time period +// If received a leadership declaration: +// return +// Iterate over all proposal messages collected. +// If a proposal message from a peer with an ID lower +// than yourself was received, return. 
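+// (deferring to the lower-ID peer, which will either declare itself or defer to an even lower ID)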
+// Else, declare yourself a leader + + + // LeaderElectionAdapter is used by the leader election module -// to send and receive messages, as well as notify a leader change +// to send and receive messages and to get membership information type LeaderElectionAdapter interface { // Gossip gossips a message to other peers - Gossip(msg *proto.GossipMessage) + Gossip(Msg) - // Accept returns a channel that emits messages that fit - // the given predicate - Accept(common.MessageAcceptor) <-chan *proto.GossipMessage + // Accept returns a channel that emits messages + Accept() <-chan Msg + + // CreateProposalMessage + CreateMessage(isDeclaration bool) Msg + + // Peers returns a list of peers considered alive + Peers() []Peer } // LeaderElectionService is the object that runs the leader election algorithm type LeaderElectionService interface { // IsLeader returns whether this peer is a leader or not IsLeader() bool + + // Stop stops the LeaderElectionService + Stop() +} + +// Peer describes a remote peer +type Peer interface { + // ID returns the ID of the peer + ID() string +} + +// Msg describes a message sent from a remote peer +type Msg interface { + // SenderID returns the ID of the peer sent the message + SenderID() string + // IsProposal returns whether this message is a leadership proposal + IsProposal() bool + // IsDeclaration returns whether this message is a leadership declaration + IsDeclaration() bool +} + +// NewLeaderElectionService returns a new LeaderElectionService +func NewLeaderElectionService(adapter LeaderElectionAdapter, id string) LeaderElectionService { + if len(id) == 0 { + panic(fmt.Errorf("Empty id")) + } + le := &leaderElectionSvcImpl{ + id: id, + proposals: util.NewSet(), + adapter: adapter, + stopChan: make(chan struct{}, 1), + interruptChan: make(chan struct{}, 1), + logger: logging.MustGetLogger("LeaderElection"), + } + // TODO: This will be configured using the core.yaml when FAB-1217 (Integrate peer logging with gossip logging) is done + logging.SetLevel(logging.WARNING, "LeaderElection") + go le.start() + return le +} + +// leaderElectionSvcImpl is an implementation of a LeaderElectionService +type leaderElectionSvcImpl struct { + id string + proposals *util.Set + sync.Mutex + stopChan chan struct{} + interruptChan chan struct{} + stopWG sync.WaitGroup + isLeader int32 + toDie int32 + leaderExists int32 + sleeping bool + adapter LeaderElectionAdapter + logger *logging.Logger +} + +func (le *leaderElectionSvcImpl) start() { + le.stopWG.Add(2) + go le.handleMessages() + le.waitForMembershipStabilization(startupGracePeriod) + go le.run() +} + +func (le *leaderElectionSvcImpl) handleMessages() { + le.logger.Info(le.id, ": Entering") + defer le.logger.Info(le.id, ": Exiting") + defer le.stopWG.Done() + msgChan := le.adapter.Accept() + for { + select { + case <-le.stopChan: + le.stopChan <- struct{}{} + return + case msg := <-msgChan: + if !le.isAlive(msg.SenderID()) { + le.logger.Debug(le.id, ": Got message from", msg.SenderID(), "but it is not in the view") + break + } + le.handleMessage(msg) + } + } +} + +func (le *leaderElectionSvcImpl) handleMessage(msg Msg) { + msgType := "proposal" + if msg.IsDeclaration() { + msgType = "declaration" + } + le.logger.Debug(le.id, ":", msg.SenderID(), "sent us", msgType) + le.Lock() + defer le.Unlock() + + if msg.IsProposal() { + le.proposals.Add(msg.SenderID()) + } else if msg.IsDeclaration() { + atomic.StoreInt32(&le.leaderExists, int32(1)) + if le.sleeping && len(le.interruptChan) == 0 { + le.interruptChan <- struct{}{} + } + 
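+		// The wake-up above lets a goroutine blocked in waitForInterrupt react to the
+		// declaration immediately instead of waiting out its timeout. A declaration
+		// from a peer with a lower ID outranks this peer, so it steps down.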
if msg.SenderID() < le.id && le.IsLeader() { + le.stopBeingLeader() + } + } else { + // We shouldn't get here + le.logger.Error("Got a message that's not a proposal and not a declaration") + } +} + +// waitForInterrupt sleeps until the interrupt channel is triggered +// or given timeout expires +func (le *leaderElectionSvcImpl) waitForInterrupt(timeout time.Duration) { + le.logger.Debug(le.id, ": Entering") + defer le.logger.Debug(le.id, ": Exiting") + le.Lock() + le.sleeping = true + le.Unlock() + + select { + case <-le.interruptChan: + case <-le.stopChan: + le.stopChan <- struct{}{} + case <-time.After(timeout): + } + + le.Lock() + le.sleeping = false + // We drain the interrupt channel + // because we might get 2 leadership declarations messages + // while sleeping, but we would only read 1 of them in the select block above + le.drainInterruptChannel() + le.Unlock() +} + +func (le *leaderElectionSvcImpl) run() { + defer le.stopWG.Done() + for !le.shouldStop() { + if !le.isLeaderExists() { + le.leaderElection() + } + if le.shouldStop() { + return + } + if le.IsLeader() { + le.leader() + } else { + le.follower() + } + } +} + +func (le *leaderElectionSvcImpl) leaderElection() { + le.logger.Info(le.id, ": Entering") + defer le.logger.Info(le.id, ": Exiting") + le.propose() + le.waitForInterrupt(leaderElectionDuration) + // If someone declared itself as a leader, give up + // on trying to become a leader too + if le.isLeaderExists() { + le.logger.Info(le.id, ": Some peer is already a leader") + return + } + // Leader doesn't exist, let's see if there is a better candidate than us + // for being a leader + for _, o := range le.proposals.ToArray() { + id := o.(string) + if id < le.id { + return + } + } + // If we got here, there is no one that proposed being a leader + // that's a better candidate than us. 
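+	// No declaration arrived and no proposal came from a lower ID, so this peer
+	// declares itself leader and records that a leader now exists.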
+ le.beLeader() + atomic.StoreInt32(&le.leaderExists, int32(1)) +} + +// propose sends a leadership proposal message to remote peers +func (le *leaderElectionSvcImpl) propose() { + le.logger.Info(le.id, ": Entering") + le.logger.Info(le.id, ": Exiting") + leadershipProposal := le.adapter.CreateMessage(false) + le.adapter.Gossip(leadershipProposal) +} + +func (le *leaderElectionSvcImpl) follower() { + le.logger.Debug(le.id, ": Entering") + defer le.logger.Debug(le.id, ": Exiting") + + le.proposals.Clear() + atomic.StoreInt32(&le.leaderExists, int32(0)) + select { + case <-time.After(leaderAliveThreshold): + case <-le.stopChan: + le.stopChan <- struct{}{} + } +} + +func (le *leaderElectionSvcImpl) leader() { + leaderDeclaration := le.adapter.CreateMessage(true) + le.adapter.Gossip(leaderDeclaration) + le.waitForInterrupt(leadershipDeclarationInterval) +} + +// waitForMembershipStabilization waits for membership view to stabilize +// or until a time limit expires, or until a peer declares itself as a leader +func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.Duration) { + le.logger.Info(le.id, ": Entering") + defer le.logger.Info(le.id, ": Exiting") + endTime := time.Now().Add(timeLimit) + viewSize := len(le.adapter.Peers()) + for !le.shouldStop() { + time.Sleep(membershipSampleInterval) + newSize := len(le.adapter.Peers()) + if newSize == viewSize || time.Now().After(endTime) || le.isLeaderExists() { + return + } + viewSize = newSize + } +} + +// drainInterruptChannel clears the interruptChannel +// if needed +func (le *leaderElectionSvcImpl) drainInterruptChannel() { + if len(le.interruptChan) == 1 { + <-le.interruptChan + } +} + +// isAlive returns whether peer of given id is considered alive +func (le *leaderElectionSvcImpl) isAlive(id string) bool { + for _, p := range le.adapter.Peers() { + if p.ID() == id { + return true + } + } + return false +} + +func (le *leaderElectionSvcImpl) isLeaderExists() bool { + return atomic.LoadInt32(&le.leaderExists) == int32(1) +} + +// IsLeader returns whether this peer is a leader +func (le *leaderElectionSvcImpl) IsLeader() bool { + isLeader := atomic.LoadInt32(&le.isLeader) == int32(1) + le.logger.Debug(le.id, ": Returning", isLeader) + return isLeader +} + +func (le *leaderElectionSvcImpl) beLeader() { + le.logger.Info(le.id, ": Becoming a leader") + atomic.StoreInt32(&le.isLeader, int32(1)) +} + +func (le *leaderElectionSvcImpl) stopBeingLeader() { + le.logger.Info(le.id, "Stopped being a leader") + atomic.StoreInt32(&le.isLeader, int32(0)) +} + +func (le *leaderElectionSvcImpl) shouldStop() bool { + return atomic.LoadInt32(&le.toDie) == int32(1) } -// LeaderElectionService is the implementation of LeaderElectionService -type leaderElectionServiceImpl struct { - adapter LeaderElectionAdapter +// Stop stops the LeaderElectionService +func (le *leaderElectionSvcImpl) Stop() { + le.logger.Info(le.id, ": Entering") + defer le.logger.Info(le.id, ": Exiting") + atomic.StoreInt32(&le.toDie, int32(1)) + le.stopChan <- struct{}{} + le.stopWG.Wait() } diff --git a/gossip/election/election_test.go b/gossip/election/election_test.go new file mode 100644 index 00000000000..afc164b237b --- /dev/null +++ b/gossip/election/election_test.go @@ -0,0 +1,305 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package election + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +const ( + testTimeout = 5 * time.Second + testPollInterval = time.Millisecond * 300 +) + +func init() { + startupGracePeriod = time.Millisecond * 500 + membershipSampleInterval = time.Millisecond * 100 + leaderAliveThreshold = time.Millisecond * 500 + leadershipDeclarationInterval = leaderAliveThreshold / 2 + leaderElectionDuration = time.Millisecond * 500 +} + +type msg struct { + sender string + proposal bool +} + +func (m *msg) SenderID() string { + return m.sender +} + +func (m *msg) IsProposal() bool { + return m.proposal +} + +func (m *msg) IsDeclaration() bool { + return !m.proposal +} + +type peer struct { + mockedMethods map[string]struct{} + mock.Mock + id string + peers map[string]*peer + sharedLock *sync.RWMutex + msgChan chan Msg + LeaderElectionService +} + +func (p *peer) On(methodName string, arguments ...interface{}) *mock.Call { + p.sharedLock.Lock() + defer p.sharedLock.Unlock() + p.mockedMethods[methodName] = struct{}{} + return p.Mock.On(methodName, arguments...) +} + +func (p *peer) ID() string { + return p.id +} + +func (p *peer) Gossip(m Msg) { + p.sharedLock.RLock() + defer p.sharedLock.RUnlock() + + if _, isMocked := p.mockedMethods["Gossip"]; isMocked { + p.Called(m) + return + } + + for _, peer := range p.peers { + if peer.id == p.id { + continue + } + peer.msgChan <- m.(*msg) + } +} + +func (p *peer) Accept() <-chan Msg { + p.sharedLock.RLock() + defer p.sharedLock.RUnlock() + + if _, isMocked := p.mockedMethods["Accept"]; isMocked { + args := p.Called() + return args.Get(0).(<-chan Msg) + } + return (<-chan Msg)(p.msgChan) +} + +func (p *peer) CreateMessage(isDeclaration bool) Msg { + return &msg{proposal: !isDeclaration, sender: p.id} +} + +func (p *peer) Peers() []Peer { + p.sharedLock.RLock() + defer p.sharedLock.RUnlock() + + if _, isMocked := p.mockedMethods["Peers"]; isMocked { + args := p.Called() + return args.Get(0).([]Peer) + } + + var peers []Peer + for id := range p.peers { + peers = append(peers, &peer{id: id}) + } + return peers +} + +func createPeers(spawnInterval time.Duration, ids ...int) []*peer { + peers := make([]*peer, len(ids)) + peerMap := make(map[string]*peer) + l := &sync.RWMutex{} + for i, id := range ids { + p := createPeer(id, peerMap, l) + if spawnInterval != 0 { + time.Sleep(spawnInterval) + } + peers[i] = p + } + return peers +} + +func createPeer(id int, peerMap map[string]*peer, l *sync.RWMutex) *peer { + idStr := fmt.Sprintf("p%d", id) + c := make(chan Msg, 100) + p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{})} + p.LeaderElectionService = NewLeaderElectionService(p, idStr) + l.Lock() + peerMap[idStr] = p + l.Unlock() + return p + +} + +func waitForLeaderElection(t *testing.T, peers []*peer) []string { + end := time.Now().Add(testTimeout) + for time.Now().Before(end) { + var leaders []string + for _, p := range peers { + if p.IsLeader() { + leaders = append(leaders, p.id) + } + } + if len(leaders) > 0 { + 
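+			// At least one peer considers itself a leader; return them all so the
+			// caller can assert that exactly one was elected.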
return leaders + } + time.Sleep(testPollInterval) + } + t.Fatal("No leader detected") + return nil +} + +func TestInitPeersAtSameTime(t *testing.T) { + t.Parallel() + // Scenario: Peers are spawned at the same time + // expected outcome: the peer that has the lowest ID is the leader + peers := createPeers(0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0) + time.Sleep(startupGracePeriod + leaderElectionDuration) + leaders := waitForLeaderElection(t, peers) + isP0leader := peers[len(peers)-1].IsLeader() + assert.True(t, isP0leader, "p0 isn't a leader. Leaders are: %v", leaders) + assert.Len(t, leaders, 1, "More than 1 leader elected") +} + +func TestInitPeersStartAtIntervals(t *testing.T) { + t.Parallel() + // Scenario: Peers are spawned one by one in a slow rate + // expected outcome: the first peer is the leader although its ID is lowest + peers := createPeers(startupGracePeriod+leadershipDeclarationInterval, 3, 2, 1, 0) + waitForLeaderElection(t, peers) + assert.True(t, peers[0].IsLeader()) +} + +func TestStop(t *testing.T) { + t.Parallel() + // Scenario: peers are spawned at the same time + // and then are stopped. We count the number of Gossip() invocations they invoke + // after they stop, and it should not increase after they are stopped + peers := createPeers(0, 3, 2, 1, 0) + var gossipCounter int32 + for i, p := range peers { + p.On("Gossip", mock.Anything).Run(func(args mock.Arguments) { + msg := args.Get(0).(Msg) + atomic.AddInt32(&gossipCounter, int32(1)) + for j := range peers { + if i == j { + continue + } + peers[j].msgChan <- msg + } + }) + } + waitForLeaderElection(t, peers) + for _, p := range peers { + p.Stop() + } + time.Sleep(leaderAliveThreshold) + gossipCounterAfterStop := atomic.LoadInt32(&gossipCounter) + time.Sleep(leaderAliveThreshold * 5) + assert.Equal(t, gossipCounterAfterStop, atomic.LoadInt32(&gossipCounter)) +} + +func TestConvergence(t *testing.T) { + // Scenario: 2 peer group converge their views + // expected outcome: only 1 leader is left out of the 2 + // and that leader is the leader with the lowest ID + t.Parallel() + peers1 := createPeers(0, 3, 2, 1, 0) + peers2 := createPeers(0, 4, 5, 6, 7) + leaders1 := waitForLeaderElection(t, peers1) + leaders2 := waitForLeaderElection(t, peers2) + assert.Len(t, leaders1, 1, "Peer group 1 was suppose to have 1 leader exactly") + assert.Len(t, leaders2, 1, "Peer group 2 was suppose to have 1 leader exactly") + combinedPeers := append(peers1, peers2...) + + var allPeerIds []Peer + for _, p := range combinedPeers { + allPeerIds = append(allPeerIds, &peer{id: p.id}) + } + + for i, p := range combinedPeers { + index := i + gossipFunc := func(args mock.Arguments) { + msg := args.Get(0).(Msg) + for j := range combinedPeers { + if index == j { + continue + } + combinedPeers[j].msgChan <- msg + } + } + p.On("Gossip", mock.Anything).Run(gossipFunc) + p.On("Peers").Return(allPeerIds) + } + + time.Sleep(leaderAliveThreshold * 5) + finalLeaders := waitForLeaderElection(t, combinedPeers) + assert.Len(t, finalLeaders, 1, "Combined peer group was suppose to have 1 leader exactly") + assert.Equal(t, leaders1[0], finalLeaders[0], "Combined peer group has different leader than expected:") +} + +func TestLeadershipTakeover(t *testing.T) { + t.Parallel() + // Scenario: Peers spawn one by one in descending order. + // After a while, the leader peer stops. 
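+	// (p5 starts first, so it wins the initial election before the other peers join)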
+ // expected outcome: the peer that takes over is the peer with lowest ID + peers := createPeers(startupGracePeriod+leadershipDeclarationInterval, 5, 4, 3, 2) + leaders := waitForLeaderElection(t, peers) + assert.Len(t, leaders, 1, "Only 1 leader should have been elected") + assert.Equal(t, "p5", leaders[0]) + peers[0].Stop() + time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*3) + leaders = waitForLeaderElection(t, peers[1:]) + assert.Len(t, leaders, 1, "Only 1 leader should have been elected") + assert.Equal(t, "p2", leaders[0]) +} + +func TestPartition(t *testing.T) { + t.Parallel() + // Scenario: peers spawn together, and then after a while a network partition occurs + // and no peer can communicate with another peer + // Expected outcome 1: each peer is a leader + // After this, we heal the partition to be a unified view again + // Expected outcome 2: p0 is the leader once again + peers := createPeers(0, 5, 4, 3, 2, 1, 0) + leaders := waitForLeaderElection(t, peers) + assert.Len(t, leaders, 1, "Only 1 leader should have been elected") + assert.Equal(t, "p0", leaders[0]) + for _, p := range peers { + p.On("Peers").Return([]Peer{}) + p.On("Gossip", mock.Anything) + } + time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2) + leaders = waitForLeaderElection(t, peers) + assert.Len(t, leaders, len(leaders)) + for _, p := range peers { + p.sharedLock.Lock() + p.mockedMethods = make(map[string]struct{}) + p.sharedLock.Unlock() + } + time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2) + leaders = waitForLeaderElection(t, peers) + assert.Len(t, leaders, 1, "Only 1 leader should have been elected") + assert.Equal(t, "p0", leaders[0]) +}
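
The following is not part of the patch: a minimal usage sketch showing how a consumer might wire the module through an in-process LeaderElectionAdapter. Only LeaderElectionAdapter, LeaderElectionService, Msg, Peer and NewLeaderElectionService come from the code above; demoMsg, demoPeer, demoAdapter and the 25-second wait are hypothetical choices for this illustration, the latter assuming the default production timeouts.

// Hypothetical example only: an in-process adapter wiring two election services together.
package main

import (
	"fmt"
	"time"

	"github.com/hyperledger/fabric/gossip/election"
)

// demoMsg implements election.Msg.
type demoMsg struct {
	sender   string
	proposal bool
}

func (m *demoMsg) SenderID() string    { return m.sender }
func (m *demoMsg) IsProposal() bool    { return m.proposal }
func (m *demoMsg) IsDeclaration() bool { return !m.proposal }

// demoPeer implements election.Peer.
type demoPeer struct{ id string }

func (p *demoPeer) ID() string { return p.id }

// demoAdapter implements election.LeaderElectionAdapter by broadcasting every
// message to the inboxes of all other peers and reporting a static membership view.
type demoAdapter struct {
	id      string
	inboxes map[string]chan election.Msg
}

func (a *demoAdapter) Gossip(m election.Msg) {
	for id, ch := range a.inboxes {
		if id != a.id {
			ch <- m
		}
	}
}

func (a *demoAdapter) Accept() <-chan election.Msg { return a.inboxes[a.id] }

func (a *demoAdapter) CreateMessage(isDeclaration bool) election.Msg {
	return &demoMsg{sender: a.id, proposal: !isDeclaration}
}

func (a *demoAdapter) Peers() []election.Peer {
	var peers []election.Peer
	for id := range a.inboxes {
		peers = append(peers, &demoPeer{id: id})
	}
	return peers
}

func main() {
	inboxes := map[string]chan election.Msg{
		"p0": make(chan election.Msg, 100),
		"p1": make(chan election.Msg, 100),
	}
	p0 := election.NewLeaderElectionService(&demoAdapter{id: "p0", inboxes: inboxes}, "p0")
	p1 := election.NewLeaderElectionService(&demoAdapter{id: "p1", inboxes: inboxes}, "p1")
	defer p0.Stop()
	defer p1.Stop()

	// With the default timeouts, a leader should emerge after the startup grace
	// period plus one election round; p0 is expected to win by lowest ID.
	time.Sleep(25 * time.Second)
	fmt.Println("p0 leader:", p0.IsLeader(), "p1 leader:", p1.IsLeader())
}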