Skip to content

Commit

Permalink
[FAB-2198] Adjust gossip membership layer
Browse files Browse the repository at this point in the history
Continuation of:
	https://gerrit.hyperledger.org/r/#/c/5903/
	Introduce envelopes to gossip message

The gossip membership module has a storage of GossipMessages.
It needs to be refactored to hold SignedMessages instead, but
SignedMessages don't have any meaningful methods because they are
only envelopes to GossipMessages.

Therefore, this commit introduces an internal message type to
the gossip membership layer, that has both the SignedGossipMessage
and the GossipMessage as pointers.

Also- previously the cachedMembership field was used but now
can't be used because it's a raw protobuf structure.
Therefore, I refactored it to a 'membershipStore' which is just
a nice wrapper around a map from PKI-ID --> message.

Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Change-Id: I2251c912b6a4610f6565f51c67bbb89d5d26b94f
  • Loading branch information
yacovm committed Feb 12, 2017
1 parent 5dbe29e commit 248d48c
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 69 deletions.
115 changes: 46 additions & 69 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func SetReconnectInterval(interval time.Duration) {
reconnectInterval = interval
}

func samePKIidAliveMessage(a interface{}, b interface{}) bool {
return equalPKIid(a.(*proto.GossipMessage).GetAliveMsg().Membership.PkiID, b.(*proto.GossipMessage).GetAliveMsg().Membership.PkiID)
}

type timestamp struct {
incTime time.Time
seqNum uint64
Expand All @@ -71,13 +67,14 @@ func (ts *timestamp) String() string {
}

type gossipDiscoveryImpl struct {
incTime uint64
seqNum uint64
self NetworkMember
deadLastTS map[string]*timestamp // H
aliveLastTS map[string]*timestamp // V
id2Member map[string]*NetworkMember // all known members
cachedMembership *proto.MembershipResponse
incTime uint64
seqNum uint64
self NetworkMember
deadLastTS map[string]*timestamp // H
aliveLastTS map[string]*timestamp // V
id2Member map[string]*NetworkMember // all known members
aliveMembership membershipStore
deadMembership membershipStore

bootstrapPeers []string

Expand All @@ -93,22 +90,20 @@ type gossipDiscoveryImpl struct {
// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService) Discovery {
d := &gossipDiscoveryImpl{
self: self,
incTime: uint64(time.Now().UnixNano()),
seqNum: uint64(0),
deadLastTS: make(map[string]*timestamp),
aliveLastTS: make(map[string]*timestamp),
id2Member: make(map[string]*NetworkMember),
cachedMembership: &proto.MembershipResponse{
Alive: make([]*proto.GossipMessage, 0),
Dead: make([]*proto.GossipMessage, 0),
},
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint.Endpoint),
self: self,
incTime: uint64(time.Now().UnixNano()),
seqNum: uint64(0),
deadLastTS: make(map[string]*timestamp),
aliveLastTS: make(map[string]*timestamp),
id2Member: make(map[string]*NetworkMember),
aliveMembership: make(membershipStore, 0),
deadMembership: make(membershipStore, 0),
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint.Endpoint),
}

go d.periodicalSendAlive()
Expand Down Expand Up @@ -188,14 +183,15 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {

d.lock.RLock()

n := len(d.cachedMembership.Alive)
n := len(d.aliveMembership)
k := peerNum
if k > n {
k = n
}

aliveMembersAsSlice := d.aliveMembership.ToSlice()
for _, i := range util.GetRandomIndices(k, n-1) {
pulledPeer := d.cachedMembership.Alive[i].GetAliveMsg().Membership
pulledPeer := aliveMembersAsSlice[i].GetAliveMsg().Membership
netMember := &NetworkMember{
Endpoint: pulledPeer.Endpoint,
Metadata: pulledPeer.Metadata,
Expand Down Expand Up @@ -335,7 +331,7 @@ func (d *gossipDiscoveryImpl) createMembershipResponse(known [][]byte) *proto.Me

deadPeers := []*proto.GossipMessage{}

for _, dm := range d.cachedMembership.Dead {
for _, dm := range d.deadMembership.ToSlice() {
isKnown := false
for _, knownPeer := range known {
if equalPKIid(knownPeer, dm.GetAliveMsg().Membership.PkiID) {
Expand All @@ -344,13 +340,19 @@ func (d *gossipDiscoveryImpl) createMembershipResponse(known [][]byte) *proto.Me
}
}
if !isKnown {
deadPeers = append(deadPeers, dm)
deadPeers = append(deadPeers, dm.GossipMessage)
break
}
}

aliveMembersAsSlice := d.aliveMembership.ToSlice()
aliveSnapshot := make([]*proto.GossipMessage, len(aliveMembersAsSlice))
for i, msg := range aliveMembersAsSlice {
aliveSnapshot[i] = msg.GossipMessage
}

return &proto.MembershipResponse{
Alive: append(d.cachedMembership.Alive, aliveMsg),
Alive: append(aliveSnapshot, aliveMsg),
Dead: deadPeers,
}
}
Expand Down Expand Up @@ -441,24 +443,10 @@ func (d *gossipDiscoveryImpl) resurrectMember(am *proto.GossipMessage, t proto.P
PKIid: member.PkiID,
InternalEndpoint: member.InternalEndpoint,
}
delete(d.deadLastTS, string(pkiID))

aliveMsgWithID := &proto.GossipMessage{
Content: &proto.GossipMessage_AliveMsg{
AliveMsg: &proto.AliveMessage{
Membership: &proto.Member{PkiID: pkiID},
},
},
}

i := util.IndexInSlice(d.cachedMembership.Dead, aliveMsgWithID, samePKIidAliveMessage)
if i != -1 {
d.cachedMembership.Dead = append(d.cachedMembership.Dead[:i], d.cachedMembership.Dead[i+1:]...)
}

if util.IndexInSlice(d.cachedMembership.Alive, am, samePKIidAliveMessage) == -1 {
d.cachedMembership.Alive = append(d.cachedMembership.Alive, am)
}
delete(d.deadLastTS, string(pkiID))
d.deadMembership.Remove(common.PKIidType(pkiID))
d.aliveMembership.Put(common.PKIidType(pkiID), &message{GossipMessage: am})
}

func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
Expand Down Expand Up @@ -559,19 +547,9 @@ func (d *gossipDiscoveryImpl) expireDeadMembers(dead []common.PKIidType) {
delete(d.aliveLastTS, string(pkiID))
}

aliveMsgWithPKIid := &proto.GossipMessage{
Content: &proto.GossipMessage_AliveMsg{
AliveMsg: &proto.AliveMessage{
Membership: &proto.Member{PkiID: pkiID},
},
},
}
aliveMemberIndex := util.IndexInSlice(d.cachedMembership.Alive, aliveMsgWithPKIid, samePKIidAliveMessage)
if aliveMemberIndex != -1 {
// Move the alive member to the dead members
d.cachedMembership.Dead = append(d.cachedMembership.Dead, d.cachedMembership.Alive[aliveMemberIndex])
// Delete the alive member from the cached membership
d.cachedMembership.Alive = append(d.cachedMembership.Alive[:aliveMemberIndex], d.cachedMembership.Alive[aliveMemberIndex+1:]...)
if am := d.aliveMembership.msgByID(pkiID); am != nil {
d.deadMembership.Put(pkiID, am)
d.aliveMembership.Remove(pkiID)
}
}

Expand Down Expand Up @@ -677,13 +655,12 @@ func (d *gossipDiscoveryImpl) learnExistingMembers(aliveArr []*proto.GossipMessa
alive.lastSeen = time.Now()
alive.seqNum = am.Timestamp.SeqNum

i := util.IndexInSlice(d.cachedMembership.Alive, m, samePKIidAliveMessage)
if i == -1 {
if am := d.aliveMembership.msgByID(m.GetAliveMsg().Membership.PkiID); am == nil {
d.logger.Debug("Appended", am, "to d.cachedMembership.Alive")
d.cachedMembership.Alive = append(d.cachedMembership.Alive, m)
d.aliveMembership.Put(m.GetAliveMsg().Membership.PkiID, &message{GossipMessage: m})
} else {
d.logger.Debug("Replaced", am, "in d.cachedMembership.Alive")
d.cachedMembership.Alive[i] = m
am.GossipMessage = m
}
}
}
Expand All @@ -706,7 +683,7 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
seqNum: am.GetAliveMsg().Timestamp.SeqNum,
}

d.cachedMembership.Alive = append(d.cachedMembership.Alive, am)
d.aliveMembership.Put(am.GetAliveMsg().Membership.PkiID, &message{GossipMessage: am})
d.logger.Infof("Learned about a new alive member: %v", am)
}

Expand All @@ -720,7 +697,7 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
seqNum: dm.GetAliveMsg().Timestamp.SeqNum,
}

d.cachedMembership.Dead = append(d.cachedMembership.Dead, dm)
d.deadMembership.Put(dm.GetAliveMsg().Membership.PkiID, &message{GossipMessage: dm})
d.logger.Infof("Learned about a new dead member: %v", dm)
}

Expand Down Expand Up @@ -750,7 +727,7 @@ func (d *gossipDiscoveryImpl) GetMembership() []NetworkMember {
defer d.lock.RUnlock()

response := []NetworkMember{}
for _, m := range d.cachedMembership.Alive {
for _, m := range d.aliveMembership.ToSlice() {
member := m.GetAliveMsg()
response = append(response, NetworkMember{
PKIid: member.Membership.PkiID,
Expand Down
60 changes: 60 additions & 0 deletions gossip/discovery/msgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"github.com/hyperledger/fabric/gossip/common"
proto "github.com/hyperledger/fabric/protos/gossip"
)

type message struct {
*proto.SignedGossipMessage
*proto.GossipMessage
}

type membershipStore map[string]*message

// msgByID returns a message stored by a certain ID, or nil
// if such an ID isn't found
func (m membershipStore) msgByID(pkiID common.PKIidType) *message {
if msg, exists := m[string(pkiID)]; exists {
return msg
}
return nil
}

// Put associates msg with the given pkiID
func (m membershipStore) Put(pkiID common.PKIidType, msg *message) {
m[string(pkiID)] = msg
}

// Remove removes a message with a given pkiID
func (m membershipStore) Remove(pkiID common.PKIidType) {
delete(m, string(pkiID))
}

// ToSlice returns a slice backed by the elements
// of the membershipStore
func (m membershipStore) ToSlice() []*message {
members := make([]*message, len(m))
i := 0
for _, member := range m {
members[i] = member
i++
}
return members
}
95 changes: 95 additions & 0 deletions gossip/discovery/msgs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"testing"

"github.com/hyperledger/fabric/gossip/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/stretchr/testify/assert"
)

func TestMembershipStore(t *testing.T) {
membershipStore := make(membershipStore, 0)

id1 := common.PKIidType("id1")
id2 := common.PKIidType("id2")

msg1 := &message{}
msg2 := &message{SignedGossipMessage: &proto.SignedGossipMessage{}}

// Test initially created store is empty
assert.Nil(t, membershipStore.msgByID(id1))
assert.Len(t, membershipStore, 0)
// Test put works as expected
membershipStore.Put(id1, msg1)
assert.NotNil(t, membershipStore.msgByID(id1))
// Test msgByID returns the right instance stored
membershipStore.Put(id2, msg2)
assert.Equal(t, msg1, membershipStore.msgByID(id1))
assert.NotEqual(t, msg2, membershipStore.msgByID(id1))
// Test capacity grows
assert.Len(t, membershipStore, 2)
// Test remove works
membershipStore.Remove(id1)
assert.Nil(t, membershipStore.msgByID(id1))
assert.Len(t, membershipStore, 1)
// Test returned instance is not a copy
msg3 := &message{GossipMessage: &proto.GossipMessage{}}
msg3Clone := &message{GossipMessage: &proto.GossipMessage{}}
id3 := common.PKIidType("id3")
membershipStore.Put(id3, msg3)
assert.Equal(t, msg3Clone, msg3)
membershipStore.msgByID(id3).Channel = []byte{0, 1, 2, 3}
assert.NotEqual(t, msg3Clone, msg3)
}

func TestToSlice(t *testing.T) {
membershipStore := make(membershipStore, 0)
id1 := common.PKIidType("id1")
id2 := common.PKIidType("id2")
id3 := common.PKIidType("id3")
id4 := common.PKIidType("id4")

msg1 := &message{}
msg2 := &message{SignedGossipMessage: &proto.SignedGossipMessage{}}
msg3 := &message{GossipMessage: &proto.GossipMessage{}}
msg4 := &message{GossipMessage: &proto.GossipMessage{}, SignedGossipMessage: &proto.SignedGossipMessage{}}

membershipStore.Put(id1, msg1)
membershipStore.Put(id2, msg2)
membershipStore.Put(id3, msg3)
membershipStore.Put(id4, msg4)

assert.Len(t, membershipStore.ToSlice(), 4)

existsInSlice := func(slice []*message, msg *message) bool {
for _, m := range slice {
if assert.ObjectsAreEqual(m, msg) {
return true
}
}
return false
}

expectedMsgs := []*message{msg1, msg2, msg3, msg4}
for _, msg := range membershipStore.ToSlice() {
assert.True(t, existsInSlice(expectedMsgs, msg))
}

}

0 comments on commit 248d48c

Please sign in to comment.