Skip to content

Commit

Permalink
Merge "[FAB-2630] Integration election with core.yaml"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Mar 5, 2017
2 parents 509d8ed + 035c51c commit 59c6772
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 27 deletions.
55 changes: 42 additions & 13 deletions gossip/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@ import (

"github.com/hyperledger/fabric/gossip/util"
"github.com/op/go-logging"
)

var (
startupGracePeriod = time.Second * 15
membershipSampleInterval = time.Second
leaderAliveThreshold = time.Second * 10
leadershipDeclarationInterval = leaderAliveThreshold / 2
leaderElectionDuration = time.Second * 5
"github.com/spf13/viper"
)

// Gossip leader election module
Expand Down Expand Up @@ -178,7 +171,7 @@ type leaderElectionSvcImpl struct {
func (le *leaderElectionSvcImpl) start() {
le.stopWG.Add(2)
go le.handleMessages()
le.waitForMembershipStabilization(startupGracePeriod)
le.waitForMembershipStabilization(getStartupGracePeriod())
go le.run()
}

Expand Down Expand Up @@ -273,7 +266,7 @@ func (le *leaderElectionSvcImpl) leaderElection() {
le.logger.Debug(le.id, ": Entering")
defer le.logger.Debug(le.id, ": Exiting")
le.propose()
le.waitForInterrupt(leaderElectionDuration)
le.waitForInterrupt(getLeaderElectionDuration())
// If someone declared itself as a leader, give up
// on trying to become a leader too
if le.isLeaderExists() {
Expand Down Expand Up @@ -309,7 +302,7 @@ func (le *leaderElectionSvcImpl) follower() {
le.proposals.Clear()
atomic.StoreInt32(&le.leaderExists, int32(0))
select {
case <-time.After(leaderAliveThreshold):
case <-time.After(getLeaderAliveThreshold()):
case <-le.stopChan:
le.stopChan <- struct{}{}
}
Expand All @@ -318,7 +311,7 @@ func (le *leaderElectionSvcImpl) follower() {
func (le *leaderElectionSvcImpl) leader() {
leaderDeclaration := le.adapter.CreateMessage(true)
le.adapter.Gossip(leaderDeclaration)
le.waitForInterrupt(leadershipDeclarationInterval)
le.waitForInterrupt(getLeadershipDeclarationInterval())
}

// waitForMembershipStabilization waits for membership view to stabilize
Expand All @@ -329,7 +322,7 @@ func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.D
endTime := time.Now().Add(timeLimit)
viewSize := len(le.adapter.Peers())
for !le.shouldStop() {
time.Sleep(membershipSampleInterval)
time.Sleep(getMembershipSampleInterval())
newSize := len(le.adapter.Peers())
if newSize == viewSize || time.Now().After(endTime) || le.isLeaderExists() {
return
Expand Down Expand Up @@ -391,3 +384,39 @@ func (le *leaderElectionSvcImpl) Stop() {
le.stopChan <- struct{}{}
le.stopWG.Wait()
}

func SetStartupGracePeriod(t time.Duration) {
viper.Set("peer.gossip.election.startupGracePeriod", t)
}

func SetMembershipSampleInterval(t time.Duration) {
viper.Set("peer.gossip.election.membershipSampleInterval", t)
}

func SetLeaderAliveThreshold(t time.Duration) {
viper.Set("peer.gossip.election.leaderAliveThreshold", t)
}

func SetLeaderElectionDuration(t time.Duration) {
viper.Set("peer.gossip.election.leaderElectionDuration", t)
}

func getStartupGracePeriod() time.Duration {
return util.GetDurationOrDefault("peer.gossip.election.startupGracePeriod", time.Second*15)
}

func getMembershipSampleInterval() time.Duration {
return util.GetDurationOrDefault("peer.gossip.election.membershipSampleInterval", time.Second)
}

func getLeaderAliveThreshold() time.Duration {
return util.GetDurationOrDefault("peer.gossip.election.leaderAliveThreshold", time.Second*10)
}

func getLeadershipDeclarationInterval() time.Duration {
return time.Duration(getLeaderAliveThreshold() / 2)
}

func getLeaderElectionDuration() time.Duration {
return util.GetDurationOrDefault("peer.gossip.election.leaderElectionDuration", time.Second*5)
}
69 changes: 55 additions & 14 deletions gossip/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"testing"
"time"

"strings"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand All @@ -33,11 +36,11 @@ const (
)

func init() {
startupGracePeriod = time.Millisecond * 500
membershipSampleInterval = time.Millisecond * 100
leaderAliveThreshold = time.Millisecond * 500
leadershipDeclarationInterval = leaderAliveThreshold / 2
leaderElectionDuration = time.Millisecond * 500

SetStartupGracePeriod(time.Millisecond * 500)
SetMembershipSampleInterval(time.Millisecond * 100)
SetLeaderAliveThreshold(time.Millisecond * 500)
SetLeaderElectionDuration(time.Millisecond * 500)
}

type msg struct {
Expand Down Expand Up @@ -186,7 +189,7 @@ func TestInitPeersAtSameTime(t *testing.T) {
// Scenario: Peers are spawned at the same time
// expected outcome: the peer that has the lowest ID is the leader
peers := createPeers(0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0)
time.Sleep(startupGracePeriod + leaderElectionDuration)
time.Sleep(getStartupGracePeriod() + getLeaderElectionDuration())
leaders := waitForLeaderElection(t, peers)
isP0leader := peers[len(peers)-1].IsLeader()
assert.True(t, isP0leader, "p0 isn't a leader. Leaders are: %v", leaders)
Expand All @@ -198,7 +201,7 @@ func TestInitPeersStartAtIntervals(t *testing.T) {
t.Parallel()
// Scenario: Peers are spawned one by one in a slow rate
// expected outcome: the first peer is the leader although its ID is lowest
peers := createPeers(startupGracePeriod+leadershipDeclarationInterval, 3, 2, 1, 0)
peers := createPeers(getStartupGracePeriod()+getLeadershipDeclarationInterval(), 3, 2, 1, 0)
waitForLeaderElection(t, peers)
assert.True(t, peers[0].IsLeader())
}
Expand Down Expand Up @@ -226,9 +229,9 @@ func TestStop(t *testing.T) {
for _, p := range peers {
p.Stop()
}
time.Sleep(leaderAliveThreshold)
time.Sleep(getLeaderAliveThreshold())
gossipCounterAfterStop := atomic.LoadInt32(&gossipCounter)
time.Sleep(leaderAliveThreshold * 5)
time.Sleep(getLeaderAliveThreshold() * 5)
assert.Equal(t, gossipCounterAfterStop, atomic.LoadInt32(&gossipCounter))
}

Expand Down Expand Up @@ -265,7 +268,7 @@ func TestConvergence(t *testing.T) {
p.On("Peers").Return(allPeerIds)
}

time.Sleep(leaderAliveThreshold * 5)
time.Sleep(getLeaderAliveThreshold() * 5)
finalLeaders := waitForLeaderElection(t, combinedPeers)
assert.Len(t, finalLeaders, 1, "Combined peer group was suppose to have 1 leader exactly")
assert.Equal(t, leaders1[0], finalLeaders[0], "Combined peer group has different leader than expected:")
Expand All @@ -288,12 +291,12 @@ func TestLeadershipTakeover(t *testing.T) {
// Scenario: Peers spawn one by one in descending order.
// After a while, the leader peer stops.
// expected outcome: the peer that takes over is the peer with lowest ID
peers := createPeers(startupGracePeriod+leadershipDeclarationInterval, 5, 4, 3, 2)
peers := createPeers(getStartupGracePeriod()+getLeadershipDeclarationInterval(), 5, 4, 3, 2)
leaders := waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p5", leaders[0])
peers[0].Stop()
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*3)
time.Sleep(getLeadershipDeclarationInterval() + getLeaderAliveThreshold()*3)
leaders = waitForLeaderElection(t, peers[1:])
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p2", leaders[0])
Expand All @@ -316,7 +319,7 @@ func TestPartition(t *testing.T) {
p.On("Peers").Return([]Peer{})
p.On("Gossip", mock.Anything)
}
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2)
time.Sleep(getLeadershipDeclarationInterval() + getLeaderAliveThreshold()*2)
leaders = waitForMultipleLeadersElection(t, peers, 6)
assert.Len(t, leaders, 6)
for _, p := range peers {
Expand All @@ -329,7 +332,7 @@ func TestPartition(t *testing.T) {
p.callbackInvoked = false
p.sharedLock.Unlock()
}
time.Sleep(leadershipDeclarationInterval + leaderAliveThreshold*2)
time.Sleep(getLeadershipDeclarationInterval() + getLeaderAliveThreshold()*2)
leaders = waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
Expand All @@ -343,3 +346,41 @@ func TestPartition(t *testing.T) {
}

}

func TestConfigFromFile(t *testing.T) {
preStartupGracePeriod := getStartupGracePeriod()
preMembershipSampleInterval := getMembershipSampleInterval()
preLeaderAliveThreshold := getLeaderAliveThreshold()
preLeaderElectionDuration := getLeaderElectionDuration()

// Recover the config values in order to avoid impacting other tests
defer func() {
SetStartupGracePeriod(preStartupGracePeriod)
SetMembershipSampleInterval(preMembershipSampleInterval)
SetLeaderAliveThreshold(preLeaderAliveThreshold)
SetLeaderElectionDuration(preLeaderElectionDuration)
}()

// Verify if using default values when config is missing
viper.Reset()
assert.Equal(t, time.Second*15, getStartupGracePeriod())
assert.Equal(t, time.Second, getMembershipSampleInterval())
assert.Equal(t, time.Second*10, getLeaderAliveThreshold())
assert.Equal(t, time.Second*5, getLeaderElectionDuration())
assert.Equal(t, getLeaderAliveThreshold()/2, getLeadershipDeclarationInterval())

//Verify reading the values from config file
viper.Reset()
viper.SetConfigName("core")
viper.SetEnvPrefix("CORE")
viper.AddConfigPath("./../../peer")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()
err := viper.ReadInConfig()
assert.NoError(t, err)
assert.Equal(t, time.Second*15, getStartupGracePeriod())
assert.Equal(t, time.Second, getMembershipSampleInterval())
assert.Equal(t, time.Second*10, getLeaderAliveThreshold())
assert.Equal(t, time.Second*5, getLeaderElectionDuration())
assert.Equal(t, getLeaderAliveThreshold()/2, getLeadershipDeclarationInterval())
}
11 changes: 11 additions & 0 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ peer:
# If this isn't set, the peer will not be known to other organizations.
externalEndpoint:

# Leader election service configuration
election:
# Longest time peer wait for stable membership during leader election startup (unit: second)
startupGracePeriod: 15s
# Interval gossip membership sampled to check its stability (unit: second)
membershipSampleInterval: 1s
# Time pass since last declaration message before peer decide to go to election (unit: second)
leaderAliveThreshold: 10s
# Time between peer sends propose message and declare itself as a leader (sends declaration message) (unit: second)
leaderElectionDuration: 5s

# Sync related configuration
sync:
blocks:
Expand Down

0 comments on commit 59c6772

Please sign in to comment.