diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 4ac50d5c180..2f67458ea4a 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -80,7 +80,7 @@ type gossipDiscoveryImpl struct { aliveMembership *util.MembershipStore deadMembership *util.MembershipStore - msgStore msgstore.MessageStore + msgStore *aliveMsgStore bootstrapPeers []string @@ -114,25 +114,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS disclosurePolicy: disPol, } - policy := proto.NewGossipMessageComparator(0) - trigger := func(m interface{}) {} - aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor - externalLock := func() { d.lock.Lock() } - externalUnlock := func() { d.lock.Unlock() } - callback := func(m interface{}) { - msg := m.(*proto.SignedGossipMessage) - if !msg.IsAliveMsg() { - return - } - id := msg.GetAliveMsg().Membership.PkiId - d.aliveMembership.Remove(id) - d.deadMembership.Remove(id) - delete(d.id2Member, string(id)) - delete(d.deadLastTS, string(id)) - delete(d.aliveLastTS, string(id)) - } - - d.msgStore = msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback) + d.msgStore = newAliveMsgStore(d) go d.periodicalSendAlive() go d.periodicalCheckAlive() @@ -325,7 +307,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { return } - if d.msgStore.CheckValid(m) { + if d.msgStore.CheckValid(selfInfoGossipMsg) { d.handleAliveMessage(selfInfoGossipMsg) } @@ -364,10 +346,9 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { return } - if d.msgStore.CheckValid(m) { + if d.msgStore.CheckValid(am) { d.handleAliveMessage(am) } - } for _, env := range memResp.Dead { @@ -381,7 +362,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { continue } - if !d.msgStore.CheckValid(m) { + if !d.msgStore.CheckValid(dm) { //Newer alive message exist return } @@ -965,3 +946,46 @@ func filterOutLocalhost(endpoints []string, port int) []string { } return returnedEndpoints } + +type aliveMsgStore struct { + msgstore.MessageStore +} + +func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { + policy := proto.NewGossipMessageComparator(0) + trigger := func(m interface{}) {} + aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor + externalLock := func() { d.lock.Lock() } + externalUnlock := func() { d.lock.Unlock() } + callback := func(m interface{}) { + msg := m.(*proto.SignedGossipMessage) + if !msg.IsAliveMsg() { + return + } + id := msg.GetAliveMsg().Membership.PkiId + d.aliveMembership.Remove(id) + d.deadMembership.Remove(id) + delete(d.id2Member, string(id)) + delete(d.deadLastTS, string(id)) + delete(d.aliveLastTS, string(id)) + } + + s := &aliveMsgStore{ + MessageStore: msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback), + } + return s +} + +func (s *aliveMsgStore) Add(msg interface{}) bool { + if !msg.(*proto.SignedGossipMessage).IsAliveMsg() { + panic(fmt.Sprint("Msg ", msg, " is not AliveMsg")) + } + return s.MessageStore.Add(msg) +} + +func (s *aliveMsgStore) CheckValid(msg interface{}) bool { + if !msg.(*proto.SignedGossipMessage).IsAliveMsg() { + panic(fmt.Sprint("Msg ", msg, " is not AliveMsg")) + } + return s.MessageStore.CheckValid(msg) +} diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index a5b235f56ed..32cf9d629cb 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -17,6 +17,7 @@ limitations under the License. package discovery import ( + "bytes" "fmt" "io" "math/rand" @@ -184,6 +185,10 @@ func (comm *dummyCommModule) CloseConn(peer *NetworkMember) { comm.conns[peer.Endpoint].Close() } +func (g *gossipInstance) discoveryImpl() *gossipDiscoveryImpl { + return g.Discovery.(*gossipDiscoveryImpl) +} + func (g *gossipInstance) initiateSync(frequency time.Duration, peerNum int) { g.syncInitiator = time.NewTicker(frequency) g.stopChan = make(chan struct{}) @@ -510,7 +515,7 @@ func TestExpiration(t *testing.T) { waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop) waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop) - assertMembership(t, instances, nodeNum-3) + assertMembership(t, instances[:len(instances)-2], nodeNum-3) stopAction := &sync.WaitGroup{} for i, inst := range instances { @@ -823,6 +828,9 @@ func TestFilterOutLocalhost(t *testing.T) { } func TestMsgStoreExpiration(t *testing.T) { + // Starts 4 instances, wait for membership to build, stop 2 instances + // Check that membership in 2 running instances become 2 + // Wait for expiration and check that alive messages and related entities in maps are removed in running instances t.Parallel() nodeNum := 4 bootPeers := []string{bootPeer(12611), bootPeer(12612)} @@ -845,66 +853,293 @@ func TestMsgStoreExpiration(t *testing.T) { waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop) waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop) - assertMembership(t, instances, nodeNum-3) + assertMembership(t, instances[:len(instances)-2], nodeNum-3) checkMessages := func() bool { for _, inst := range instances[:len(instances)-2] { for _, downInst := range instances[len(instances)-2:] { - downCastInst := inst.Discovery.(*gossipDiscoveryImpl) + downCastInst := inst.discoveryImpl() downCastInst.lock.RLock() - if _, exist := downCastInst.aliveLastTS[string(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid)]; exist { + if _, exist := downCastInst.aliveLastTS[string(downInst.discoveryImpl().self.PKIid)]; exist { downCastInst.lock.RUnlock() return false } - if _, exist := downCastInst.deadLastTS[string(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid)]; exist { + if _, exist := downCastInst.deadLastTS[string(downInst.discoveryImpl().self.PKIid)]; exist { downCastInst.lock.RUnlock() return false } - if _, exist := downCastInst.id2Member[string(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid)]; exist { + if _, exist := downCastInst.id2Member[string(downInst.discoveryImpl().self.PKIid)]; exist { downCastInst.lock.RUnlock() return false } - if downCastInst.aliveMembership.MsgByID(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid) != nil { + if downCastInst.aliveMembership.MsgByID(downInst.discoveryImpl().self.PKIid) != nil { downCastInst.lock.RUnlock() return false } - if downCastInst.deadMembership.MsgByID(downInst.Discovery.(*gossipDiscoveryImpl).self.PKIid) != nil { + if downCastInst.deadMembership.MsgByID(downInst.discoveryImpl().self.PKIid) != nil { downCastInst.lock.RUnlock() return false } + for _, am := range downCastInst.msgStore.Get() { + m := am.(*proto.SignedGossipMessage).GetAliveMsg() + if bytes.Equal(m.Membership.PkiId, downInst.discoveryImpl().self.PKIid) { + downCastInst.lock.RUnlock() + return false + } + } downCastInst.lock.RUnlock() } } return true } - waitUntilTimeoutOrFail(t, checkMessages, timeout*2) + waitUntilTimeoutOrFail(t, checkMessages, getAliveExpirationTimeout()*(msgExpirationFactor+5)) assertMembership(t, instances[:len(instances)-2], nodeNum-3) - peerToResponse := &NetworkMember{ - Metadata: []byte{}, - PKIid: []byte(fmt.Sprintf("localhost:%d", 12612)), - Endpoint: fmt.Sprintf("localhost:%d", 12612), - InternalEndpoint: fmt.Sprintf("localhost:%d", 12612), + stopInstances(t, instances[:len(instances)-2]) +} + +func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { + // Creates 3 discovery instances without gossip communication + // Generates MembershipRequest msg for each instance using createMembershipRequest + // Generates Alive msg for each instance using createAliveMessage + // Builds membership using Alive msgs + // Checks msgStore and related maps + // Generates MembershipResponse msgs for each instance using createMembershipResponse + // Generates new set of Alive msgs and processes them + // Checks msgStore and related maps + // Waits for expiration and checks msgStore and related maps + // Processes stored MembershipRequest msg and checks msgStore and related maps + // Processes stored MembershipResponse msg and checks msgStore and related maps + + t.Parallel() + bootPeers := []string{} + peersNum := 3 + instances := []*gossipInstance{} + aliveMsgs := []*proto.SignedGossipMessage{} + newAliveMsgs := []*proto.SignedGossipMessage{} + memReqMsgs := []*proto.SignedGossipMessage{} + memRespMsgs := make(map[int][]*proto.MembershipResponse) + + for i := 0; i < peersNum; i++ { + id := fmt.Sprintf("d%d", i) + inst := createDiscoveryInstanceWithNoGossip(22610+i, id, bootPeers) + instances = append(instances, inst) } - downCastInstance := instances[0].Discovery.(*gossipDiscoveryImpl) - memResp := downCastInstance.createMembershipResponse(peerToResponse) + // Creating MembershipRequest messages + for i := 0; i < peersNum; i++ { + memReqMsg := instances[i].discoveryImpl().createMembershipRequest(true) + memReqMsgs = append(memReqMsgs, memReqMsg) + } + // Creating Alive messages + for i := 0; i < peersNum; i++ { + aliveMsg := instances[i].discoveryImpl().createAliveMessage(true) + aliveMsgs = append(aliveMsgs, aliveMsg) + } - downCastInstance.comm.SendToPeer(peerToResponse, (&proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Nonce: uint64(0), - Content: &proto.GossipMessage_MemRes{ - MemRes: memResp, - }, - }).NoopSign()) + repeatForFiltered := func(n int, filter func(i int) bool, action func(i int)) { + for i := 0; i < n; i++ { + if filter(i) { + continue + } + action(i) + } + } - time.Sleep(getAliveExpirationTimeout()) + // Handling Alive + for i := 0; i < peersNum; i++ { + for k := 0; k < peersNum; k++ { + instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k]) + } + } - assert.True(t, checkMessages(), "Validating lost message with already dead and expired nodes failed") + checkExistence := func(instances []*gossipInstance, msgs []*proto.SignedGossipMessage, index int, i int, step string) { + _, exist := instances[index].discoveryImpl().aliveLastTS[string(instances[i].discoveryImpl().self.PKIid)] + assert.True(t, exist, fmt.Sprint(step, " Data from alive msg ", i, " doesn't exist in aliveLastTS of discovery inst ", index)) + + _, exist = instances[index].discoveryImpl().id2Member[string(string(instances[i].discoveryImpl().self.PKIid))] + assert.True(t, exist, fmt.Sprint(step, " id2Member mapping doesn't exist for alive msg ", i, " of discovery inst ", index)) + + assert.NotNil(t, instances[index].discoveryImpl().aliveMembership.MsgByID(instances[i].discoveryImpl().self.PKIid), fmt.Sprint(step, " Alive msg", i, " not exist in aliveMembership of discovery inst ", index)) + + assert.Contains(t, instances[index].discoveryImpl().msgStore.Get(), msgs[i], fmt.Sprint(step, " Alive msg ", i, "not stored in store of discovery inst ", index)) + } + + checkAliveMsgExist := func(instances []*gossipInstance, msgs []*proto.SignedGossipMessage, index int, step string) { + instances[index].discoveryImpl().lock.RLock() + defer instances[index].discoveryImpl().lock.RUnlock() + repeatForFiltered(peersNum, + func(k int) bool { + return k == index + }, + func(k int) { + checkExistence(instances, msgs, index, k, step) + }) + } + + // Checking is Alive was processed + for i := 0; i < peersNum; i++ { + checkAliveMsgExist(instances, aliveMsgs, i, "[Step 1 - proccessing aliveMsg]") + } + + // Creating MembershipResponse while all instances have full membership + for i := 0; i < peersNum; i++ { + peerToResponse := &NetworkMember{ + Metadata: []byte{}, + PKIid: []byte(fmt.Sprintf("localhost:%d", 22610+i)), + Endpoint: fmt.Sprintf("localhost:%d", 22610+i), + InternalEndpoint: fmt.Sprintf("localhost:%d", 22610+i), + } + memRespMsgs[i] = []*proto.MembershipResponse{} + repeatForFiltered(peersNum, + func(k int) bool { + return k == i + }, + func(k int) { + memResp := instances[k].discoveryImpl().createMembershipResponse(peerToResponse) + memRespMsgs[i] = append(memRespMsgs[i], memResp) + }) + } + + // Re-creating Alive msgs with highest seq_num, to make sure Alive msgs in memReq and memResp are older + for i := 0; i < peersNum; i++ { + aliveMsg := instances[i].discoveryImpl().createAliveMessage(true) + newAliveMsgs = append(newAliveMsgs, aliveMsg) + } + + // Handling new Alive set + for i := 0; i < peersNum; i++ { + for k := 0; k < peersNum; k++ { + instances[i].discoveryImpl().handleMsgFromComm(newAliveMsgs[k]) + } + } + + // Checking is new Alive was processed + for i := 0; i < peersNum; i++ { + checkAliveMsgExist(instances, newAliveMsgs, i, "[Step 2 - proccesing aliveMsg]") + } + + checkAliveMsgNotExist := func(instances []*gossipInstance, msgs []*proto.SignedGossipMessage, index int, step string) { + instances[index].discoveryImpl().lock.RLock() + defer instances[index].discoveryImpl().lock.RUnlock() + assert.Empty(t, instances[index].discoveryImpl().aliveLastTS, fmt.Sprint(step, " Data from alive msg still exists in aliveLastTS of discovery inst ", index)) + assert.Empty(t, instances[index].discoveryImpl().deadLastTS, fmt.Sprint(step, " Data from alive msg still exists in deadLastTS of discovery inst ", index)) + assert.Empty(t, instances[index].discoveryImpl().id2Member, fmt.Sprint(step, " id2Member mapping still still contains data related to Alive msg: discovery inst ", index)) + assert.Empty(t, instances[index].discoveryImpl().msgStore.Get(), fmt.Sprint(step, " Expired Alive msg still stored in store of discovery inst ", index)) + assert.Zero(t, instances[index].discoveryImpl().aliveMembership.Size(), fmt.Sprint(step, " Alive membership list is not empty, discovery instance", index)) + assert.Zero(t, instances[index].discoveryImpl().deadMembership.Size(), fmt.Sprint(step, " Dead membership list is not empty, discovery instance", index)) + } + + // Sleep until expire + time.Sleep(getAliveExpirationTimeout() * (msgExpirationFactor + 5)) + + // Checking Alive expired + for i := 0; i < peersNum; i++ { + checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step3 - expiration in msg store]") + } + + // Processing old MembershipRequest + for i := 0; i < peersNum; i++ { + repeatForFiltered(peersNum, + func(k int) bool { + return k == i + }, + func(k int) { + instances[i].discoveryImpl().handleMsgFromComm(memReqMsgs[k]) + }) + } + + // MembershipRequest processing didn't change anything + for i := 0; i < peersNum; i++ { + checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step4 - memReq processing after expiration]") + } + + // Processing old (later) Alive messages + for i := 0; i < peersNum; i++ { + for k := 0; k < peersNum; k++ { + instances[i].discoveryImpl().handleMsgFromComm(aliveMsgs[k]) + } + } + + // Alive msg processing didn't change anything + for i := 0; i < peersNum; i++ { + checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step5.1 - after lost old aliveMsg process]") + checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step5.2 - after lost new aliveMsg process]") + } + + // Handling old MembershipResponse messages + for i := 0; i < peersNum; i++ { + respForPeer := memRespMsgs[i] + for _, msg := range respForPeer { + instances[i].discoveryImpl().handleMsgFromComm((&proto.GossipMessage{ + Tag: proto.GossipMessage_EMPTY, + Nonce: uint64(0), + Content: &proto.GossipMessage_MemRes{ + MemRes: msg, + }, + }).NoopSign()) + } + } + + // MembershipResponse msg processing didn't change anything + for i := 0; i < peersNum; i++ { + checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step6 - after lost MembershipResp process]") + } + + for i := 0; i < peersNum; i++ { + instances[i].Stop() + } - stopInstances(t, instances[:len(instances)-2]) +} + +func TestAliveMsgStore(t *testing.T) { + t.Parallel() + + bootPeers := []string{} + peersNum := 2 + instances := []*gossipInstance{} + aliveMsgs := []*proto.SignedGossipMessage{} + memReqMsgs := []*proto.SignedGossipMessage{} + + for i := 0; i < peersNum; i++ { + id := fmt.Sprintf("d%d", i) + inst := createDiscoveryInstanceWithNoGossip(32610+i, id, bootPeers) + instances = append(instances, inst) + } + + // Creating MembershipRequest messages + for i := 0; i < peersNum; i++ { + memReqMsg := instances[i].discoveryImpl().createMembershipRequest(true) + memReqMsgs = append(memReqMsgs, memReqMsg) + } + // Creating Alive messages + for i := 0; i < peersNum; i++ { + aliveMsg := instances[i].discoveryImpl().createAliveMessage(true) + aliveMsgs = append(aliveMsgs, aliveMsg) + } + + //Check new alive msgs + for _, msg := range aliveMsgs { + assert.True(t, instances[0].discoveryImpl().msgStore.CheckValid(msg), "aliveMsgStore CheckValid returns false on new AliveMsg") + } + + // Add new alive msgs + for _, msg := range aliveMsgs { + assert.True(t, instances[0].discoveryImpl().msgStore.Add(msg), "aliveMsgStore Add returns false on new AliveMsg") + } + + // Check exist alive msgs + for _, msg := range aliveMsgs { + assert.False(t, instances[0].discoveryImpl().msgStore.CheckValid(msg), "aliveMsgStore CheckValid returns true on existing AliveMsg") + } + + // Check non-alive msgs + for _, msg := range memReqMsgs { + assert.Panics(t, func() { instances[1].discoveryImpl().msgStore.CheckValid(msg) }, "aliveMsgStore CheckValid should panic on new MembershipRequest msg") + assert.Panics(t, func() { instances[1].discoveryImpl().msgStore.Add(msg) }, "aliveMsgStore Add should panic on new MembershipRequest msg") + } } func waitUntilOrFail(t *testing.T, pred func() bool) { @@ -954,11 +1189,11 @@ func stopInstances(t *testing.T, instances []*gossipInstance) { func assertMembership(t *testing.T, instances []*gossipInstance, expectedNum int) { fullMembership := func() bool { for _, inst := range instances { - if len(inst.GetMembership()) == expectedNum { - return true + if len(inst.GetMembership()) != expectedNum { + return false } } - return false + return true } waitUntilOrFail(t, fullMembership) }