From 6962ee36d9399bbc6296afef24a14fd9e7a88b40 Mon Sep 17 00:00:00 2001
From: yacovm
Date: Wed, 19 Jul 2017 18:10:28 +0300
Subject: [PATCH] [FAB-5313] Leader election yield if deliver unavailable

This commit makes the peer relinquish its leadership
if it can't connect to the ordering service.

Change-Id: I5fe679d5f23e539828fea4a9398c7dd4d9fd0f93
Signed-off-by: yacovm
---
 core/deliverservice/deliveryclient.go      |  17 ++-
 core/deliverservice/deliveryclient_test.go |  14 +-
 core/peer/peer_test.go                     |   2 +-
 core/scc/cscc/configure_test.go            |   2 +-
 gossip/election/election.go                |  55 ++++++++
 gossip/election/election_test.go           |  64 +++++++++
 gossip/service/gossip_service.go           |   8 +-
 gossip/service/gossip_service_test.go      |   2 +-
 gossip/service/integration_test.go         | 146 +++++++++++++++++++++
 9 files changed, 295 insertions(+), 15 deletions(-)
 create mode 100644 gossip/service/integration_test.go

diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go
index 82031849a0f..0f5fb4678e5 100644
--- a/core/deliverservice/deliveryclient.go
+++ b/core/deliverservice/deliveryclient.go
@@ -34,12 +34,20 @@ var (
     reConnectBackoffThreshold = float64(time.Hour)
 )
 
+// SetReconnectTotalTimeThreshold sets the total time the delivery service
+// may spend in reconnection attempts until its retry logic gives up
+// and returns an error
+func SetReconnectTotalTimeThreshold(duration time.Duration) {
+    reConnectTotalTimeThreshold = duration
+}
+
 // DeliverService used to communicate with orderers to obtain
 // new blocks and send them to the committer service
 type DeliverService interface {
     // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
     // to channel peers.
-    StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error
+    // When the delivery finishes, the finalizer func is called
+    StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error
 
     // StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
     // to channel peers.
@@ -117,7 +125,7 @@ func (d *deliverServiceImpl) validateConfiguration() error {
 // initializes the grpc stream for given chainID, creates blocks provider instance
 // that spawns in go routine to read new blocks starting from the position provided by ledger
 // info instance.
-func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
     d.lock.Lock()
     defer d.lock.Unlock()
     if d.stopping {
@@ -133,7 +141,10 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
         client := d.newClient(chainID, ledgerInfo)
         logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
         d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
-        go d.blockProviders[chainID].DeliverBlocks()
+        go func() {
+            d.blockProviders[chainID].DeliverBlocks()
+            finalizer()
+        }()
     }
     return nil
 }
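The goroutine wrapper above is the only place the finalizer runs: it fires exactly when DeliverBlocks returns, for example once the retry logic has exhausted reConnectTotalTimeThreshold. A minimal stand-alone sketch of the same pattern; the names below are illustrative, not the actual Fabric types:

package main

import (
	"fmt"
	"time"
)

// deliverBlocks stands in for the blocking delivery loop: it returns only
// when delivery ends (here, after a fixed number of "blocks").
func deliverBlocks() {
	for i := 0; i < 3; i++ {
		fmt.Println("delivered block", i)
		time.Sleep(100 * time.Millisecond)
	}
}

// startDelivery mirrors the change above: run the blocking loop in a
// goroutine and invoke the caller-supplied finalizer once it returns.
func startDelivery(finalizer func()) {
	go func() {
		deliverBlocks()
		finalizer()
	}()
}

func main() {
	done := make(chan struct{})
	// The finalizer is where a peer would relinquish its leadership (Yield).
	startDelivery(func() {
		fmt.Println("delivery finished, relinquishing leadership")
		close(done)
	})
	<-done
}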
diff --git a/core/deliverservice/deliveryclient_test.go b/core/deliverservice/deliveryclient_test.go
index 198384ce477..3d6f306a3df 100644
--- a/core/deliverservice/deliveryclient_test.go
+++ b/core/deliverservice/deliveryclient_test.go
@@ -113,10 +113,10 @@ func TestNewDeliverService(t *testing.T) {
         ConnFactory: connFactory,
     })
     assert.NoError(t, err)
-    assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}))
+    assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}))
 
     // Lets start deliver twice
-    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "can't start delivery")
+    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "can't start delivery")
     // Lets stop deliver that not started
     assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID2"), "can't stop delivery")
 
@@ -130,7 +130,7 @@ func TestNewDeliverService(t *testing.T) {
     assert.Equal(t, 0, connNumber)
     assertBlockDissemination(0, gossipServiceAdapter.GossipBlockDisseminations, t)
     assert.Equal(t, atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.AddPayloadsCnt))
-    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "Delivery service is stopping")
+    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "Delivery service is stopping")
     assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID"), "Delivery service is stopping")
 }
 
@@ -157,7 +157,7 @@ func TestDeliverServiceRestart(t *testing.T) {
     li := &mocks.MockLedgerInfo{Height: uint64(100)}
     os.SetNextExpectedSeek(uint64(100))
 
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
     // Check that delivery client requests blocks in order
     go os.SendBlock(uint64(100))
@@ -203,7 +203,7 @@ func TestDeliverServiceFailover(t *testing.T) {
     os1.SetNextExpectedSeek(uint64(100))
     os2.SetNextExpectedSeek(uint64(100))
 
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
     // We need to discover to which instance the client connected to
     go os1.SendBlock(uint64(100))
@@ -278,7 +278,7 @@ func TestDeliverServiceServiceUnavailable(t *testing.T) {
     os1.SetNextExpectedSeek(li.Height)
     os2.SetNextExpectedSeek(li.Height)
 
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
 
     waitForConnectionToSomeOSN := func() (*mocks.Orderer, *mocks.Orderer) {
@@ -367,7 +367,7 @@ func TestDeliverServiceShutdown(t *testing.T) {
     li := &mocks.MockLedgerInfo{Height: uint64(100)}
     os.SetNextExpectedSeek(uint64(100))
 
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
 
     // Check that delivery service requests blocks in order
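The updated tests pass a no-op func() {} because they do not exercise the finalizer. A test that wants to verify the new contract, namely that the finalizer fires once the delivery loop returns, could observe it through a channel. The sketch below is self-contained and uses a hypothetical fake deliverer rather than the mocks used above:

package delivery

import (
	"testing"
	"time"
)

// startFakeDelivery mimics StartDeliverForChannel: it runs a blocking loop in
// a goroutine and calls finalizer when the loop exits.
func startFakeDelivery(stop <-chan struct{}, finalizer func()) {
	go func() {
		<-stop // stand-in for DeliverBlocks() returning
		finalizer()
	}()
}

func TestFinalizerRunsWhenDeliveryEnds(t *testing.T) {
	stop := make(chan struct{})
	finalized := make(chan struct{})

	startFakeDelivery(stop, func() { close(finalized) })
	close(stop) // simulate delivery terminating, e.g. retries exhausted

	select {
	case <-finalized:
		// finalizer was invoked as expected
	case <-time.After(time.Second):
		t.Fatal("finalizer was not invoked after delivery ended")
	}
}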
diff --git a/core/peer/peer_test.go b/core/peer/peer_test.go
index 794bcea745e..23ab6a0cc88 100644
--- a/core/peer/peer_test.go
+++ b/core/peer/peer_test.go
@@ -48,7 +48,7 @@ type mockDeliveryClient struct {
 
 // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
 // to channel peers.
-func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
     return nil
 }
 
diff --git a/core/scc/cscc/configure_test.go b/core/scc/cscc/configure_test.go
index 5db75950edc..9fd9dfff0ab 100644
--- a/core/scc/cscc/configure_test.go
+++ b/core/scc/cscc/configure_test.go
@@ -55,7 +55,7 @@ type mockDeliveryClient struct {
 
 // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
 // to channel peers.
-func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
     return nil
 }
 
diff --git a/gossip/election/election.go b/gossip/election/election.go
index 8979ecea648..7a2b937c6f1 100644
--- a/gossip/election/election.go
+++ b/gossip/election/election.go
@@ -93,6 +93,10 @@ type LeaderElectionService interface {
 
     // Stop stops the LeaderElectionService
     Stop()
+
+    // Yield relinquishes the leadership until a new leader is elected,
+    // or a timeout expires
+    Yield()
 }
 
 type peerID []byte
@@ -150,10 +154,12 @@ type leaderElectionSvcImpl struct {
     isLeader     int32
     toDie        int32
     leaderExists int32
+    yield        int32
     sleeping     bool
     adapter      LeaderElectionAdapter
     logger       *logging.Logger
     callback     leadershipCallback
+    yieldTimer   *time.Timer
 }
 
 func (le *leaderElectionSvcImpl) start() {
@@ -239,6 +245,11 @@ func (le *leaderElectionSvcImpl) run() {
         if !le.isLeaderExists() {
             le.leaderElection()
         }
+        // If we are yielding and some leader has been elected,
+        // stop yielding
+        if le.isLeaderExists() && le.isYielding() {
+            le.stopYielding()
+        }
         if le.shouldStop() {
             return
         }
@@ -253,7 +264,14 @@ func (le *leaderElectionSvcImpl) leaderElection() {
     le.logger.Debug(le.id, ": Entering")
     defer le.logger.Debug(le.id, ": Exiting")
+    // If we're yielding to other peers, do not participate
+    // in leader election
+    if le.isYielding() {
+        return
+    }
+    // Propose ourselves as a leader
     le.propose()
+    // Collect other proposals
     le.waitForInterrupt(getLeaderElectionDuration())
     // If someone declared itself as a leader, give up
     // on trying to become a leader too
@@ -261,6 +279,11 @@ func (le *leaderElectionSvcImpl) leaderElection() {
         le.logger.Debug(le.id, ": Some peer is already a leader")
         return
     }
+
+    if le.isYielding() {
+        le.logger.Debug(le.id, ": Aborting leader election because yielding")
+        return
+    }
     // Leader doesn't exist, let's see if there is a better candidate than us
     // for being a leader
     for _, o := range le.proposals.ToArray() {
@@ -364,6 +387,38 @@ func (le *leaderElectionSvcImpl) shouldStop() bool {
     return atomic.LoadInt32(&le.toDie) == int32(1)
 }
 
+func (le *leaderElectionSvcImpl) isYielding() bool {
+    return atomic.LoadInt32(&le.yield) == int32(1)
+}
+
+func (le *leaderElectionSvcImpl) stopYielding() {
+    le.logger.Debug("Stopped yielding")
+    le.Lock()
+    defer le.Unlock()
+    atomic.StoreInt32(&le.yield, int32(0))
+    le.yieldTimer.Stop()
+}
+
+// Yield relinquishes the leadership until a new leader is elected,
+// or a timeout expires
+func (le *leaderElectionSvcImpl) Yield() {
+    le.Lock()
+    defer le.Unlock()
+    if !le.IsLeader() || le.isYielding() {
+        return
+    }
+    // Turn on the yield flag
+    atomic.StoreInt32(&le.yield, int32(1))
+    // Stop being a leader
+    le.stopBeingLeader()
+    // Clear the leader exists flag since it could be that we are the leader
+    atomic.StoreInt32(&le.leaderExists, int32(0))
+    // Clear the yield flag in any case afterwards
+    le.yieldTimer = time.AfterFunc(getLeaderAliveThreshold()*6, func() {
+        atomic.StoreInt32(&le.yield, int32(0))
+    })
+}
+
 // Stop stops the LeaderElectionService
 func (le *leaderElectionSvcImpl) Stop() {
     le.logger.Debug(le.id, ": Entering")
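Yield combines an atomic flag that the election loop consults, a call that drops the current leadership, and a time.AfterFunc timer that clears the flag so a lone peer can eventually reclaim leadership (the patch uses getLeaderAliveThreshold()*6 as that window). A self-contained sketch of just the flag-and-timer mechanics, with illustrative names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type yielder struct {
	sync.Mutex
	yield int32
	timer *time.Timer
}

// Yield raises the flag and arms a timer that clears it, bounding how long
// the peer abstains from leader election if no other leader shows up.
func (y *yielder) Yield(timeout time.Duration) {
	y.Lock()
	defer y.Unlock()
	if atomic.LoadInt32(&y.yield) == 1 {
		return
	}
	atomic.StoreInt32(&y.yield, 1)
	y.timer = time.AfterFunc(timeout, func() {
		atomic.StoreInt32(&y.yield, 0)
	})
}

// StopYielding clears the flag early, e.g. once another leader is observed.
func (y *yielder) StopYielding() {
	y.Lock()
	defer y.Unlock()
	atomic.StoreInt32(&y.yield, 0)
	if y.timer != nil {
		y.timer.Stop()
	}
}

func (y *yielder) IsYielding() bool {
	return atomic.LoadInt32(&y.yield) == 1
}

func main() {
	y := &yielder{}
	y.Yield(200 * time.Millisecond)
	fmt.Println("yielding:", y.IsYielding()) // true
	time.Sleep(300 * time.Millisecond)
	fmt.Println("yielding:", y.IsYielding()) // false: the timer cleared the flag
}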
diff --git a/gossip/election/election_test.go b/gossip/election/election_test.go
index 612a8c739ab..ec4c3369edd 100644
--- a/gossip/election/election_test.go
+++ b/gossip/election/election_test.go
@@ -308,6 +308,70 @@ func TestLeadershipTakeover(t *testing.T) {
     assert.Equal(t, "p2", leaders[0])
 }
 
+func TestYield(t *testing.T) {
+    t.Parallel()
+    // Scenario: Peers spawn and a leader is elected.
+    // After a while, the leader yields.
+    // (Call yield twice to ensure only one callback is called)
+    // Expected outcome:
+    // (1) A new leader is elected
+    // (2) The old leader doesn't take back its leadership
+    peers := createPeers(0, 0, 1, 2, 3, 4, 5)
+    leaders := waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p0", leaders[0])
+    peers[0].Yield()
+    // Ensure the callback was called with 'false'
+    assert.True(t, peers[0].isCallbackInvoked())
+    assert.False(t, peers[0].isLeaderFromCallback())
+    // Clear the callback invoked flag
+    peers[0].lock.Lock()
+    peers[0].callbackInvoked = false
+    peers[0].lock.Unlock()
+    // Yield again and ensure it isn't called again
+    peers[0].Yield()
+    assert.False(t, peers[0].isCallbackInvoked())
+
+    ensureP0isNotAleader := func() bool {
+        leaders := waitForLeaderElection(t, peers)
+        return len(leaders) == 1 && leaders[0] != "p0"
+    }
+    // A new leader is elected, and it is not p0
+    waitForBoolFunc(t, ensureP0isNotAleader, true)
+    time.Sleep(getLeaderAliveThreshold() * 2)
+    // After a while, p0 doesn't restore its leadership status
+    waitForBoolFunc(t, ensureP0isNotAleader, true)
+}
+
+func TestYieldSinglePeer(t *testing.T) {
+    t.Parallel()
+    // Scenario: spawn a single peer and have it yield.
+    // Ensure it recovers its leadership after a while.
+    peers := createPeers(0, 0)
+    waitForLeaderElection(t, peers)
+    peers[0].Yield()
+    assert.False(t, peers[0].IsLeader())
+    waitForLeaderElection(t, peers)
+}
+
+func TestYieldAllPeers(t *testing.T) {
+    t.Parallel()
+    // Scenario: spawn 2 peers and have them all yield after regaining leadership.
+    // Ensure the first peer is the leader in the end after both peers yield
+    peers := createPeers(0, 0, 1)
+    leaders := waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p0", leaders[0])
+    peers[0].Yield()
+    leaders = waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p1", leaders[0])
+    peers[1].Yield()
+    leaders = waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p0", leaders[0])
+}
+
 func TestPartition(t *testing.T) {
     t.Parallel()
     // Scenario: peers spawn together, and then after a while a network partition occurs
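These tests rely on helpers that already exist in election_test.go, such as createPeers, waitForLeaderElection, waitForBoolFunc and getLeaderAliveThreshold. For readers without that file at hand, here is a sketch of what a polling helper of that kind typically looks like; the shape is assumed and may differ from the real waitForBoolFunc:

package election

import (
	"testing"
	"time"
)

// waitUntil polls a predicate until it matches the expected value or the
// timeout expires, then fails the test. Illustrative only.
func waitUntil(t *testing.T, pred func() bool, expected bool, timeout time.Duration) {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if pred() == expected {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
	t.Fatalf("condition did not become %v within %v", expected, timeout)
}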
diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go
index 8a81de7ec7e..e3bf1e5bd19 100644
--- a/gossip/service/gossip_service.go
+++ b/gossip/service/gossip_service.go
@@ -196,7 +196,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
         g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
     } else if isStaticOrgLeader {
         logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
-        g.deliveryService.StartDeliverForChannel(chainID, committer)
+        g.deliveryService.StartDeliverForChannel(chainID, committer, func() {})
     } else {
         logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
     }
@@ -282,8 +282,12 @@ func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
 func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
     return func(isLeader bool) {
         if isLeader {
+            yield := func() {
+                le := g.leaderElection[chainID]
+                le.Yield()
+            }
             logger.Info("Elected as a leader, starting delivery service for channel", chainID)
-            if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
+            if err := g.deliveryService.StartDeliverForChannel(chainID, committer, yield); err != nil {
                 logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
             }
         } else {
diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go
index f58ad561a59..ef358709f36 100644
--- a/gossip/service/gossip_service_test.go
+++ b/gossip/service/gossip_service_test.go
@@ -276,7 +276,7 @@ type mockDeliverService struct {
     running map[string]bool
 }
 
-func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
     ds.running[chainID] = true
     return nil
 }
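Together with the delivery-service change, onStatusChangeFactory closes a feedback loop: winning the election starts block delivery with a finalizer that calls Yield, so a leader that cannot reach the ordering service eventually steps down and lets another peer try. A simplified, self-contained sketch of that wiring; the elector and deliverer interfaces below are stand-ins, not the Fabric APIs:

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the leader election and delivery services.
type elector interface{ Yield() }

type deliverer interface {
	Start(channel string, finalizer func()) error
	Stop(channel string) error
}

// onStatusChange mirrors onStatusChangeFactory: when the peer becomes leader
// it starts delivery and hands it a finalizer that relinquishes leadership;
// when it loses leadership it stops delivery.
func onStatusChange(channel string, le elector, ds deliverer) func(bool) {
	return func(isLeader bool) {
		if isLeader {
			yield := func() { le.Yield() }
			if err := ds.Start(channel, yield); err != nil {
				fmt.Println("cannot start delivery:", err)
			}
			return
		}
		if err := ds.Stop(channel); err != nil {
			fmt.Println("cannot stop delivery:", err)
		}
	}
}

// Toy implementations so the sketch runs.
type fakeElector struct{}

func (fakeElector) Yield() { fmt.Println("yielding leadership") }

type fakeDeliverer struct{}

func (fakeDeliverer) Start(ch string, finalizer func()) error {
	fmt.Println("starting delivery for", ch)
	// Pretend the ordering service is unreachable and delivery gives up.
	finalizer()
	return nil
}

func (fakeDeliverer) Stop(ch string) error { return errors.New("not started") }

func main() {
	cb := onStatusChange("channelA", fakeElector{}, fakeDeliverer{})
	cb(true) // elected leader: delivery starts, fails, and leadership is yielded
}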
diff --git a/gossip/service/integration_test.go b/gossip/service/integration_test.go
new file mode 100644
index 00000000000..b0a5b6c058d
--- /dev/null
+++ b/gossip/service/integration_test.go
@@ -0,0 +1,146 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package service
+
+import (
+    "fmt"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/hyperledger/fabric/core/deliverservice"
+    "github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
+    "github.com/hyperledger/fabric/gossip/api"
+    "github.com/hyperledger/fabric/gossip/election"
+    "github.com/hyperledger/fabric/gossip/identity"
+    "github.com/hyperledger/fabric/gossip/state"
+    "github.com/spf13/viper"
+    "github.com/stretchr/testify/assert"
+)
+
+type embeddingDeliveryService struct {
+    deliverclient.DeliverService
+    startSignal sync.WaitGroup
+    stopSignal  sync.WaitGroup
+}
+
+func newEmbeddingDeliveryService(ds deliverclient.DeliverService) *embeddingDeliveryService {
+    eds := &embeddingDeliveryService{
+        DeliverService: ds,
+    }
+    eds.startSignal.Add(1)
+    eds.stopSignal.Add(1)
+    return eds
+}
+
+func (eds *embeddingDeliveryService) waitForDeliveryServiceActivation() {
+    eds.startSignal.Wait()
+}
+
+func (eds *embeddingDeliveryService) waitForDeliveryServiceTermination() {
+    eds.stopSignal.Wait()
+}
+
+func (eds *embeddingDeliveryService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
+    eds.startSignal.Done()
+    return eds.DeliverService.StartDeliverForChannel(chainID, ledgerInfo, finalizer)
+}
+
+func (eds *embeddingDeliveryService) StopDeliverForChannel(chainID string) error {
+    eds.stopSignal.Done()
+    return eds.DeliverService.StopDeliverForChannel(chainID)
+}
+
+func (eds *embeddingDeliveryService) Stop() {
+    eds.DeliverService.Stop()
+}
+
+type embeddingDeliveryServiceFactory struct {
+    DeliveryServiceFactory
+}
+
+func (edsf *embeddingDeliveryServiceFactory) Service(g GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
+    ds, _ := edsf.DeliveryServiceFactory.Service(g, endpoints, mcs)
+    return newEmbeddingDeliveryService(ds), nil
+}
+
+func TestLeaderYield(t *testing.T) {
+    // Scenario: Spawn 2 peers and wait for the first one to be the leader
+    // There isn't any orderer present so the leader peer won't be able to
+    // connect to the orderer, and should relinquish its leadership after a while.
+    // Make sure the other peer declares itself as the leader soon after.
+    deliverclient.SetReconnectTotalTimeThreshold(time.Second * 5)
+    viper.Set("peer.gossip.useLeaderElection", true)
+    viper.Set("peer.gossip.orgLeader", false)
+    n := 2
+    portPrefix := 30000
+    gossips := startPeers(t, n, portPrefix)
+    defer stopPeers(gossips)
+    channelName := "channelA"
+    peerIndexes := []int{0, 1}
+    // Add peers to the channel
+    addPeersToChannel(t, n, portPrefix, channelName, gossips, peerIndexes)
+    // Prime the membership view of the peers
+    waitForFullMembership(t, gossips, n, time.Second*30, time.Second*2)
+    mcs := &naiveCryptoService{}
+    // Helper function that creates a gossipService instance
+    newGossipService := func(i int) *gossipServiceImpl {
+        peerIdentity := api.PeerIdentityType(fmt.Sprintf("localhost:%d", portPrefix+i))
+        gs := &gossipServiceImpl{
+            mcs:             mcs,
+            gossipSvc:       gossips[i],
+            chains:          make(map[string]state.GossipStateProvider),
+            leaderElection:  make(map[string]election.LeaderElectionService),
+            deliveryFactory: &embeddingDeliveryServiceFactory{&deliveryFactoryImpl{}},
+            idMapper:        identity.NewIdentityMapper(mcs, peerIdentity),
+            peerIdentity:    peerIdentity,
+            secAdv:          &secAdvMock{},
+        }
+        gossipServiceInstance = gs
+        gs.InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:7050"})
+        return gs
+    }
+
+    p0 := newGossipService(0)
+    p1 := newGossipService(1)
+
+    // Returns index of the leader or -1 if no leader elected
+    getLeader := func() int {
+        if p0.leaderElection[channelName].IsLeader() {
+            // Ensure p1 isn't a leader at the same time
+            assert.False(t, p1.leaderElection[channelName].IsLeader())
+            return 0
+        }
+        if p1.leaderElection[channelName].IsLeader() {
+            return 1
+        }
+        return -1
+    }
+
+    ds0 := p0.deliveryService.(*embeddingDeliveryService)
+    ds1 := p1.deliveryService.(*embeddingDeliveryService)
+
+    // Wait for p0 to connect to the ordering service
+    ds0.waitForDeliveryServiceActivation()
+    t.Log("p0 started its delivery service")
+    // Ensure it's a leader
+    assert.Equal(t, 0, getLeader())
+    // Wait for p0 to lose its leadership
+    ds0.waitForDeliveryServiceTermination()
+    t.Log("p0 stopped its delivery service")
+    // Ensure there is no leader
+    assert.Equal(t, -1, getLeader())
+    // Wait for p1 to take over
+    ds1.waitForDeliveryServiceActivation()
+    t.Log("p1 started its delivery service")
+    // Ensure it's a leader now
+    assert.Equal(t, 1, getLeader())
+    p0.chains[channelName].Stop()
+    p1.chains[channelName].Stop()
+    p0.deliveryService.Stop()
+    p1.deliveryService.Stop()
+}
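embeddingDeliveryService turns StartDeliverForChannel and StopDeliverForChannel into observable events by using sync.WaitGroup as a one-shot latch: Add(1) at construction, Done() when the call happens, Wait() to block until it has. The same idiom in isolation, with illustrative names:

package main

import (
	"fmt"
	"sync"
	"time"
)

// event is a one-shot signal in the style of embeddingDeliveryService's
// startSignal and stopSignal: Add(1) up front, Done() when the event happens,
// Wait() to block until it has happened.
type event struct{ wg sync.WaitGroup }

func newEvent() *event {
	e := &event{}
	e.wg.Add(1)
	return e
}

func (e *event) Fire() { e.wg.Done() } // must be called exactly once
func (e *event) Wait() { e.wg.Wait() }

func main() {
	started := newEvent()
	go func() {
		time.Sleep(100 * time.Millisecond)
		started.Fire() // e.g. StartDeliverForChannel was invoked
	}()
	started.Wait()
	fmt.Println("delivery service was activated")
}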