Skip to content

Commit

Permalink
[FAB-2779] Clear data in discovery
Browse files Browse the repository at this point in the history
Remove expired alive messages from the discovery module's message store,
along with the references to those messages held inside the discovery module itself.

Change-Id: I76579209d0ff67e48d230864fff7a867217b84c6
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman committed Apr 9, 2017
1 parent aa84135 commit 5dbb05a
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 7 deletions.
45 changes: 38 additions & 7 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
)

const defaultHelloInterval = time.Duration(5) * time.Second
const msgExpirationFactor = 20

var aliveExpirationCheckInterval time.Duration
var maxConnectionAttempts = 120
Expand Down Expand Up @@ -105,7 +106,6 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
id2Member: make(map[string]*NetworkMember),
aliveMembership: util.NewMembershipStore(),
deadMembership: util.NewMembershipStore(),
msgStore: msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}),
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
Expand All @@ -115,6 +115,26 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
disclosurePolicy: disPol,
}

policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
msg := m.(*proto.SignedGossipMessage)
if !msg.IsAliveMsg() {
return
}
id := msg.GetAliveMsg().Membership.PkiId
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
delete(d.deadLastTS, string(id))
delete(d.aliveLastTS, string(id))
}

d.msgStore = msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback)

go d.periodicalSendAlive()
go d.periodicalCheckAlive()
go d.handleMessages()
Expand Down Expand Up @@ -308,7 +328,9 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

d.handleAliveMessage(selfInfoGossipMsg)
if d.msgStore.CheckValid(m) {
d.handleAliveMessage(selfInfoGossipMsg)
}

var internalEndpoint string
if m.Envelope.SecretEnvelope != nil {
Expand All @@ -323,13 +345,13 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
}

if m.IsAliveMsg() {
added := d.msgStore.Add(m)
if !added {

if !d.msgStore.Add(m) {
return
}
d.comm.Gossip(m)

d.handleAliveMessage(m)

d.comm.Gossip(m)
return
}

Expand All @@ -345,7 +367,10 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

d.handleAliveMessage(am)
if d.msgStore.CheckValid(m) {
d.handleAliveMessage(am)
}

}

for _, env := range memResp.Dead {
Expand All @@ -359,6 +384,11 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
continue
}

if !d.msgStore.CheckValid(m) {
//Newer alive message exist
return
}

newDeadMembers := []*proto.SignedGossipMessage{}
d.lock.RLock()
if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
Expand Down Expand Up @@ -902,6 +932,7 @@ func (d *gossipDiscoveryImpl) Stop() {
defer d.logger.Info("Stopped")
d.logger.Info("Stopping")
atomic.StoreInt32(&d.toDieFlag, int32(1))
d.msgStore.Stop()
d.toDieChan <- struct{}{}
}

Expand Down
89 changes: 89 additions & 0 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,96 @@ func TestFilterOutLocalhost(t *testing.T) {
assert.NotEqual(t, endpoints[2], endpoints[0])
}

// TestMsgStoreExpiration starts four discovery instances, stops the last two
// of them, and waits until the remaining instances no longer hold any entry
// for the stopped ones in aliveLastTS, deadLastTS, id2Member, aliveMembership
// or deadMembership. It then sends a membership response built by the first
// instance to localhost:12612 and, after sleeping for the alive expiration
// timeout, asserts that those entries are still absent.
func TestMsgStoreExpiration(t *testing.T) {
	t.Parallel()
	nodeNum := 4
	bootPeers := []string{bootPeer(12611), bootPeer(12612)}

	// The first two instances are the bootstrap peers themselves.
	instances := []*gossipInstance{
		createDiscoveryInstance(12611, "d1", bootPeers),
		createDiscoveryInstance(12612, "d2", bootPeers),
	}
	for i := 3; i <= nodeNum; i++ {
		instances = append(instances, createDiscoveryInstance(12610+i, fmt.Sprintf("d%d", i), bootPeers))
	}

	assertMembership(t, instances, nodeNum-1)

	waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop)
	waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop)

	assertMembership(t, instances, nodeNum-3)

	// Split the instances into those still running and the two stopped above.
	survivors := instances[:len(instances)-2]
	stopped := instances[len(instances)-2:]

	// holdsTraceOf reports whether observer still has any entry keyed by the
	// PKI-ID of gone. The read lock is held for the whole inspection and is
	// released on every return path by the deferred RUnlock.
	holdsTraceOf := func(observer, gone *gossipDiscoveryImpl) bool {
		pkiID := gone.self.PKIid
		key := string(pkiID)

		observer.lock.RLock()
		defer observer.lock.RUnlock()

		if _, found := observer.aliveLastTS[key]; found {
			return true
		}
		if _, found := observer.deadLastTS[key]; found {
			return true
		}
		if _, found := observer.id2Member[key]; found {
			return true
		}
		if observer.aliveMembership.MsgByID(pkiID) != nil {
			return true
		}
		return observer.deadMembership.MsgByID(pkiID) != nil
	}

	// checkMessages returns true only when no surviving instance holds any
	// entry for any stopped instance.
	checkMessages := func() bool {
		for _, running := range survivors {
			observer := running.Discovery.(*gossipDiscoveryImpl)
			for _, down := range stopped {
				if holdsTraceOf(observer, down.Discovery.(*gossipDiscoveryImpl)) {
					return false
				}
			}
		}
		return true
	}

	waitUntilTimeoutOrFail(t, checkMessages, timeout*2)

	assertMembership(t, survivors, nodeNum-3)

	// The same formatted string serves as PKI-ID, endpoint and internal
	// endpoint of the membership response target.
	target := fmt.Sprintf("localhost:%d", 12612)
	peerToResponse := &NetworkMember{
		Metadata:         []byte{},
		PKIid:            []byte(target),
		Endpoint:         target,
		InternalEndpoint: target,
	}

	downCastInstance := instances[0].Discovery.(*gossipDiscoveryImpl)
	memResp := downCastInstance.createMembershipResponse(peerToResponse)

	respMsg := &proto.GossipMessage{
		Tag:   proto.GossipMessage_EMPTY,
		Nonce: uint64(0),
		Content: &proto.GossipMessage_MemRes{
			MemRes: memResp,
		},
	}
	downCastInstance.comm.SendToPeer(peerToResponse, respMsg.NoopSign())

	time.Sleep(getAliveExpirationTimeout())

	assert.True(t, checkMessages(), "Validating lost message with already dead and expired nodes failed")

	stopInstances(t, survivors)
}

// waitUntilOrFail delegates to waitUntilTimeoutOrFail, passing pred together
// with the package-level timeout value.
func waitUntilOrFail(t *testing.T, pred func() bool) {
waitUntilTimeoutOrFail(t, pred, timeout)
}

func waitUntilTimeoutOrFail(t *testing.T, pred func() bool, timeout time.Duration) {
start := time.Now()
limit := start.UnixNano() + timeout.Nanoseconds()
for time.Now().UnixNano() < limit {
Expand Down

0 comments on commit 5dbb05a

Please sign in to comment.