Skip to content

Commit

Permalink
Refactoring connection between gossip LE and delivery.
Browse files Browse the repository at this point in the history
While connecting leader election with delivery service
there was introduced several redundant entites, this commit
takes care to refactor and clean the relevant code.

It is crucial to keep this part minimal and compact to allow
to maintain reliable connectivty to ordering services.

Change-Id: I20737fe116b81d7c535364eb96b8f010b9f50cfe
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Mar 21, 2017
1 parent 5b59e06 commit 2e7d687
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 63 deletions.
8 changes: 3 additions & 5 deletions gossip/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package election

import (
"bytes"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -82,7 +81,6 @@ import (
// LeaderElectionAdapter is used by the leader election module
// to send and receive messages and to get membership information
type LeaderElectionAdapter interface {

// Gossip gossips a message to other peers
Gossip(Msg)

Expand Down Expand Up @@ -131,7 +129,7 @@ func noopCallback(_ bool) {
// NewLeaderElectionService returns a new LeaderElectionService
func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback leadershipCallback) LeaderElectionService {
if len(id) == 0 {
panic(fmt.Errorf("Empty id"))
panic("Empty id")
}
le := &leaderElectionSvcImpl{
id: peerID(id),
Expand Down Expand Up @@ -363,13 +361,13 @@ func (le *leaderElectionSvcImpl) IsLeader() bool {
func (le *leaderElectionSvcImpl) beLeader() {
le.logger.Debug(le.id, ": Becoming a leader")
atomic.StoreInt32(&le.isLeader, int32(1))
le.callback(true)
go le.callback(true)
}

func (le *leaderElectionSvcImpl) stopBeingLeader() {
le.logger.Debug(le.id, "Stopped being a leader")
atomic.StoreInt32(&le.isLeader, int32(0))
le.callback(false)
go le.callback(false)
}

func (le *leaderElectionSvcImpl) shouldStop() bool {
Expand Down
51 changes: 33 additions & 18 deletions gossip/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ func (m *msg) IsDeclaration() bool {
type peer struct {
mockedMethods map[string]struct{}
mock.Mock
id string
peers map[string]*peer
sharedLock *sync.RWMutex
msgChan chan Msg
isLeaderFromCallback bool
callbackInvoked bool
id string
peers map[string]*peer
sharedLock *sync.RWMutex
msgChan chan Msg
leaderFromCallback bool
callbackInvoked bool
lock sync.RWMutex
LeaderElectionService
}

Expand Down Expand Up @@ -131,10 +132,24 @@ func (p *peer) Peers() []Peer {
}

func (p *peer) leaderCallback(isLeader bool) {
p.isLeaderFromCallback = isLeader
p.lock.Lock()
defer p.lock.Unlock()
p.leaderFromCallback = isLeader
p.callbackInvoked = true
}

func (p *peer) isLeaderFromCallback() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.leaderFromCallback
}

func (p *peer) isCallbackInvoked() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.callbackInvoked
}

func createPeers(spawnInterval time.Duration, ids ...int) []*peer {
peers := make([]*peer, len(ids))
peerMap := make(map[string]*peer)
Expand All @@ -152,7 +167,7 @@ func createPeers(spawnInterval time.Duration, ids ...int) []*peer {
func createPeer(id int, peerMap map[string]*peer, l *sync.RWMutex) *peer {
idStr := fmt.Sprintf("p%d", id)
c := make(chan Msg, 100)
p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{}), isLeaderFromCallback: false, callbackInvoked: false}
p := &peer{id: idStr, peers: peerMap, sharedLock: l, msgChan: c, mockedMethods: make(map[string]struct{}), leaderFromCallback: false, callbackInvoked: false}
p.LeaderElectionService = NewLeaderElectionService(p, idStr, p.leaderCallback)
l.Lock()
peerMap[idStr] = p
Expand Down Expand Up @@ -192,7 +207,7 @@ func TestInitPeersAtSameTime(t *testing.T) {
leaders := waitForLeaderElection(t, peers)
isP0leader := peers[len(peers)-1].IsLeader()
assert.True(t, isP0leader, "p0 isn't a leader. Leaders are: %v", leaders)
assert.True(t, peers[len(peers)-1].isLeaderFromCallback, "p0 didn't got leaderhip change callback invoked")
assert.True(t, peers[len(peers)-1].isLeaderFromCallback(), "p0 didn't got leaderhip change callback invoked")
assert.Len(t, leaders, 1, "More than 1 leader elected")
}

Expand Down Expand Up @@ -274,12 +289,12 @@ func TestConvergence(t *testing.T) {

for _, p := range combinedPeers {
if p.id == finalLeaders[0] {
assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for ", p.id)
assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for ", p.id)
assert.True(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for ", p.id)
assert.True(t, p.isCallbackInvoked(), "Leadership callback wasn't invoked for ", p.id)
} else {
assert.False(t, p.isLeaderFromCallback, "Leadership callback result is wrong for ", p.id)
assert.False(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for ", p.id)
if p.id == leaders2[0] {
assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for ", p.id)
assert.True(t, p.isCallbackInvoked(), "Leadership callback wasn't invoked for ", p.id)
}
}
}
Expand Down Expand Up @@ -312,7 +327,7 @@ func TestPartition(t *testing.T) {
leaders := waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
assert.True(t, peers[len(peers)-1].isLeaderFromCallback, "Leadership callback result is wrong for %s", peers[len(peers)-1].id)
assert.True(t, peers[len(peers)-1].isLeaderFromCallback(), "Leadership callback result is wrong for %s", peers[len(peers)-1].id)

for _, p := range peers {
p.On("Peers").Return([]Peer{})
Expand All @@ -322,7 +337,7 @@ func TestPartition(t *testing.T) {
leaders = waitForMultipleLeadersElection(t, peers, 6)
assert.Len(t, leaders, 6)
for _, p := range peers {
assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
assert.True(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for %s", p.id)
}

for _, p := range peers {
Expand All @@ -337,10 +352,10 @@ func TestPartition(t *testing.T) {
assert.Equal(t, "p0", leaders[0])
for _, p := range peers {
if p.id == leaders[0] {
assert.True(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
assert.True(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for %s", p.id)
} else {
assert.False(t, p.isLeaderFromCallback, "Leadership callback result is wrong for %s", p.id)
assert.True(t, p.callbackInvoked, "Leadership callback wasn't invoked for %s", p.id)
assert.False(t, p.isLeaderFromCallback(), "Leadership callback result is wrong for %s", p.id)
assert.True(t, p.isCallbackInvoked(), "Leadership callback wasn't invoked for %s", p.id)
}
}

Expand Down
69 changes: 35 additions & 34 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
peerComm "github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/election"
Expand Down Expand Up @@ -186,23 +187,26 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
}
}

// Delivery service might be nil only if it was not able to get connected
// to the ordering service
if g.deliveryService != nil {
// Parameters:
// - peer.gossip.useLeaderElection
// - peer.gossip.orgLeader
//
// are mutual exclusive, setting both to true is not defined, hence
// peer will panic and terminate
leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
staticOrderConnection := viper.GetBool("peer.gossip.orgLeader")
isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")

if leaderElection && staticOrderConnection {
msg := "Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution"
logger.Panic(msg)
} else if leaderElection {
if leaderElection && isStaticOrgLeader {
logger.Panic("Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution")
}

if leaderElection {
logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
connector := &leaderElectionDeliverConnector{
deliverer: g.deliveryService,
committer: committer,
chainID: chainID,
}
electionService := g.newLeaderElectionComponent(gossipCommon.ChainID(connector.chainID), connector.leadershipStatusChange)
g.leaderElection[chainID] = electionService
} else if staticOrderConnection {
g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
} else if isStaticOrgLeader {
logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
g.deliveryService.StartDeliverForChannel(chainID, committer)
} else {
Expand Down Expand Up @@ -270,9 +274,9 @@ func (g *gossipServiceImpl) Stop() {
}
}

func (g *gossipServiceImpl) newLeaderElectionComponent(channel gossipCommon.ChainID, callback func(bool)) election.LeaderElectionService {
func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool)) election.LeaderElectionService {
PKIid := g.idMapper.GetPKIidOfCert(g.peerIdentity)
adapter := election.NewAdapter(g, PKIid, channel)
adapter := election.NewAdapter(g, PKIid, gossipCommon.ChainID(chainID))
return election.NewLeaderElectionService(adapter, string(PKIid), callback)
}

Expand All @@ -285,6 +289,22 @@ func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
return false
}

func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
return func(isLeader bool) {
if isLeader {
if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
}
} else {
if err := g.deliveryService.StopDeliverForChannel(chainID); err != nil {
logger.Error("Delivery service is not able to stop blocks delivery for chain, due to", err)
}

}

}
}

func orgListFromConfig(config Config) []string {
var orgList []string
for orgName := range config.Organizations() {
Expand Down Expand Up @@ -324,22 +344,3 @@ func (s *secImpl) VerifyByChannel(chainID gossipCommon.ChainID, peerIdentity api
func (s *secImpl) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
return nil
}

type leaderElectionDeliverConnector struct {
deliverer deliverclient.DeliverService
chainID string
committer committer.Committer
}

func (ledc *leaderElectionDeliverConnector) leadershipStatusChange(isLeader bool) {
if isLeader {
if err := ledc.deliverer.StartDeliverForChannel(ledc.chainID, ledc.committer); err != nil {
logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
}
} else {
if err := ledc.deliverer.StopDeliverForChannel(ledc.chainID); err != nil {
logger.Error("Delivery service is not able to stop blocks delivery for chain, due to", err)
}

}
}
4 changes: 2 additions & 2 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func TestLeaderElectionWithRealGossip(t *testing.T) {

for i := 0; i < n; i++ {
services[i] = &electionService{nil, false, 0}
services[i].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(gossipCommon.ChainID(channelName), services[i].callback)
services[i].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(channelName, services[i].callback)
}

logger.Warning("Waiting for leader election")
Expand All @@ -373,7 +373,7 @@ func TestLeaderElectionWithRealGossip(t *testing.T) {

for idx, i := range secondChannelPeerIndexes {
secondChannelServices[idx] = &electionService{nil, false, 0}
secondChannelServices[idx].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(gossipCommon.ChainID(secondChannelName), secondChannelServices[idx].callback)
secondChannelServices[idx].LeaderElectionService = gossips[i].(*gossipServiceImpl).newLeaderElectionComponent(secondChannelName, secondChannelServices[idx].callback)
}

assert.True(t, waitForLeaderElection(t, secondChannelServices, time.Second*30, time.Second*2), "One leader should be selected for chanB")
Expand Down
19 changes: 15 additions & 4 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,24 @@ peer:

# Gossip related configuration
gossip:
# Bootstrap set to initialize gossip with
bootstrap: 127.0.0.1:7051
# Use automatically chosen peer (high avalibility) to distribute blocks in channel or static one
# Setting this true and orgLeader true cause panic exit

# NOTE: orgLeader and useLeaderElection parameters are mutual exclusive
# setting both to true would result in the termination of the peer, since this is undefined
# state.

# Defines whenever peer will initialize dynamic algorithm for
# "leader" selection, where leader is the peer to establish
# connection with ordering service and use delivery protocol
# to pull ledger blocks from ordering service
useLeaderElection: false
# For debug - is peer is its org leader and should pass blocks from orderer to other peers in org
# Works only if useLeaderElection set to false
# Statically defines peer to be an organization "leader",
# where this means that current peer will maintain connection
# with ordering service and disseminate block across peers in
# its own organization
orgLeader: true

# ID of this instance
endpoint:
# Maximum count of blocks we store in memory
Expand Down

0 comments on commit 2e7d687

Please sign in to comment.