diff --git a/gossip/election/election.go b/gossip/election/election.go
index 5cd33280e54..b19437d3412 100644
--- a/gossip/election/election.go
+++ b/gossip/election/election.go
@@ -18,7 +18,6 @@ package election
 
 import (
 	"bytes"
-	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -82,7 +81,6 @@ import (
 // LeaderElectionAdapter is used by the leader election module
 // to send and receive messages and to get membership information
 type LeaderElectionAdapter interface {
-
 	// Gossip gossips a message to other peers
 	Gossip(Msg)
 
@@ -131,7 +129,7 @@ func noopCallback(_ bool) {
 // NewLeaderElectionService returns a new LeaderElectionService
 func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback leadershipCallback) LeaderElectionService {
 	if len(id) == 0 {
-		panic(fmt.Errorf("Empty id"))
+		panic("Empty id")
 	}
 	le := &leaderElectionSvcImpl{
 		id:       peerID(id),
@@ -363,13 +361,13 @@ func (le *leaderElectionSvcImpl) IsLeader() bool {
 func (le *leaderElectionSvcImpl) beLeader() {
 	le.logger.Debug(le.id, ": Becoming a leader")
 	atomic.StoreInt32(&le.isLeader, int32(1))
-	le.callback(true)
+	go le.callback(true)
 }
 
 func (le *leaderElectionSvcImpl) stopBeingLeader() {
 	le.logger.Debug(le.id, "Stopped being a leader")
 	atomic.StoreInt32(&le.isLeader, int32(0))
-	le.callback(false)
+	go le.callback(false)
 }
 
 func (le *leaderElectionSvcImpl) shouldStop() bool {
diff --git a/gossip/election/election_test.go b/gossip/election/election_test.go
index a960d69906b..56ec66a7cfe 100644
--- a/gossip/election/election_test.go
+++ b/gossip/election/election_test.go
@@ -62,12 +62,13 @@ func (m *msg) IsDeclaration() bool {
 type peer struct {
 	mockedMethods map[string]struct{}
 	mock.Mock
-	id                   string
-	peers                map[string]*peer
-	sharedLock           *sync.RWMutex
-	msgChan              chan Msg
-	isLeaderFromCallback bool
-	callbackInvoked      bool
+	id                 string
+	peers              map[string]*peer
+	sharedLock         *sync.RWMutex
+	msgChan            chan Msg
+	leaderFromCallback bool
+	callbackInvoked    bool
+	lock               sync.RWMutex
 	LeaderElectionService
 }
 
@@ -131,10 +132,24 @@ func (p *peer) Peers() []Peer {
 }
 
 func (p *peer) leaderCallback(isLeader bool) {
-	p.isLeaderFromCallback = isLeader
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	p.leaderFromCallback = isLeader
 	p.callbackInvoked = true
 }
 
+func (p *peer) isLeaderFromCallback() bool {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	return p.leaderFromCallback
+}
+
+func (p *peer) isCallbackInvoked() bool {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	return p.callbackInvoked
+}
+
 func createPeers(spawnInterval time.Duration, ids ...int) []*peer {
 	peers := make([]*peer, len(ids))
 	peerMap := make(map[string]*peer)
@@ -152,7 +167,7 @@ func createPeers(spawnInterval time.Duration, ids ...int) []*peer {
 func createPeer(id int, peerMap map[string]*peer, l *sync.RWMutex) *peer {
 	idStr := fmt.Sprintf("p%d", id)
 	c := make(chan Msg, 100)
-	p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{}), isLeaderFromCallback: false, callbackInvoked: false}
+	p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{}), leaderFromCallback: false, callbackInvoked: false}
 	p.LeaderElectionService = NewLeaderElectionService(p, idStr, p.leaderCallback)
 	l.Lock()
 	peerMap[idStr] = p
@@ -192,7 +207,7 @@ func TestInitPeersAtSameTime(t *testing.T) {
 	leaders := waitForLeaderElection(t, peers)
 	isP0leader := peers[len(peers)-1].IsLeader()
 	assert.True(t, isP0leader, "p0 isn't a leader. Leaders are: %v", leaders)
-	assert.True(t, peers[len(peers)-1].isLeaderFromCallback, "p0 didn't got leaderhip change callback invoked")
+	assert.True(t, peers[len(peers)-1].isLeaderFromCallback(), "p0 didn't get the leadership change callback invoked")
 	assert.Len(t, leaders, 1, "More than 1 leader elected")
 }
 
@@ -274,12 +289,12 @@ func TestConvergence(t *testing.T) {
 
 	for _, p := range combinedPeers {
 		if p.id == finalLeaders[0] {
-			assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for ", p.id)
-			assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for ", p.id)
+			assert.True(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for ", p.id)
+			assert.True(t, p.isCallbackInvoked(), "Leadership callback wasn't invoked for ", p.id)
 		} else {
-			assert.False(t, p.isLeaderFromCallback, "Leadership callback result is wrong for ", p.id)
+			assert.False(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for ", p.id)
 			if p.id == leaders2[0] {
-				assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for ", p.id)
+				assert.True(t, p.isCallbackInvoked(), "Leadership callback wasn't invoked for ", p.id)
 			}
 		}
 	}
@@ -312,7 +327,7 @@ func TestPartition(t *testing.T) {
 	leaders := waitForLeaderElection(t, peers)
 	assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
 	assert.Equal(t, "p0", leaders[0])
-	assert.True(t, peers[len(peers)-1].isLeaderFromCallback, "Leadership callback result is wrong for %s", peers[len(peers)-1].id)
+	assert.True(t, peers[len(peers)-1].isLeaderFromCallback(), "Leadership callback result is wrong for %s", peers[len(peers)-1].id)
 
 	for _, p := range peers {
 		p.On("Peers").Return([]Peer{})
@@ -322,7 +337,7 @@ func TestPartition(t *testing.T) {
 	leaders = waitForMultipleLeadersElection(t, peers, 6)
 	assert.Len(t, leaders, 6)
 	for _, p := range peers {
-		assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
+		assert.True(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for %s", p.id)
 	}
 
 	for _, p := range peers {
@@ -337,10 +352,10 @@ func TestPartition(t *testing.T) {
 	assert.Equal(t, "p0", leaders[0])
 	for _, p := range peers {
 		if p.id == leaders[0] {
-			assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
+			assert.True(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for %s", p.id)
 		} else {
-			assert.False(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
-			assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for %s", p.id)
+			assert.False(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for %s", p.id)
+			assert.True(t, p.isCallbackInvoked(), "Leadership callback wasn't invoked for %s", p.id)
 		}
 	}
 
diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go
index ed153ab8ee5..b4e245f51a5 100644
--- a/gossip/service/gossip_service.go
+++ b/gossip/service/gossip_service.go
@@ -22,6 +22,7 @@ import (
 	peerComm "github.com/hyperledger/fabric/core/comm"
 	"github.com/hyperledger/fabric/core/committer"
 	"github.com/hyperledger/fabric/core/deliverservice"
+	"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
 	"github.com/hyperledger/fabric/gossip/api"
 	gossipCommon "github.com/hyperledger/fabric/gossip/common"
 	"github.com/hyperledger/fabric/gossip/election"
@@ -186,23 +187,26 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
 		}
 	}
 
+	// Delivery service might be nil only if it was not able to connect
+	// to the ordering service
 	if g.deliveryService != nil {
+		// Parameters:
+		// - peer.gossip.useLeaderElection
+		// - peer.gossip.orgLeader
+		//
+		// are mutually exclusive; setting both to true is not defined, hence
+		// the peer will panic and terminate
 		leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
-		staticOrderConnection := viper.GetBool("peer.gossip.orgLeader")
+		isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")
 
-		if leaderElection && staticOrderConnection {
-			msg := "Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution"
-			logger.Panic(msg)
-		} else if leaderElection {
+		if leaderElection && isStaticOrgLeader {
+			logger.Panic("Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution")
+		}
+
+		if leaderElection {
 			logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
-			connector := &leaderElectionDeliverConnector{
-				deliverer: g.deliveryService,
-				committer: committer,
-				chainID:   chainID,
-			}
-			electionService := g.newLeaderElectionComponent(gossipCommon.ChainID(connector.chainID), connector.leadershipStatusChange)
-			g.leaderElection[chainID] = electionService
-		} else if staticOrderConnection {
+			g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
+		} else if isStaticOrgLeader {
 			logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
 			g.deliveryService.StartDeliverForChannel(chainID, committer)
 		} else {
@@ -270,9 +274,9 @@ func (g *gossipServiceImpl) Stop() {
 	}
 }
 
-func (g *gossipServiceImpl) newLeaderElectionComponent(channel gossipCommon.ChainID, callback func(bool)) election.LeaderElectionService {
+func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool)) election.LeaderElectionService {
 	PKIid := g.idMapper.GetPKIidOfCert(g.peerIdentity)
-	adapter := election.NewAdapter(g, PKIid, channel)
+	adapter := election.NewAdapter(g, PKIid, gossipCommon.ChainID(chainID))
 	return election.NewLeaderElectionService(adapter, string(PKIid), callback)
 }
 
@@ -285,6 +289,22 @@ func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
 	return false
 }
 
+func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
+	return func(isLeader bool) {
+		if isLeader {
+			if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
+				logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
+			}
+		} else {
+			if err := g.deliveryService.StopDeliverForChannel(chainID); err != nil {
+				logger.Error("Delivery service is not able to stop blocks delivery for chain, due to", err)
+			}
+
+		}
+
+	}
+}
+
 func orgListFromConfig(config Config) []string {
 	var orgList []string
 	for orgName := range config.Organizations() {
@@ -324,22 +344,3 @@ func (s *secImpl) VerifyByChannel(chainID gossipCommon.ChainID, peerIdentity api
 func (s *secImpl) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
 	return nil
 }
-
-type leaderElectionDeliverConnector struct {
-	deliverer deliverclient.DeliverService
-	chainID   string
-	committer committer.Committer
-}
-
-func (ledc *leaderElectionDeliverConnector) leadershipStatusChange(isLeader bool) {
-	if isLeader {
-		if err := ledc.deliverer.StartDeliverForChannel(ledc.chainID, ledc.committer); err != nil {
-			logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
-		}
-	} else {
-		if err := ledc.deliverer.StopDeliverForChannel(ledc.chainID); err != nil {
-			logger.Error("Delivery service is not able to stop blocks delivery for chain, due to", err)
-		}
-
-	}
-}
diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go
index 4b9f5af3bca..6f55b95d7e7 100644
--- a/gossip/service/gossip_service_test.go
+++ b/gossip/service/gossip_service_test.go
@@ -347,7 +347,7 @@ func TestLeaderElectionWithRealGossip(t *testing.T) {
 
 	for i := 0; i < n; i++ {
 		services[i] = &electionService{nil, false, 0}
-		services[i].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(gossipCommon.ChainID(channelName), services[i].callback)
+		services[i].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(channelName, services[i].callback)
 	}
 
 	logger.Warning("Waiting for leader election")
@@ -373,7 +373,7 @@ func TestLeaderElectionWithRealGossip(t *testing.T) {
 
 	for idx, i := range secondChannelPeerIndexes {
 		secondChannelServices[idx] = &electionService{nil, false, 0}
-		secondChannelServices[idx].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(gossipCommon.ChainID(secondChannelName), secondChannelServices[idx].callback)
+		secondChannelServices[idx].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(secondChannelName, secondChannelServices[idx].callback)
 	}
 
 	assert.True(t, waitForLeaderElection(t, secondChannelServices, time.Second*30, time.Second*2), "One leader should be selected for chanB")
diff --git a/peer/core.yaml b/peer/core.yaml
index 6c07854009f..25f4fde3d86 100644
--- a/peer/core.yaml
+++ b/peer/core.yaml
@@ -68,13 +68,24 @@ peer:
 
     # Gossip related configuration
     gossip:
+        # Bootstrap set of peers to initialize gossip with
         bootstrap: 127.0.0.1:7051
-        # Use automatically chosen peer (high avalibility) to distribute blocks in channel or static one
-        # Setting this true and orgLeader true cause panic exit
+
+        # NOTE: the orgLeader and useLeaderElection parameters are mutually exclusive;
+        # setting both to true would result in the termination of the peer, since
+        # this is an undefined state.
+
+        # Defines whether the peer will initialize the dynamic algorithm for
+        # "leader" selection, where the leader is the peer that establishes
+        # a connection with the ordering service and uses the delivery protocol
+        # to pull ledger blocks from the ordering service
         useLeaderElection: false
-        # For debug - is peer is its org leader and should pass blocks from orderer to other peers in org
-        # Works only if useLeaderElection set to false
+        # Statically defines the peer to be an organization "leader",
+        # which means that the current peer will maintain the connection
+        # with the ordering service and disseminate blocks across the peers
+        # in its own organization
         orgLeader: true
+        # ID of this instance
         endpoint:
 
         # Maximum count of blocks we store in memory