Skip to content

Commit

Permalink
[FAB-2007] External and internal endpoints III
Browse files Browse the repository at this point in the history
This is the last commit of the series, in which we enforce
publishing of external endpoints to peers of different orgs
 and publishing of both external and internal endpoints only
 within the peer's organization.

The commit addesses this only in the gossip layer,
as the discovery layer has been taken care of in previous commits.

It contains a test in which we:
- Create 2 orgs with 5 peers each, while the first 2 peers have external
  endpoints, and the rest don't.
- The first org has a bootstrap peer, the second doesn't.
- JoinChannel is invoked with the first peer of each org as anchor peers.
- It is tested that:
  - Peers know peers from other orgs only if they both
    have external endpoints.
  - Peers do not know the internal endpoints of peers of other orgs.

Change-Id: I157c8cea29b35adb84314fcb695413b005f2b236
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Mar 8, 2017
1 parent 24dc7d9 commit 821c9d8
Show file tree
Hide file tree
Showing 2 changed files with 350 additions and 19 deletions.
131 changes: 112 additions & 19 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis

g.discAdapter = g.newDiscoveryAdapter()
g.disSecAdap = g.newDiscoverySecurityAdapter()

noopDisclosurePol := func(remotePeer *discovery.NetworkMember) (discovery.Sieve, discovery.EnvelopeFilter) {
return func(msg *proto.SignedGossipMessage) bool {
return true
}, func(message *proto.SignedGossipMessage) *proto.Envelope {
return message.Envelope
}
}
g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap, noopDisclosurePol)
g.disc = discovery.NewDiscoveryService(conf.BootstrapPeers, g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy)
g.logger.Info("Creating gossip service with self membership of", g.selfNetworkMember())

g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)
Expand All @@ -148,6 +140,9 @@ func (g *gossipServiceImpl) selfNetworkMember() discovery.NetworkMember {
Metadata: []byte{},
InternalEndpoint: g.conf.InternalEndpoint,
}
if g.disc != nil {
self.Metadata = g.disc.Self().Metadata
}
return self
}

Expand Down Expand Up @@ -197,6 +192,10 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
}

inOurOrg := bytes.Equal(g.selfOrg, ap.OrgID)
if !inOurOrg && g.selfNetworkMember().Endpoint == "" {
g.logger.Infof("Anchor peer %s:%d isn't in our org(%v) and we have no external endpoint, skipping", ap.Host, ap.Port, string(ap.OrgID))
continue
}
g.disc.Connect(discovery.NetworkMember{
InternalEndpoint: endpoint, Endpoint: endpoint}, inOurOrg)
}
Expand Down Expand Up @@ -346,6 +345,7 @@ func (g *gossipServiceImpl) forwardDiscoveryMsg(msg proto.ReceivedMessage) {
defer func() { // can be closed while shutting down
recover()
}()

g.discAdapter.incChan <- msg.GetGossipMessage()
}

Expand Down Expand Up @@ -427,8 +427,16 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
isAStateInfoMsg := func(o interface{}) bool {
return o.(*proto.SignedGossipMessage).IsStateInfoMsg()
}
aliveMsgsWithNoEndpointAndInOurOrg := func(o interface{}) bool {
msg := o.(*proto.SignedGossipMessage)
if !msg.IsAliveMsg() {
return false
}
member := msg.GetAliveMsg().Membership
return member.Endpoint == "" && g.isInMyorg(discovery.NetworkMember{PKIid: member.PkiId})
}
isOrgRestricted := func(o interface{}) bool {
return o.(*proto.SignedGossipMessage).IsOrgRestricted()
return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*proto.SignedGossipMessage).IsOrgRestricted()
}
isLeadershipMsg := func(o interface{}) bool {
return o.(*proto.SignedGossipMessage).IsLeadershipMsg()
Expand Down Expand Up @@ -462,7 +470,29 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
// Finally, gossip the remaining messages
peers2Send = filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership())
for _, msg := range msgs {
g.comm.Send(msg, peers2Send...)
g.sendAndFilterSecrets(msg, peers2Send...)
}
}

func (g *gossipServiceImpl) sendAndFilterSecrets(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
for _, peer := range peers {
// Prevent forwarding alive messages of external organizations
// to peers that have no external endpoints
remotePeerEndpoint := g.disc.Lookup(peer.PKIID)
if remotePeerEndpoint == nil {
g.logger.Warning("Peer", peer, "isn't in the membership anymore, will not send to it")
continue
}
aliveMsgFromDiffOrg := msg.IsAliveMsg() && !g.isInMyorg(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId})
if aliveMsgFromDiffOrg && remotePeerEndpoint.Endpoint == "" {
continue
}
// Don't gossip secrets
if !g.isInMyorg(discovery.NetworkMember{PKIid: peer.PKIID}) {
msg.Envelope.SecretEnvelope = nil
}

g.comm.Send(msg, peer)
}
}

Expand Down Expand Up @@ -670,19 +700,21 @@ func (g *gossipServiceImpl) newDiscoveryAdapter() *discoveryAdapter {
}
g.emitter.Add(msg)
},
incChan: make(chan *proto.SignedGossipMessage),
presumedDead: g.presumedDead,
incChan: make(chan *proto.SignedGossipMessage),
presumedDead: g.presumedDead,
disclosurePolicy: g.disclosurePolicy,
}
}

// discoveryAdapter is used to supply the discovery module with needed abilities
// that the comm interface in the discovery module declares
type discoveryAdapter struct {
stopping int32
c comm.Comm
presumedDead chan common.PKIidType
incChan chan *proto.SignedGossipMessage
gossipFunc func(message *proto.SignedGossipMessage)
stopping int32
c comm.Comm
presumedDead chan common.PKIidType
incChan chan *proto.SignedGossipMessage
gossipFunc func(message *proto.SignedGossipMessage)
disclosurePolicy discovery.DisclosurePolicy
}

func (da *discoveryAdapter) close() {
Expand All @@ -698,13 +730,40 @@ func (da *discoveryAdapter) Gossip(msg *proto.SignedGossipMessage) {
if da.toDie() {
return
}

da.gossipFunc(msg)
}

func (da *discoveryAdapter) SendToPeer(peer *discovery.NetworkMember, msg *proto.SignedGossipMessage) {
if da.toDie() {
return
}
// Check membership requests for peers that we know of their PKI-ID.
// The only peers we don't know about their PKI-IDs are bootstrap peers.
if memReq := msg.GetMemReq(); memReq != nil && len(peer.PKIid) != 0 {
selfMsg, err := memReq.SelfInformation.ToGossipMessage()
if err != nil {
// Shouldn't happen
panic("Tried to send a membership request with a malformed AliveMessage")
}
// Apply the EnvelopeFilter of the disclosure policy
// on the alive message of the selfInfo field of the membership request
_, omitConcealedFields := da.disclosurePolicy(peer)
selfMsg.Envelope = omitConcealedFields(selfMsg)
// Backup old known field
oldKnown := memReq.Known
// Override new SelfInfo message with updated envelope
memReq = &proto.MembershipRequest{
SelfInformation: selfMsg.Envelope,
Known: oldKnown,
}
// Update original message
msg.Content = &proto.GossipMessage_MemReq{
MemReq: memReq,
}
// Update the envelope of the outer message, no need to sign (point2point)
msg = msg.NoopSign()
}
da.c.Send(msg, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.PreferredEndpoint()})
}

Expand Down Expand Up @@ -892,7 +951,6 @@ func (g *gossipServiceImpl) isInMyorg(member discovery.NetworkMember) bool {
func (g *gossipServiceImpl) getOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
cert, err := g.idMapper.Get(PKIID)
if err != nil {
g.logger.Error("Failed getting certificate by PKIid:", PKIID, ":", err)
return nil
}

Expand Down Expand Up @@ -928,6 +986,41 @@ func (g *gossipServiceImpl) validateStateInfoMsg(msg *proto.SignedGossipMessage)
return msg.Verify(identity, verifier)
}

func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember) (discovery.Sieve, discovery.EnvelopeFilter) {
remotePeerOrg := g.getOrgOfPeer(remotePeer.PKIid)

if len(remotePeerOrg) == 0 {
g.logger.Warning("Cannot determine organization of", remotePeer)
return func(msg *proto.SignedGossipMessage) bool {
return false
}, func(msg *proto.SignedGossipMessage) *proto.Envelope {
return msg.Envelope
}
}

return func(msg *proto.SignedGossipMessage) bool {
if !msg.IsAliveMsg() {
g.logger.Panic("Programming error, this should be used only on alive messages")
}
org := g.getOrgOfPeer(msg.GetAliveMsg().Membership.PkiId)
if len(org) == 0 {
// Panic here, because we are somehow trying to send an AliveMessage
// without having its matching identity beforehand, and the message
// should have never reached this far- but should've been dropped
// at signature validation.
g.logger.Panic("Unable to determine org of message", msg.GossipMessage)
}
// Pass the alive message only if the alive message is in the same org as the remote peer
// or the message has an external endpoint, and the remote peer also has one
return bytes.Equal(org, remotePeerOrg) || msg.GetAliveMsg().Membership.Endpoint != "" && remotePeer.Endpoint != ""
}, func(msg *proto.SignedGossipMessage) *proto.Envelope {
if !bytes.Equal(g.selfOrg, remotePeerOrg) {
msg.SecretEnvelope = nil
}
return msg.Envelope
}
}

// partitionMessages receives a predicate and a slice of gossip messages
// and returns a tuple of two slices: the messages that hold for the predicate
// and the rest
Expand Down
Loading

0 comments on commit 821c9d8

Please sign in to comment.