Skip to content

Commit

Permalink
Merge "[FAB-2007] Gossip:Add support of external endpoint"
Browse files Browse the repository at this point in the history
  • Loading branch information
hacera-jonathan authored and Gerrit Code Review committed Feb 11, 2017
2 parents 78be2f5 + 40fb3a7 commit d58349a
Show file tree
Hide file tree
Showing 14 changed files with 378 additions and 211 deletions.
21 changes: 18 additions & 3 deletions gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,29 @@ type CommService interface {

// NetworkMember is a peer's representation
type NetworkMember struct {
Endpoint string
Metadata []byte
PKIid common.PKIidType
Endpoint string
Metadata []byte
PKIid common.PKIidType
InternalEndpoint *proto.SignedEndpoint
}

// PreferredEndpoint computes the endpoint to connect to,
// while preferring internal endpoint over the standard
// endpoint
func (nm NetworkMember) PreferredEndpoint() string {
if nm.InternalEndpoint != nil && nm.InternalEndpoint.Endpoint != "" {
return nm.InternalEndpoint.Endpoint
}
return nm.Endpoint
}

// Discovery is the interface that represents a discovery module
type Discovery interface {

// Exists returns whether a peer with given
// PKI-ID is known
Exists(PKIID common.PKIidType) bool

// Self returns this instance's membership information
Self() NetworkMember

Expand Down
124 changes: 71 additions & 53 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ func (ts *timestamp) String() string {
}

type gossipDiscoveryImpl struct {
pkiID common.PKIidType
endpoint string
incTime uint64
metadata []byte

seqNum uint64

incTime uint64
seqNum uint64
self NetworkMember
deadLastTS map[string]*timestamp // H
aliveLastTS map[string]*timestamp // V
id2Member map[string]*NetworkMember // all known members
Expand All @@ -97,10 +93,8 @@ type gossipDiscoveryImpl struct {
// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService) Discovery {
d := &gossipDiscoveryImpl{
endpoint: self.Endpoint,
self: self,
incTime: uint64(time.Now().UnixNano()),
metadata: self.Metadata,
pkiID: self.PKIid,
seqNum: uint64(0),
deadLastTS: make(map[string]*timestamp),
aliveLastTS: make(map[string]*timestamp),
Expand All @@ -109,13 +103,12 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
Alive: make([]*proto.GossipMessage, 0),
Dead: make([]*proto.GossipMessage, 0),
},
crypt: crypt,
bootstrapPeers: bootstrapPeers,
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LoggingDiscoveryModule, self.Endpoint),
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
toDieChan: make(chan struct{}, 1),
toDieFlag: int32(0),
logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint.Endpoint),
}

go d.periodicalSendAlive()
Expand All @@ -131,6 +124,15 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
return d
}

// Exists returns whether a peer with given
// PKI-ID is known
func (d *gossipDiscoveryImpl) Exists(PKIID common.PKIidType) bool {
d.lock.RLock()
defer d.lock.RUnlock()
_, exists := d.id2Member[string(PKIID)]
return exists
}

func (d *gossipDiscoveryImpl) Connect(member NetworkMember) {
d.logger.Debug("Entering", member)
defer d.logger.Debug("Exiting")
Expand Down Expand Up @@ -167,6 +169,9 @@ func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
defer wg.Done()
peer := &NetworkMember{
Endpoint: endpoint,
InternalEndpoint: &proto.SignedEndpoint{
Endpoint: endpoint,
},
}
d.comm.SendToPeer(peer, req)
}(endpoint)
Expand All @@ -192,9 +197,10 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
for _, i := range util.GetRandomIndices(k, n-1) {
pulledPeer := d.cachedMembership.Alive[i].GetAliveMsg().Membership
netMember := &NetworkMember{
Endpoint: pulledPeer.Endpoint,
Metadata: pulledPeer.Metadata,
PKIid: pulledPeer.PkiID,
Endpoint: pulledPeer.Endpoint,
Metadata: pulledPeer.Metadata,
PKIid: pulledPeer.PkiID,
InternalEndpoint: pulledPeer.InternalEndpoint,
}
peers2SendTo = append(peers2SendTo, netMember)
}
Expand Down Expand Up @@ -285,7 +291,7 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.GossipMessage) {

for _, dm := range memResp.Dead {
if !d.crypt.ValidateAliveMsg(m) {
d.logger.Warningf("Alive message isn't authentic, someone spoofed %s's identity", dm.GetAliveMsg().Membership.Endpoint)
d.logger.Warningf("Alive message isn't authentic, someone spoofed %s's identity", dm.GetAliveMsg().Membership)
continue
}

Expand All @@ -308,9 +314,10 @@ func (d *gossipDiscoveryImpl) sendMemResponse(member *proto.Member, known [][]by
defer d.logger.Debug("Exiting, replying with", memResp)

d.comm.SendToPeer(&NetworkMember{
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
InternalEndpoint: member.InternalEndpoint,
}, &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: uint64(0),
Expand Down Expand Up @@ -353,12 +360,12 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
defer d.logger.Debug("Exiting")

if !d.crypt.ValidateAliveMsg(m) {
d.logger.Warningf("Alive message isn't authentic, someone must be spoofing %s's identity", m.GetAliveMsg().Membership.Endpoint)
d.logger.Warningf("Alive message isn't authentic, someone must be spoofing %s's identity", m.GetAliveMsg())
return
}

pkiID := m.GetAliveMsg().Membership.PkiID
if equalPKIid(pkiID, d.pkiID) {
if equalPKIid(pkiID, d.self.PKIid) {
d.logger.Debug("Got alive message about ourselves,", m)
return
}
Expand All @@ -385,7 +392,7 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
}

if isAlive && isDead {
d.logger.Panicf("Member %s is both alive and dead at the same time", m.GetAliveMsg().Membership.Endpoint)
d.logger.Panicf("Member %s is both alive and dead at the same time", m.GetAliveMsg().Membership)
return
}

Expand All @@ -394,7 +401,7 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
// resurrect peer
d.resurrectMember(m, *ts)
} else if !same(lastDeadTS, ts) {
d.logger.Debug(m.GetAliveMsg().Membership.Endpoint, "lastDeadTS:", lastDeadTS, "but got ts:", ts)
d.logger.Debug(m.GetAliveMsg().Membership, "lastDeadTS:", lastDeadTS, "but got ts:", ts)
}
return
}
Expand All @@ -407,7 +414,7 @@ func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.GossipMessage) {
if before(lastAliveTS, ts) {
d.learnExistingMembers([]*proto.GossipMessage{m})
} else if !same(lastAliveTS, ts) {
d.logger.Debug(m.GetAliveMsg().Membership.Endpoint, "lastAliveTS:", lastAliveTS, "but got ts:", ts)
d.logger.Debug(m.GetAliveMsg().Membership, "lastAliveTS:", lastAliveTS, "but got ts:", ts)
}

}
Expand All @@ -429,9 +436,10 @@ func (d *gossipDiscoveryImpl) resurrectMember(am *proto.GossipMessage, t proto.P
}

d.id2Member[string(pkiID)] = &NetworkMember{
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
Endpoint: member.Endpoint,
Metadata: member.Metadata,
PKIid: member.PkiID,
InternalEndpoint: member.InternalEndpoint,
}
delete(d.deadLastTS, string(pkiID))

Expand Down Expand Up @@ -570,7 +578,7 @@ func (d *gossipDiscoveryImpl) expireDeadMembers(dead []common.PKIidType) {
d.lock.Unlock()

for _, member2Expire := range deadMembers2Expire {
d.logger.Warning("Closing connection to", member2Expire.Endpoint)
d.logger.Warning("Closing connection to", member2Expire)
d.comm.CloseConn(member2Expire)
}
}
Expand Down Expand Up @@ -605,9 +613,10 @@ func (d *gossipDiscoveryImpl) createAliveMessage() *proto.GossipMessage {
d.seqNum++
seqNum := d.seqNum

endpoint := d.endpoint
meta := d.metadata
pkiID := d.pkiID
endpoint := d.self.Endpoint
meta := d.self.Metadata
pkiID := d.self.PKIid
internalEndpoint := d.self.InternalEndpoint

d.lock.Unlock()

Expand All @@ -616,9 +625,10 @@ func (d *gossipDiscoveryImpl) createAliveMessage() *proto.GossipMessage {
Content: &proto.GossipMessage_AliveMsg{
AliveMsg: &proto.AliveMessage{
Membership: &proto.Member{
Endpoint: endpoint,
Metadata: meta,
PkiID: pkiID,
Endpoint: endpoint,
Metadata: meta,
PkiID: pkiID,
InternalEndpoint: internalEndpoint,
},
Timestamp: &proto.PeerTime{
IncNumber: uint64(d.incTime),
Expand Down Expand Up @@ -649,14 +659,15 @@ func (d *gossipDiscoveryImpl) learnExistingMembers(aliveArr []*proto.GossipMessa
member := d.id2Member[string(am.Membership.PkiID)]
member.Endpoint = am.Membership.Endpoint
member.Metadata = am.Membership.Metadata
member.InternalEndpoint = am.Membership.InternalEndpoint

if _, isKnownAsDead := d.deadLastTS[string(am.Membership.PkiID)]; isKnownAsDead {
d.logger.Warning(am.Membership.Endpoint, "has already expired")
d.logger.Warning(am.Membership, "has already expired")
continue
}

if _, isKnownAsAlive := d.aliveLastTS[string(am.Membership.PkiID)]; !isKnownAsAlive {
d.logger.Warning(am.Membership.Endpoint, "has already expired")
d.logger.Warning(am.Membership, "has already expired")
continue
} else {
d.logger.Debug("Updating aliveness data:", am)
Expand Down Expand Up @@ -686,7 +697,7 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
defer d.lock.Unlock()

for _, am := range aliveMembers {
if equalPKIid(am.GetAliveMsg().Membership.PkiID, d.pkiID) {
if equalPKIid(am.GetAliveMsg().Membership.PkiID, d.self.PKIid) {
continue
}
d.aliveLastTS[string(am.GetAliveMsg().Membership.PkiID)] = &timestamp{
Expand All @@ -700,7 +711,7 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
}

for _, dm := range deadMembers {
if equalPKIid(dm.GetAliveMsg().Membership.PkiID, d.pkiID) {
if equalPKIid(dm.GetAliveMsg().Membership.PkiID, d.self.PKIid) {
continue
}
d.deadLastTS[string(dm.GetAliveMsg().Membership.PkiID)] = &timestamp{
Expand All @@ -722,9 +733,10 @@ func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.GossipMessag
return
}
d.id2Member[string(member.Membership.PkiID)] = &NetworkMember{
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
PKIid: member.Membership.PkiID,
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
PKIid: member.Membership.PkiID,
InternalEndpoint: member.Membership.InternalEndpoint,
}
}
}
Expand All @@ -741,9 +753,10 @@ func (d *gossipDiscoveryImpl) GetMembership() []NetworkMember {
for _, m := range d.cachedMembership.Alive {
member := m.GetAliveMsg()
response = append(response, NetworkMember{
PKIid: member.Membership.PkiID,
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
PKIid: member.Membership.PkiID,
Endpoint: member.Membership.Endpoint,
Metadata: member.Membership.Metadata,
InternalEndpoint: member.Membership.InternalEndpoint,
})
}
return response
Expand All @@ -757,18 +770,23 @@ func tsToTime(ts uint64) time.Time {
func (d *gossipDiscoveryImpl) UpdateMetadata(md []byte) {
d.lock.Lock()
defer d.lock.Unlock()
d.metadata = md
d.self.Metadata = md
}

func (d *gossipDiscoveryImpl) UpdateEndpoint(endpoint string) {
d.lock.Lock()
defer d.lock.Unlock()

d.endpoint = endpoint
d.self.Endpoint = endpoint
}

func (d *gossipDiscoveryImpl) Self() NetworkMember {
return NetworkMember{Endpoint: d.endpoint, Metadata: d.metadata, PKIid: d.pkiID}
return NetworkMember{
Endpoint: d.self.Endpoint,
Metadata: d.self.Metadata,
PKIid: d.self.PKIid,
InternalEndpoint: d.self.InternalEndpoint,
}
}

func (d *gossipDiscoveryImpl) toDie() bool {
Expand All @@ -788,7 +806,7 @@ func equalPKIid(a, b common.PKIidType) bool {
}

func same(a *timestamp, b *proto.PeerTime) bool {
return (uint64(a.incTime.UnixNano()) == b.IncNumber && a.seqNum == b.SeqNum)
return uint64(a.incTime.UnixNano()) == b.IncNumber && a.seqNum == b.SeqNum
}

func before(a *timestamp, b *proto.PeerTime) bool {
Expand Down
20 changes: 20 additions & 0 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st
Metadata: []byte{},
PKIid: []byte(endpoint),
Endpoint: endpoint,
InternalEndpoint: &proto.SignedEndpoint{
Endpoint: endpoint,
Signature: []byte{},
},
}

listenAddress := fmt.Sprintf("%s:%d", "", port)
Expand Down Expand Up @@ -414,6 +418,22 @@ func TestGetFullMembership(t *testing.T) {
}

assertMembership(t, instances, nodeNum-1)

// Ensure that internal endpoint was propagated to everyone
for _, inst := range instances {
for _, member := range inst.GetMembership() {
assert.NotEmpty(t, member.InternalEndpoint.Endpoint)
assert.NotEmpty(t, member.Endpoint)
}
}

// Check that Exists() is valid
for _, inst := range instances {
for _, member := range inst.GetMembership() {
assert.True(t, inst.Exists(member.PKIid))
}
}

stopInstances(t, instances)
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func SelectPeers(k int, peerPool []discovery.NetworkMember, filters ...RoutingFi
var filteredPeers []*comm.RemotePeer
for _, peer := range peerPool {
if CombineRoutingFilters(filters...)(peer) {
filteredPeers = append(filteredPeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint})
filteredPeers = append(filteredPeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.PreferredEndpoint()})
}
}

Expand Down
4 changes: 3 additions & 1 deletion gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ type Gossip interface {
type Config struct {
BindPort int // Port we bind to, used only for tests
ID string // ID of this instance
SelfEndpoint string // Endpoint we publish to remote peers
BootstrapPeers []string // Peers we connect to at startup
PropagateIterations int // Number of times a message is pushed to remote peers
PropagatePeerNum int // Number of peers selected to push messages to
Expand All @@ -89,4 +88,7 @@ type Config struct {
PublishStateInfoInterval time.Duration // Determines frequency of pushing state info messages to peers
RequestStateInfoInterval time.Duration // Determines frequency of pulling state info messages from peers
TLSServerCert *tls.Certificate // TLS certificate of the peer

InternalEndpoint string // Endpoint we publish to peers in our organization
ExternalEndpoint string // Peer publishes this endpoint instead of SelfEndpoint to foreign organizations
}
Loading

0 comments on commit d58349a

Please sign in to comment.