Skip to content

Commit

Permalink
Merge "[FAB-3446] Bug - Alive msgs in MemReq and MemResp"
Browse files Browse the repository at this point in the history
  • Loading branch information
C0rWin authored and Gerrit Code Review committed May 8, 2017
2 parents 7f1256d + c01a433 commit 78d59b1
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 53 deletions.
72 changes: 48 additions & 24 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type gossipDiscoveryImpl struct {
aliveMembership *util.MembershipStore
deadMembership *util.MembershipStore

msgStore msgstore.MessageStore
msgStore *aliveMsgStore

bootstrapPeers []string

Expand Down Expand Up @@ -114,25 +114,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
disclosurePolicy: disPol,
}

policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
msg := m.(*proto.SignedGossipMessage)
if !msg.IsAliveMsg() {
return
}
id := msg.GetAliveMsg().Membership.PkiId
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
delete(d.deadLastTS, string(id))
delete(d.aliveLastTS, string(id))
}

d.msgStore = msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback)
d.msgStore = newAliveMsgStore(d)

go d.periodicalSendAlive()
go d.periodicalCheckAlive()
Expand Down Expand Up @@ -325,7 +307,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

if d.msgStore.CheckValid(m) {
if d.msgStore.CheckValid(selfInfoGossipMsg) {
d.handleAliveMessage(selfInfoGossipMsg)
}

Expand Down Expand Up @@ -364,10 +346,9 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
return
}

if d.msgStore.CheckValid(m) {
if d.msgStore.CheckValid(am) {
d.handleAliveMessage(am)
}

}

for _, env := range memResp.Dead {
Expand All @@ -381,7 +362,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
continue
}

if !d.msgStore.CheckValid(m) {
if !d.msgStore.CheckValid(dm) {
//Newer alive message exist
return
}
Expand Down Expand Up @@ -965,3 +946,46 @@ func filterOutLocalhost(endpoints []string, port int) []string {
}
return returnedEndpoints
}

type aliveMsgStore struct {
msgstore.MessageStore
}

func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
msg := m.(*proto.SignedGossipMessage)
if !msg.IsAliveMsg() {
return
}
id := msg.GetAliveMsg().Membership.PkiId
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
delete(d.deadLastTS, string(id))
delete(d.aliveLastTS, string(id))
}

s := &aliveMsgStore{
MessageStore: msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback),
}
return s
}

func (s *aliveMsgStore) Add(msg interface{}) bool {
if !msg.(*proto.SignedGossipMessage).IsAliveMsg() {
panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
return s.MessageStore.Add(msg)
}

func (s *aliveMsgStore) CheckValid(msg interface{}) bool {
if !msg.(*proto.SignedGossipMessage).IsAliveMsg() {
panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
return s.MessageStore.CheckValid(msg)
}
Loading

0 comments on commit 78d59b1

Please sign in to comment.