Skip to content

Commit

Permalink
[FAB-2630] Integration election with core.yaml
Browse files Browse the repository at this point in the history
Leader election configuration parameters should be set using core.yaml
and be configurable by various tests.

Change-Id: Ief5d0a4ab0fdfe99669bd8577063127b71fe77c0
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman committed Mar 4, 2017
1 parent ebe1b4d commit 035c51c
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 27 deletions.
55 changes: 42 additions & 13 deletions gossip/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@ import (

"github.com/hyperledger/fabric/gossip/util"
"github.com/op/go-logging"
)

var (
startupGracePeriod = time.Second * 15
membershipSampleInterval = time.Second
leaderAliveThreshold = time.Second * 10
leadershipDeclarationInterval = leaderAliveThreshold / 2
leaderElectionDuration = time.Second * 5
"github.com/spf13/viper"
)

// Gossip leader election module
Expand Down Expand Up @@ -178,7 +171,7 @@ type leaderElectionSvcImpl struct {
// start registers two goroutines with stopWG, launches handleMessages in the
// background, blocks in waitForMembershipStabilization for at most the startup
// grace period, and only then launches the run loop in the background.
func (le *leaderElectionSvcImpl) start() {
le.stopWG.Add(2)
go le.handleMessages()
le.waitForMembershipStabilization(startupGracePeriod)
le.waitForMembershipStabilization(getStartupGracePeriod())
go le.run()
}

Expand Down Expand Up @@ -273,7 +266,7 @@ func (le *leaderElectionSvcImpl) leaderElection() {
le.logger.Debug(le.id, ": Entering")
defer le.logger.Debug(le.id, ": Exiting")
le.propose()
le.waitForInterrupt(leaderElectionDuration)
le.waitForInterrupt(getLeaderElectionDuration())
// If someone declared itself as a leader, give up
// on trying to become a leader too
if le.isLeaderExists() {
Expand Down Expand Up @@ -309,7 +302,7 @@ func (le *leaderElectionSvcImpl) follower() {
le.proposals.Clear()
atomic.StoreInt32(&le.leaderExists, int32(0))
select {
case <-time.After(leaderAliveThreshold):
case <-time.After(getLeaderAliveThreshold()):
case <-le.stopChan:
le.stopChan <- struct{}{}
}
Expand All @@ -318,7 +311,7 @@ func (le *leaderElectionSvcImpl) follower() {
// leader builds a message through the adapter (CreateMessage(true); presumably
// the flag marks it as a leadership declaration - confirm against the adapter),
// gossips it, and then calls waitForInterrupt with the leadership declaration
// interval.
func (le *leaderElectionSvcImpl) leader() {
leaderDeclaration := le.adapter.CreateMessage(true)
le.adapter.Gossip(leaderDeclaration)
le.waitForInterrupt(leadershipDeclarationInterval)
le.waitForInterrupt(getLeadershipDeclarationInterval())
}

// waitForMembershipStabilization waits for membership view to stabilize
Expand All @@ -329,7 +322,7 @@ func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.D
endTime := time.Now().Add(timeLimit)
viewSize := len(le.adapter.Peers())
for !le.shouldStop() {
time.Sleep(membershipSampleInterval)
time.Sleep(getMembershipSampleInterval())
newSize := len(le.adapter.Peers())
if newSize == viewSize || time.Now().After(endTime) || le.isLeaderExists() {
return
Expand Down Expand Up @@ -391,3 +384,39 @@ func (le *leaderElectionSvcImpl) Stop() {
le.stopChan <- struct{}{}
le.stopWG.Wait()
}

// SetStartupGracePeriod stores t in viper under the
// peer.gossip.election.startupGracePeriod key, which is the key that
// getStartupGracePeriod looks up.
func SetStartupGracePeriod(t time.Duration) {
	const key = "peer.gossip.election.startupGracePeriod"
	viper.Set(key, t)
}

// SetMembershipSampleInterval stores t in viper under the
// peer.gossip.election.membershipSampleInterval key, which is the key that
// getMembershipSampleInterval looks up.
func SetMembershipSampleInterval(t time.Duration) {
	const key = "peer.gossip.election.membershipSampleInterval"
	viper.Set(key, t)
}

// SetLeaderAliveThreshold stores t in viper under the
// peer.gossip.election.leaderAliveThreshold key, which is the key that
// getLeaderAliveThreshold looks up.
func SetLeaderAliveThreshold(t time.Duration) {
	const key = "peer.gossip.election.leaderAliveThreshold"
	viper.Set(key, t)
}

// SetLeaderElectionDuration stores t in viper under the
// peer.gossip.election.leaderElectionDuration key, which is the key that
// getLeaderElectionDuration looks up.
func SetLeaderElectionDuration(t time.Duration) {
	const key = "peer.gossip.election.leaderElectionDuration"
	viper.Set(key, t)
}

// getStartupGracePeriod looks up peer.gossip.election.startupGracePeriod
// through util.GetDurationOrDefault, passing 15 seconds as the default.
func getStartupGracePeriod() time.Duration {
	const (
		key      = "peer.gossip.election.startupGracePeriod"
		fallback = 15 * time.Second
	)
	return util.GetDurationOrDefault(key, fallback)
}

// getMembershipSampleInterval looks up
// peer.gossip.election.membershipSampleInterval through
// util.GetDurationOrDefault, passing one second as the default.
func getMembershipSampleInterval() time.Duration {
	const (
		key      = "peer.gossip.election.membershipSampleInterval"
		fallback = time.Second
	)
	return util.GetDurationOrDefault(key, fallback)
}

// getLeaderAliveThreshold looks up peer.gossip.election.leaderAliveThreshold
// through util.GetDurationOrDefault, passing 10 seconds as the default.
func getLeaderAliveThreshold() time.Duration {
	const (
		key      = "peer.gossip.election.leaderAliveThreshold"
		fallback = 10 * time.Second
	)
	return util.GetDurationOrDefault(key, fallback)
}

// getLeadershipDeclarationInterval returns half of the value reported by
// getLeaderAliveThreshold. It has no configuration key of its own.
func getLeadershipDeclarationInterval() time.Duration {
	aliveThreshold := getLeaderAliveThreshold()
	return aliveThreshold / 2
}

// getLeaderElectionDuration looks up
// peer.gossip.election.leaderElectionDuration through
// util.GetDurationOrDefault, passing 5 seconds as the default.
func getLeaderElectionDuration() time.Duration {
	const (
		key      = "peer.gossip.election.leaderElectionDuration"
		fallback = 5 * time.Second
	)
	return util.GetDurationOrDefault(key, fallback)
}
69 changes: 55 additions & 14 deletions gossip/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"testing"
"time"

"strings"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand All @@ -33,11 +36,11 @@ const (
)

// init shortens the leader election timings for the tests in this package,
// replacing the multi-second defaults with values of at most 500ms.
func init() {
startupGracePeriod = time.Millisecond * 500
membershipSampleInterval = time.Millisecond * 100
leaderAliveThreshold = time.Millisecond * 500
leadershipDeclarationInterval = leaderAliveThreshold / 2
leaderElectionDuration = time.Millisecond * 500

SetStartupGracePeriod(time.Millisecond * 500)
SetMembershipSampleInterval(time.Millisecond * 100)
SetLeaderAliveThreshold(time.Millisecond * 500)
SetLeaderElectionDuration(time.Millisecond * 500)
}

type msg struct {
Expand Down Expand Up @@ -186,7 +189,7 @@ func TestInitPeersAtSameTime(t *testing.T) {
// Scenario: Peers are spawned at the same time
// expected outcome: the peer that has the lowest ID is the leader
peers := createPeers(0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0)
time.Sleep(startupGracePeriod + leaderElectionDuration)
time.Sleep(getStartupGracePeriod() + getLeaderElectionDuration())
leaders := waitForLeaderElection(t, peers)
isP0leader := peers[len(peers)-1].IsLeader()
assert.True(t, isP0leader, "p0 isn't a leader. Leaders are: %v", leaders)
Expand All @@ -198,7 +201,7 @@ func TestInitPeersStartAtIntervals(t *testing.T) {
t.Parallel()
// Scenario: Peers are spawned one by one in a slow rate
// expected outcome: the first peer is the leader although its ID is the highest
peers := createPeers(startupGracePeriod+leadershipDeclarationInterval, 3, 2, 1, 0)
peers := createPeers(getStartupGracePeriod()+getLeadershipDeclarationInterval(), 3, 2, 1, 0)
waitForLeaderElection(t, peers)
assert.True(t, peers[0].IsLeader())
}
Expand Down Expand Up @@ -226,9 +229,9 @@ func TestStop(t *testing.T) {
for _, p := range peers {
p.Stop()
}
time.Sleep(leaderAliveThreshold)
time.Sleep(getLeaderAliveThreshold())
gossipCounterAfterStop := atomic.LoadInt32(&gossipCounter)
time.Sleep(leaderAliveThreshold * 5)
time.Sleep(getLeaderAliveThreshold() * 5)
assert.Equal(t, gossipCounterAfterStop, atomic.LoadInt32(&gossipCounter))
}

Expand Down Expand Up @@ -265,7 +268,7 @@ func TestConvergence(t *testing.T) {
p.On("Peers").Return(allPeerIds)
}

time.Sleep(leaderAliveThreshold * 5)
time.Sleep(getLeaderAliveThreshold() * 5)
finalLeaders := waitForLeaderElection(t, combinedPeers)
assert.Len(t, finalLeaders, 1, "Combined peer group was suppose to have 1 leader exactly")
assert.Equal(t, leaders1[0], finalLeaders[0], "Combined peer group has different leader than expected:")
Expand All @@ -288,12 +291,12 @@ func TestLeadershipTakeover(t *testing.T) {
// Scenario: Peers spawn one by one in descending order.
// After a while, the leader peer stops.
// expected outcome: the peer that takes over is the peer with lowest ID
peers := createPeers(startupGracePeriod+leadershipDeclarationInterval, 5, 4, 3, 2)
peers := createPeers(getStartupGracePeriod()+getLeadershipDeclarationInterval(), 5, 4, 3, 2)
leaders := waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p5", leaders[0])
peers[0].Stop()
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*3)
time.Sleep(getLeadershipDeclarationInterval() + getLeaderAliveThreshold()*3)
leaders = waitForLeaderElection(t, peers[1:])
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p2", leaders[0])
Expand All @@ -316,7 +319,7 @@ func TestPartition(t *testing.T) {
p.On("Peers").Return([]Peer{})
p.On("Gossip", mock.Anything)
}
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2)
time.Sleep(getLeadershipDeclarationInterval() + getLeaderAliveThreshold()*2)
leaders = waitForMultipleLeadersElection(t, peers, 6)
assert.Len(t, leaders, 6)
for _, p := range peers {
Expand All @@ -329,7 +332,7 @@ func TestPartition(t *testing.T) {
p.callbackInvoked = false
p.sharedLock.Unlock()
}
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2)
time.Sleep(getLeadershipDeclarationInterval() + getLeaderAliveThreshold()*2)
leaders = waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
Expand All @@ -343,3 +346,41 @@ func TestPartition(t *testing.T) {
}

}

// TestConfigFromFile checks the election timing getters in two situations:
// with an empty viper configuration they must return their built-in defaults,
// and after loading peer/core.yaml they must return the values from that file.
// The values in effect before the test are restored on exit.
func TestConfigFromFile(t *testing.T) {
	preStartupGracePeriod := getStartupGracePeriod()
	preMembershipSampleInterval := getMembershipSampleInterval()
	preLeaderAliveThreshold := getLeaderAliveThreshold()
	preLeaderElectionDuration := getLeaderElectionDuration()

	// Recover the config values in order to avoid impacting other tests
	defer func() {
		SetStartupGracePeriod(preStartupGracePeriod)
		SetMembershipSampleInterval(preMembershipSampleInterval)
		SetLeaderAliveThreshold(preLeaderAliveThreshold)
		SetLeaderElectionDuration(preLeaderElectionDuration)
	}()

	// Verify that the default values are used when the config is missing
	viper.Reset()
	assert.Equal(t, time.Second*15, getStartupGracePeriod())
	assert.Equal(t, time.Second, getMembershipSampleInterval())
	assert.Equal(t, time.Second*10, getLeaderAliveThreshold())
	assert.Equal(t, time.Second*5, getLeaderElectionDuration())
	assert.Equal(t, getLeaderAliveThreshold()/2, getLeadershipDeclarationInterval())

	// Verify reading the values from the config file
	viper.Reset()
	viper.SetConfigName("core")
	viper.SetEnvPrefix("CORE")
	viper.AddConfigPath("./../../peer")
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()
	// Without a loaded config file the checks below would only exercise the
	// defaults again, so stop here if the file cannot be read.
	if err := viper.ReadInConfig(); !assert.NoError(t, err) {
		return
	}

	// The values shipped in core.yaml are equal to the built-in defaults, so
	// the duration comparisons alone cannot distinguish "read from file" from
	// "key absent, default used". Assert that the keys are actually present.
	for _, key := range []string{
		"peer.gossip.election.startupGracePeriod",
		"peer.gossip.election.membershipSampleInterval",
		"peer.gossip.election.leaderAliveThreshold",
		"peer.gossip.election.leaderElectionDuration",
	} {
		assert.True(t, viper.IsSet(key), "%s is not set after reading core.yaml", key)
	}

	assert.Equal(t, time.Second*15, getStartupGracePeriod())
	assert.Equal(t, time.Second, getMembershipSampleInterval())
	assert.Equal(t, time.Second*10, getLeaderAliveThreshold())
	assert.Equal(t, time.Second*5, getLeaderElectionDuration())
	assert.Equal(t, getLeaderAliveThreshold()/2, getLeadershipDeclarationInterval())
}
11 changes: 11 additions & 0 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ peer:
# If this isn't set, the peer will not be known to other organizations.
externalEndpoint:

# Leader election service configuration
election:
# Longest time a peer waits for membership to stabilize during leader election startup (unit: second)
startupGracePeriod: 15s
# Interval at which gossip membership is sampled to check its stability (unit: second)
membershipSampleInterval: 1s
# Time that must pass since the last declaration message before a peer decides to start an election (unit: second)
leaderAliveThreshold: 10s
# Time between a peer sending a propose message and declaring itself leader (sending a declaration message) (unit: second)
leaderElectionDuration: 5s

# Sync related configuration
sync:
blocks:
Expand Down

0 comments on commit 035c51c

Please sign in to comment.