[FAB-5313] Leader election yield if deliver unavailable
This commit makes the peer relinquish its leadership if it can't
connect to the ordering service.

Change-Id: I5fe679d5f23e539828fea4a9398c7dd4d9fd0f93
Signed-off-by: yacovm <yacovm@il.ibm.com>
yacovm committed Jul 23, 2017
1 parent 3a4b1f2 commit 6962ee3
Showing 9 changed files with 295 additions and 15 deletions.
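
In short: StartDeliverForChannel gains a finalizer callback that runs once block delivery terminates. For dynamically elected leaders, the gossip service passes a callback that calls Yield() on the leader election service, so a leader whose delivery client exhausts its reconnection attempts against the ordering service steps down and abstains from re-election until another peer becomes leader or a timeout of six leader-alive thresholds expires.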
17 changes: 14 additions & 3 deletions core/deliverservice/deliveryclient.go
```diff
@@ -34,12 +34,20 @@ var (
     reConnectBackoffThreshold = float64(time.Hour)
 )
 
+// SetReconnectTotalTimeThreshold sets the total time the delivery service
+// may spend in reconnection attempts until its retry logic gives up
+// and returns an error
+func SetReconnectTotalTimeThreshold(duration time.Duration) {
+    reConnectTotalTimeThreshold = duration
+}
+
 // DeliverService used to communicate with orderers to obtain
 // new blocks and send them to the committer service
 type DeliverService interface {
     // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
     // to channel peers.
-    StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error
+    // When the delivery finishes, the finalizer func is called
+    StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error
 
     // StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
     // to channel peers.
@@ -117,7 +125,7 @@ func (d *deliverServiceImpl) validateConfiguration() error {
 // initializes the grpc stream for given chainID, creates blocks provider instance
 // that spawns in go routine to read new blocks starting from the position provided by ledger
 // info instance.
-func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
     d.lock.Lock()
     defer d.lock.Unlock()
     if d.stopping {
@@ -133,7 +141,10 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
         client := d.newClient(chainID, ledgerInfo)
         logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
         d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
-        go d.blockProviders[chainID].DeliverBlocks()
+        go func() {
+            d.blockProviders[chainID].DeliverBlocks()
+            finalizer()
+        }()
     }
     return nil
 }
```
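
The finalizer hook above is just a run-a-callback-when-the-goroutine-exits pattern. A minimal standalone sketch of it, with a placeholder deliverBlocks standing in for the blocks provider's DeliverBlocks loop:

```go
package main

import (
    "fmt"
    "time"
)

// deliverBlocks is a stand-in for blocksprovider.DeliverBlocks: it blocks
// until delivery fails (or is stopped) and then returns.
func deliverBlocks() {
    time.Sleep(100 * time.Millisecond) // pretend the orderer became unreachable
}

func main() {
    done := make(chan struct{})
    // The finalizer runs exactly once, after delivery has terminated;
    // in the real code it yields leadership.
    finalizer := func() {
        fmt.Println("delivery terminated; running finalizer")
        close(done)
    }
    // Mirrors the new StartDeliverForChannel body: run delivery in its own
    // goroutine and invoke the finalizer when it returns.
    go func() {
        deliverBlocks()
        finalizer()
    }()
    <-done
}
```
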
14 changes: 7 additions & 7 deletions core/deliverservice/deliveryclient_test.go
```diff
@@ -113,10 +113,10 @@ func TestNewDeliverService(t *testing.T) {
         ConnFactory: connFactory,
     })
     assert.NoError(t, err)
-    assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}))
+    assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}))
 
     // Lets start deliver twice
-    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "can't start delivery")
+    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "can't start delivery")
     // Lets stop deliver that not started
     assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID2"), "can't stop delivery")
 
@@ -130,7 +130,7 @@ func TestNewDeliverService(t *testing.T) {
     assert.Equal(t, 0, connNumber)
     assertBlockDissemination(0, gossipServiceAdapter.GossipBlockDisseminations, t)
     assert.Equal(t, atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.AddPayloadsCnt))
-    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "Delivery service is stopping")
+    assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "Delivery service is stopping")
     assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID"), "Delivery service is stopping")
 }
 
@@ -157,7 +157,7 @@ func TestDeliverServiceRestart(t *testing.T) {
     li := &mocks.MockLedgerInfo{Height: uint64(100)}
     os.SetNextExpectedSeek(uint64(100))
 
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
     // Check that delivery client requests blocks in order
     go os.SendBlock(uint64(100))
@@ -203,7 +203,7 @@ func TestDeliverServiceFailover(t *testing.T) {
     os1.SetNextExpectedSeek(uint64(100))
     os2.SetNextExpectedSeek(uint64(100))
 
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
     // We need to discover to which instance the client connected to
     go os1.SendBlock(uint64(100))
@@ -278,7 +278,7 @@ func TestDeliverServiceServiceUnavailable(t *testing.T) {
     os1.SetNextExpectedSeek(li.Height)
     os2.SetNextExpectedSeek(li.Height)
 
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
 
     waitForConnectionToSomeOSN := func() (*mocks.Orderer, *mocks.Orderer) {
@@ -367,7 +367,7 @@ func TestDeliverServiceShutdown(t *testing.T) {
 
     li := &mocks.MockLedgerInfo{Height: uint64(100)}
     os.SetNextExpectedSeek(uint64(100))
-    err = service.StartDeliverForChannel("TEST_CHAINID", li)
+    err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
     assert.NoError(t, err, "can't start delivery")
 
     // Check that delivery service requests blocks in order
```

2 changes: 1 addition & 1 deletion core/peer/peer_test.go
```diff
@@ -48,7 +48,7 @@ type mockDeliveryClient struct {
 
 // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
 // to channel peers.
-func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
     return nil
 }
 
```

2 changes: 1 addition & 1 deletion core/scc/cscc/configure_test.go
```diff
@@ -55,7 +55,7 @@ type mockDeliveryClient struct {
 
 // StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
 // to channel peers.
-func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
     return nil
 }
 
```

55 changes: 55 additions & 0 deletions gossip/election/election.go
```diff
@@ -93,6 +93,10 @@ type LeaderElectionService interface {
 
     // Stop stops the LeaderElectionService
     Stop()
+
+    // Yield relinquishes the leadership until a new leader is elected,
+    // or a timeout expires
+    Yield()
 }
 
 type peerID []byte
@@ -150,10 +154,12 @@ type leaderElectionSvcImpl struct {
     isLeader     int32
     toDie        int32
     leaderExists int32
+    yield        int32
     sleeping     bool
     adapter      LeaderElectionAdapter
     logger       *logging.Logger
     callback     leadershipCallback
+    yieldTimer   *time.Timer
 }
 
 func (le *leaderElectionSvcImpl) start() {
@@ -239,6 +245,11 @@ func (le *leaderElectionSvcImpl) run() {
         if !le.isLeaderExists() {
             le.leaderElection()
         }
+        // If we are yielding and some leader has been elected,
+        // stop yielding
+        if le.isLeaderExists() && le.isYielding() {
+            le.stopYielding()
+        }
         if le.shouldStop() {
             return
         }
@@ -253,14 +264,26 @@ func (le *leaderElectionSvcImpl) leaderElection() {
 func (le *leaderElectionSvcImpl) leaderElection() {
     le.logger.Debug(le.id, ": Entering")
     defer le.logger.Debug(le.id, ": Exiting")
+    // If we're yielding to other peers, do not participate
+    // in leader election
+    if le.isYielding() {
+        return
+    }
+    // Propose ourselves as a leader
     le.propose()
+    // Collect other proposals
     le.waitForInterrupt(getLeaderElectionDuration())
     // If someone declared itself as a leader, give up
     // on trying to become a leader too
     if le.isLeaderExists() {
         le.logger.Debug(le.id, ": Some peer is already a leader")
         return
     }
+
+    if le.isYielding() {
+        le.logger.Debug(le.id, ": Aborting leader election because yielding")
+        return
+    }
     // Leader doesn't exist, let's see if there is a better candidate than us
     // for being a leader
     for _, o := range le.proposals.ToArray() {
@@ -364,6 +387,38 @@ func (le *leaderElectionSvcImpl) shouldStop() bool {
     return atomic.LoadInt32(&le.toDie) == int32(1)
 }
 
+func (le *leaderElectionSvcImpl) isYielding() bool {
+    return atomic.LoadInt32(&le.yield) == int32(1)
+}
+
+func (le *leaderElectionSvcImpl) stopYielding() {
+    le.logger.Debug("Stopped yielding")
+    le.Lock()
+    defer le.Unlock()
+    atomic.StoreInt32(&le.yield, int32(0))
+    le.yieldTimer.Stop()
+}
+
+// Yield relinquishes the leadership until a new leader is elected,
+// or a timeout expires
+func (le *leaderElectionSvcImpl) Yield() {
+    le.Lock()
+    defer le.Unlock()
+    if !le.IsLeader() || le.isYielding() {
+        return
+    }
+    // Turn on the yield flag
+    atomic.StoreInt32(&le.yield, int32(1))
+    // Stop being a leader
+    le.stopBeingLeader()
+    // Clear the leader exists flag since it could be that we are the leader
+    atomic.StoreInt32(&le.leaderExists, int32(0))
+    // Clear the yield flag in any case afterwards
+    le.yieldTimer = time.AfterFunc(getLeaderAliveThreshold()*6, func() {
+        atomic.StoreInt32(&le.yield, int32(0))
+    })
+}
+
 // Stop stops the LeaderElectionService
 func (le *leaderElectionSvcImpl) Stop() {
     le.logger.Debug(le.id, ": Entering")
```
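
A stripped-down model of the yield bookkeeping added above: an atomic flag that Yield() sets, a timer that clears it after six leader-alive thresholds, and stopYielding() for the case where another leader shows up first. The names and the 6x multiplier follow the diff; the threshold value below is an arbitrary stand-in and the rest of the election loop is omitted:

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type yielder struct {
    sync.Mutex
    yield      int32
    yieldTimer *time.Timer
}

func (y *yielder) isYielding() bool {
    return atomic.LoadInt32(&y.yield) == int32(1)
}

// Yield sets the flag and arms a timer that clears it after
// six alive-thresholds, as in the diff above.
func (y *yielder) Yield(aliveThreshold time.Duration) {
    y.Lock()
    defer y.Unlock()
    if y.isYielding() {
        return
    }
    atomic.StoreInt32(&y.yield, int32(1))
    y.yieldTimer = time.AfterFunc(aliveThreshold*6, func() {
        atomic.StoreInt32(&y.yield, int32(0))
    })
}

// stopYielding clears the flag early, as the run() loop does once
// it observes that some leader exists again.
func (y *yielder) stopYielding() {
    y.Lock()
    defer y.Unlock()
    atomic.StoreInt32(&y.yield, int32(0))
    y.yieldTimer.Stop()
}

func main() {
    y := &yielder{}
    y.Yield(50 * time.Millisecond)
    fmt.Println("yielding:", y.isYielding()) // true
    // Case 1: another leader is elected -> stop yielding early.
    y.stopYielding()
    fmt.Println("after stopYielding:", y.isYielding()) // false

    // Case 2: nobody takes over -> the timer clears the flag.
    y.Yield(10 * time.Millisecond)
    time.Sleep(100 * time.Millisecond)
    fmt.Println("after timeout:", y.isYielding()) // false
}
```

Note the split of concerns this mirrors: the flag is read with atomics on the hot election path, while the mutex only serializes Yield and stopYielding against each other.
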
64 changes: 64 additions & 0 deletions gossip/election/election_test.go
```diff
@@ -308,6 +308,70 @@ func TestLeadershipTakeover(t *testing.T) {
     assert.Equal(t, "p2", leaders[0])
 }
 
+func TestYield(t *testing.T) {
+    t.Parallel()
+    // Scenario: Peers spawn and a leader is elected.
+    // After a while, the leader yields.
+    // (Call yield twice to ensure only one callback is called)
+    // Expected outcome:
+    // (1) A new leader is elected
+    // (2) The old leader doesn't take back its leadership
+    peers := createPeers(0, 0, 1, 2, 3, 4, 5)
+    leaders := waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p0", leaders[0])
+    peers[0].Yield()
+    // Ensure the callback was called with 'false'
+    assert.True(t, peers[0].isCallbackInvoked())
+    assert.False(t, peers[0].isLeaderFromCallback())
+    // Clear the callback invoked flag
+    peers[0].lock.Lock()
+    peers[0].callbackInvoked = false
+    peers[0].lock.Unlock()
+    // Yield again and ensure it isn't called again
+    peers[0].Yield()
+    assert.False(t, peers[0].isCallbackInvoked())
+
+    ensureP0isNotAleader := func() bool {
+        leaders := waitForLeaderElection(t, peers)
+        return len(leaders) == 1 && leaders[0] != "p0"
+    }
+    // A new leader is elected, and it is not p0
+    waitForBoolFunc(t, ensureP0isNotAleader, true)
+    time.Sleep(getLeaderAliveThreshold() * 2)
+    // After a while, p0 doesn't restore its leadership status
+    waitForBoolFunc(t, ensureP0isNotAleader, true)
+}
+
+func TestYieldSinglePeer(t *testing.T) {
+    t.Parallel()
+    // Scenario: spawn a single peer and have it yield.
+    // Ensure it recovers its leadership after a while.
+    peers := createPeers(0, 0)
+    waitForLeaderElection(t, peers)
+    peers[0].Yield()
+    assert.False(t, peers[0].IsLeader())
+    waitForLeaderElection(t, peers)
+}
+
+func TestYieldAllPeers(t *testing.T) {
+    t.Parallel()
+    // Scenario: spawn 2 peers and have them all yield after regaining leadership.
+    // Ensure the first peer is the leader in the end after both peers yield
+    peers := createPeers(0, 0, 1)
+    leaders := waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p0", leaders[0])
+    peers[0].Yield()
+    leaders = waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p1", leaders[0])
+    peers[1].Yield()
+    leaders = waitForLeaderElection(t, peers)
+    assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
+    assert.Equal(t, "p0", leaders[0])
+}
+
 func TestPartition(t *testing.T) {
     t.Parallel()
     // Scenario: peers spawn together, and then after a while a network partition occurs
```

8 changes: 6 additions & 2 deletions gossip/service/gossip_service.go
```diff
@@ -196,7 +196,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
         g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
     } else if isStaticOrgLeader {
         logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
-        g.deliveryService.StartDeliverForChannel(chainID, committer)
+        g.deliveryService.StartDeliverForChannel(chainID, committer, func() {})
     } else {
         logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
     }
@@ -282,8 +282,12 @@ func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
 func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
     return func(isLeader bool) {
         if isLeader {
+            yield := func() {
+                le := g.leaderElection[chainID]
+                le.Yield()
+            }
             logger.Info("Elected as a leader, starting delivery service for channel", chainID)
-            if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
+            if err := g.deliveryService.StartDeliverForChannel(chainID, committer, yield); err != nil {
                 logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
             }
         } else {
```
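
End to end, the leadership callback wires the pieces together: on becoming leader, start delivery and hand it a finalizer that yields leadership if delivery ever gives up. A self-contained sketch with toy stand-ins for the election and delivery services (the real signatures also carry a committer/ledger-info argument):

```go
package main

import "fmt"

type leaderElector struct{ id string }

// Yield is a toy stand-in for LeaderElectionService.Yield.
func (le *leaderElector) Yield() { fmt.Println(le.id, "relinquished leadership") }

type deliverService struct{}

// StartDeliverForChannel pretends delivery fails immediately and therefore
// calls the finalizer, as the real service does once DeliverBlocks returns.
func (ds *deliverService) StartDeliverForChannel(chainID string, finalizer func()) error {
    fmt.Println("delivery for", chainID, "gave up reconnecting to the orderer")
    finalizer()
    return nil
}

// onStatusChange mirrors onStatusChangeFactory: when elected leader, start
// delivery with a finalizer that yields leadership when delivery stops.
func onStatusChange(chainID string, ds *deliverService, le *leaderElector) func(bool) {
    return func(isLeader bool) {
        if !isLeader {
            return
        }
        yield := func() { le.Yield() }
        if err := ds.StartDeliverForChannel(chainID, yield); err != nil {
            fmt.Println("could not start delivery:", err)
        }
    }
}

func main() {
    cb := onStatusChange("testchannel", &deliverService{}, &leaderElector{id: "p0"})
    cb(true) // elected leader -> delivery starts, fails, leadership is yielded
}
```
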
2 changes: 1 addition & 1 deletion gossip/service/gossip_service_test.go
```diff
@@ -276,7 +276,7 @@ type mockDeliverService struct {
     running map[string]bool
 }
 
-func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
+func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
     ds.running[chainID] = true
     return nil
 }
```