Skip to content

Commit

Permalink
gossip discovery tests improvements
Browse files Browse the repository at this point in the history
Added some more tests to discovery module
and also did some refactoring of existing tests
code-coverage is now 84% instead of 70%

Changes in discovery_impl are only gofmt stuff

Change-Id: I1125d08365e653ec87d6409ce90f8f389261ef17
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Nov 8, 2016
1 parent b8c6b2f commit 216ae65
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 50 deletions.
22 changes: 11 additions & 11 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,25 @@ type gossipDiscoveryImpl struct {
// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService) Discovery {
d := &gossipDiscoveryImpl{
endpoint: self.Endpoint,
incTime: uint64(time.Now().UnixNano()),
metadata: self.Metadata,
pkiID: self.PKIid,
seqNum: uint64(0),
endpoint: self.Endpoint,
incTime: uint64(time.Now().UnixNano()),
metadata: self.Metadata,
pkiID: self.PKIid,
seqNum: uint64(0),
deadLastTS: make(map[string]*timestamp),
aliveLastTS: make(map[string]*timestamp),
id2Member: make(map[string]*NetworkMember),
cachedMembership: &proto.MembershipResponse{
Alive: make([]*proto.AliveMessage, 0),
Dead: make([]*proto.AliveMessage, 0),
},
crpypt: crypt,
crpypt: crypt,
bootstrapPeers: bootstrapPeers,
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LOGGING_DISCOVERY_MODULE, self.Endpoint),
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LOGGING_DISCOVERY_MODULE, self.Endpoint),
}

d.logger.SetLevel(logging.WARNING)
Expand Down
143 changes: 104 additions & 39 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -42,6 +43,7 @@ type dummyCommModule struct {
lock *sync.RWMutex
incMsgs chan *proto.GossipMessage
lastSeqs map[string]uint64
shouldGossip bool
}

type gossipMsg struct {
Expand All @@ -55,8 +57,9 @@ func (m *gossipMsg) GetGossipMessage() *proto.GossipMessage {
type gossipInstance struct {
comm *dummyCommModule
Discovery
gRGCserv *grpc.Server
lsnr net.Listener
gRGCserv *grpc.Server
lsnr net.Listener
shouldGossip bool
}

func (comm *dummyCommModule) ValidateAliveMsg(am *proto.AliveMessage) bool {
Expand All @@ -68,6 +71,9 @@ func (comm *dummyCommModule) SignMessage(am *proto.AliveMessage) *proto.AliveMes
}

func (comm *dummyCommModule) Gossip(msg *proto.GossipMessage) {
if !comm.shouldGossip {
return
}
comm.lock.Lock()
defer comm.lock.Unlock()
for _, conn := range comm.streams {
Expand Down Expand Up @@ -203,6 +209,14 @@ func (g *gossipInstance) Ping(context.Context, *proto.Empty) (*proto.Empty, erro
}

func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *gossipInstance {
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true)
}

func createDiscoveryInstanceWithNoGossip(port int, id string, bootstrapPeers []string) *gossipInstance {
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false)
}

func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool) *gossipInstance {
comm := &dummyCommModule{
conns: make(map[string]*grpc.ClientConn),
streams: make(map[string]proto.Gossip_GossipStreamClient),
Expand All @@ -212,6 +226,7 @@ func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *goss
detectedDead: make(chan string, 10000),
lock: &sync.RWMutex{},
lastSeqs: make(map[string]uint64),
shouldGossip: shouldGossip,
}

endpoint := fmt.Sprintf("localhost:%d", port)
Expand All @@ -230,7 +245,7 @@ func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *goss

discSvc := NewDiscoveryService(bootstrapPeers, self, comm, comm)
discSvc.(*gossipDiscoveryImpl).logger.SetLevel(logging.WARNING)
gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll}
gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll, shouldGossip: shouldGossip}

proto.RegisterGossipServer(s, gossInst)
go s.Serve(ll)
Expand Down Expand Up @@ -287,19 +302,34 @@ func TestUpdate(t *testing.T) {
return true
}


waitUntilOrFail(t, checkMembership)
stopInstances(t, instances)
}

stopAction := &sync.WaitGroup{}
for _, inst := range instances {
stopAction.Add(1)
go func(inst *gossipInstance) {
defer stopAction.Done()
inst.Stop()
}(inst)
}
func TestInitiateSync(t *testing.T) {
nodeNum := 10
bootPeers := []string{bootPeer(3611), bootPeer(3612)}
instances := []*gossipInstance{}

waitUntilOrFailBlocking(t, stopAction.Wait)
toDie := int32(0)
for i := 1; i <= nodeNum; i++ {
id := fmt.Sprintf("d%d", i)
inst := createDiscoveryInstanceWithNoGossip(3610+i, id, bootPeers)
instances = append(instances, inst)
go func() {
for {
if atomic.LoadInt32(&toDie) == int32(1) {
return
}
time.Sleep(aliveExpirationTimeout / 3)
inst.InitiateSync(9)
}
}()
}
time.Sleep(aliveExpirationTimeout * 4)
assertMembership(t, instances, nodeNum-1)
atomic.StoreInt32(&toDie, int32(1))
stopInstances(t, instances)
}

func TestExpiration(t *testing.T) {
Expand All @@ -319,21 +349,12 @@ func TestExpiration(t *testing.T) {
instances = append(instances, inst)
}

fullMembership := func() bool {
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
}

waitUntilOrFail(t, fullMembership)
assertMembership(t, instances, nodeNum-1)

waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop)
waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop)

time.Sleep(time.Duration(2) * time.Second)
membershipReduced := func() bool {
return nodeNum-3 == len(instances[0].GetMembership())
}

waitUntilOrFail(t, membershipReduced)
assertMembership(t, instances, nodeNum-3)

stopAction := &sync.WaitGroup{}
for i, inst := range instances {
Expand Down Expand Up @@ -367,21 +388,8 @@ func TestGetFullMembership(t *testing.T) {
instances = append(instances, inst)
}

fullMembership := func() bool {
return nodeNum - 1 == len(instances[nodeNum-1].GetMembership())
}
waitUntilOrFail(t, fullMembership)

stopAction := &sync.WaitGroup{}
for _, inst := range instances {
stopAction.Add(1)
go func(inst *gossipInstance) {
defer stopAction.Done()
inst.Stop()
}(inst)
}

waitUntilOrFailBlocking(t, stopAction.Wait)
assertMembership(t, instances, nodeNum-1)
stopInstances(t, instances)
}

func TestGossipDiscoveryStopping(t *testing.T) {
Expand All @@ -391,6 +399,38 @@ func TestGossipDiscoveryStopping(t *testing.T) {

}

func TestConvergence(t *testing.T) {
// scenario:
// {boot peer: [peer list]}
// {d1: d2, d3, d4}
// {d5: d6, d7, d8}
// {d9: d10, d11, d12}
// connect all boot peers with d13
// take down d13
// ensure still full membership
instances := []*gossipInstance{}
for _, i := range []int{1, 5, 9} {
bootPort := 4610 + i
id := fmt.Sprintf("d%d", i)
leader := createDiscoveryInstance(bootPort, id, []string{})
instances = append(instances, leader)
for minionIndex := 1; minionIndex <= 3; minionIndex++ {
id := fmt.Sprintf("d%d", i+minionIndex)
minion := createDiscoveryInstance(4610+minionIndex+i, id, []string{bootPeer(bootPort)})
instances = append(instances, minion)
}
}

assertMembership(t, instances, 3)
connector := createDiscoveryInstance(4623, fmt.Sprintf("d13"), []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)})
instances = append(instances, connector)
assertMembership(t, instances, 12)
connector.Stop()
instances = instances[:len(instances)-1]
assertMembership(t, instances, 11)
stopInstances(t, instances)
}

func waitUntilOrFail(t *testing.T, pred func() bool) {
start := time.Now()
limit := start.UnixNano() + timeout.Nanoseconds()
Expand All @@ -417,3 +457,28 @@ func waitUntilOrFailBlocking(t *testing.T, f func()) {
}
assert.Fail(t, "Timeout expired!")
}

func stopInstances(t *testing.T, instances []*gossipInstance) {
stopAction := &sync.WaitGroup{}
for _, inst := range instances {
stopAction.Add(1)
go func(inst *gossipInstance) {
defer stopAction.Done()
inst.Stop()
}(inst)
}

waitUntilOrFailBlocking(t, stopAction.Wait)
}

func assertMembership(t *testing.T, instances []*gossipInstance, expectedNum int) {
fullMembership := func() bool {
for _, inst := range instances {
if len(inst.GetMembership()) == expectedNum {
return true
}
}
return false
}
waitUntilOrFail(t, fullMembership)
}

0 comments on commit 216ae65

Please sign in to comment.