From 216ae653662d5f8dbe423fe0433a393f181976db Mon Sep 17 00:00:00 2001 From: YACOVM Date: Mon, 7 Nov 2016 12:42:20 +0200 Subject: [PATCH] gossip discovery tests improvements Added some more tests to discovery module and also did some refactoring of existing tests code-coverage is now 84% instead of 70% Changes in discovery_impl are only gofmt stuff Change-Id: I1125d08365e653ec87d6409ce90f8f389261ef17 Signed-off-by: Yacov Manevich --- gossip/discovery/discovery_impl.go | 22 ++--- gossip/discovery/discovery_test.go | 143 +++++++++++++++++++++-------- 2 files changed, 115 insertions(+), 50 deletions(-) diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 785d70adc15..c78c67d901a 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -96,11 +96,11 @@ type gossipDiscoveryImpl struct { // NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService) Discovery { d := &gossipDiscoveryImpl{ - endpoint: self.Endpoint, - incTime: uint64(time.Now().UnixNano()), - metadata: self.Metadata, - pkiID: self.PKIid, - seqNum: uint64(0), + endpoint: self.Endpoint, + incTime: uint64(time.Now().UnixNano()), + metadata: self.Metadata, + pkiID: self.PKIid, + seqNum: uint64(0), deadLastTS: make(map[string]*timestamp), aliveLastTS: make(map[string]*timestamp), id2Member: make(map[string]*NetworkMember), @@ -108,13 +108,13 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS Alive: make([]*proto.AliveMessage, 0), Dead: make([]*proto.AliveMessage, 0), }, - crpypt: crypt, + crpypt: crypt, bootstrapPeers: bootstrapPeers, - comm: comm, - lock: &sync.RWMutex{}, - toDieChan: make(chan struct{}, 1), - toDieFlag: int32(0), - logger: util.GetLogger(util.LOGGING_DISCOVERY_MODULE, self.Endpoint), + comm: comm, + lock: &sync.RWMutex{}, + toDieChan: make(chan struct{}, 1), + toDieFlag: int32(0), + logger: util.GetLogger(util.LOGGING_DISCOVERY_MODULE, self.Endpoint), } d.logger.SetLevel(logging.WARNING) diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index 522fa06453b..6c1cec90c2f 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -21,6 +21,7 @@ import ( "io" "net" "sync" + "sync/atomic" "testing" "time" @@ -42,6 +43,7 @@ type dummyCommModule struct { lock *sync.RWMutex incMsgs chan *proto.GossipMessage lastSeqs map[string]uint64 + shouldGossip bool } type gossipMsg struct { @@ -55,8 +57,9 @@ func (m *gossipMsg) GetGossipMessage() *proto.GossipMessage { type gossipInstance struct { comm *dummyCommModule Discovery - gRGCserv *grpc.Server - lsnr net.Listener + gRGCserv *grpc.Server + lsnr net.Listener + shouldGossip bool } func (comm *dummyCommModule) ValidateAliveMsg(am *proto.AliveMessage) bool { @@ -68,6 +71,9 @@ func (comm *dummyCommModule) SignMessage(am *proto.AliveMessage) *proto.AliveMes } func (comm *dummyCommModule) Gossip(msg *proto.GossipMessage) { + if !comm.shouldGossip { + return + } comm.lock.Lock() defer comm.lock.Unlock() for _, conn := range comm.streams { @@ -203,6 +209,14 @@ func (g *gossipInstance) Ping(context.Context, *proto.Empty) (*proto.Empty, erro } func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *gossipInstance { + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true) +} + +func createDiscoveryInstanceWithNoGossip(port int, id string, bootstrapPeers []string) *gossipInstance { + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false) +} + +func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool) *gossipInstance { comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), @@ -212,6 +226,7 @@ func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *goss detectedDead: make(chan string, 10000), lock: &sync.RWMutex{}, lastSeqs: make(map[string]uint64), + shouldGossip: shouldGossip, } endpoint := fmt.Sprintf("localhost:%d", port) @@ -230,7 +245,7 @@ func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *goss discSvc := NewDiscoveryService(bootstrapPeers, self, comm, comm) discSvc.(*gossipDiscoveryImpl).logger.SetLevel(logging.WARNING) - gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll} + gossInst := &gossipInstance{comm: comm, gRGCserv: s, Discovery: discSvc, lsnr: ll, shouldGossip: shouldGossip} proto.RegisterGossipServer(s, gossInst) go s.Serve(ll) @@ -287,19 +302,34 @@ func TestUpdate(t *testing.T) { return true } - waitUntilOrFail(t, checkMembership) + stopInstances(t, instances) +} - stopAction := &sync.WaitGroup{} - for _, inst := range instances { - stopAction.Add(1) - go func(inst *gossipInstance) { - defer stopAction.Done() - inst.Stop() - }(inst) - } +func TestInitiateSync(t *testing.T) { + nodeNum := 10 + bootPeers := []string{bootPeer(3611), bootPeer(3612)} + instances := []*gossipInstance{} - waitUntilOrFailBlocking(t, stopAction.Wait) + toDie := int32(0) + for i := 1; i <= nodeNum; i++ { + id := fmt.Sprintf("d%d", i) + inst := createDiscoveryInstanceWithNoGossip(3610+i, id, bootPeers) + instances = append(instances, inst) + go func() { + for { + if atomic.LoadInt32(&toDie) == int32(1) { + return + } + time.Sleep(aliveExpirationTimeout / 3) + inst.InitiateSync(9) + } + }() + } + time.Sleep(aliveExpirationTimeout * 4) + assertMembership(t, instances, nodeNum-1) + atomic.StoreInt32(&toDie, int32(1)) + stopInstances(t, instances) } func TestExpiration(t *testing.T) { @@ -319,21 +349,12 @@ func TestExpiration(t *testing.T) { instances = append(instances, inst) } - fullMembership := func() bool { - return nodeNum-1 == len(instances[nodeNum-1].GetMembership()) - } - - waitUntilOrFail(t, fullMembership) + assertMembership(t, instances, nodeNum-1) waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop) waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop) - time.Sleep(time.Duration(2) * time.Second) - membershipReduced := func() bool { - return nodeNum-3 == len(instances[0].GetMembership()) - } - - waitUntilOrFail(t, membershipReduced) + assertMembership(t, instances, nodeNum-3) stopAction := &sync.WaitGroup{} for i, inst := range instances { @@ -367,21 +388,8 @@ func TestGetFullMembership(t *testing.T) { instances = append(instances, inst) } - fullMembership := func() bool { - return nodeNum - 1 == len(instances[nodeNum-1].GetMembership()) - } - waitUntilOrFail(t, fullMembership) - - stopAction := &sync.WaitGroup{} - for _, inst := range instances { - stopAction.Add(1) - go func(inst *gossipInstance) { - defer stopAction.Done() - inst.Stop() - }(inst) - } - - waitUntilOrFailBlocking(t, stopAction.Wait) + assertMembership(t, instances, nodeNum-1) + stopInstances(t, instances) } func TestGossipDiscoveryStopping(t *testing.T) { @@ -391,6 +399,38 @@ func TestGossipDiscoveryStopping(t *testing.T) { } +func TestConvergence(t *testing.T) { + // scenario: + // {boot peer: [peer list]} + // {d1: d2, d3, d4} + // {d5: d6, d7, d8} + // {d9: d10, d11, d12} + // connect all boot peers with d13 + // take down d13 + // ensure still full membership + instances := []*gossipInstance{} + for _, i := range []int{1, 5, 9} { + bootPort := 4610 + i + id := fmt.Sprintf("d%d", i) + leader := createDiscoveryInstance(bootPort, id, []string{}) + instances = append(instances, leader) + for minionIndex := 1; minionIndex <= 3; minionIndex++ { + id := fmt.Sprintf("d%d", i+minionIndex) + minion := createDiscoveryInstance(4610+minionIndex+i, id, []string{bootPeer(bootPort)}) + instances = append(instances, minion) + } + } + + assertMembership(t, instances, 3) + connector := createDiscoveryInstance(4623, fmt.Sprintf("d13"), []string{bootPeer(4611), bootPeer(4615), bootPeer(4619)}) + instances = append(instances, connector) + assertMembership(t, instances, 12) + connector.Stop() + instances = instances[:len(instances)-1] + assertMembership(t, instances, 11) + stopInstances(t, instances) +} + func waitUntilOrFail(t *testing.T, pred func() bool) { start := time.Now() limit := start.UnixNano() + timeout.Nanoseconds() @@ -417,3 +457,28 @@ func waitUntilOrFailBlocking(t *testing.T, f func()) { } assert.Fail(t, "Timeout expired!") } + +func stopInstances(t *testing.T, instances []*gossipInstance) { + stopAction := &sync.WaitGroup{} + for _, inst := range instances { + stopAction.Add(1) + go func(inst *gossipInstance) { + defer stopAction.Done() + inst.Stop() + }(inst) + } + + waitUntilOrFailBlocking(t, stopAction.Wait) +} + +func assertMembership(t *testing.T, instances []*gossipInstance, expectedNum int) { + fullMembership := func() bool { + for _, inst := range instances { + if len(inst.GetMembership()) == expectedNum { + return true + } + } + return false + } + waitUntilOrFail(t, fullMembership) +}