Skip to content

Commit

Permalink
[FAB-2061] Gossip inter-org confidentiality - P3
Browse files Browse the repository at this point in the history
In the previous commit, I extended the pull mechanism to support
filtering based on the remote peer.

The goal was to ensure that peers only gossip identities of either
their own organization, or of organizations they are gossiping with.
(Meaning- if A gossips with B identities, the identities that are sent
are either identities of A or of B).

In this commit I hook a filter to the constructor of the identity puller
that extracts the organization of the remote peer, and based on the
organization of the digest (which is the PKI-ID of the identity),
the filter ensures the confidentiality explained above.

I also extended the test of the first commit in the series, to inspect
identities that travel between organizations, to make sure
that the confidentiality rules are preserved.

I also did the following test, to make sure that state transfer between
peers still works:

network_setup.sh: Overided the docker-compose file with docker-compose-no-tls
peer-base/peer-base-no-tls.yaml: I made both options:
      - CORE_PEER_GOSSIP_ORGLEADER=false
      - CORE_PEER_GOSSIP_USELEADERELECTION=false
And in docker-compose-no-tls.yaml I made ONLY peer0 to be ORGLEADER-true

Then I ran network_setup.sh up and it passed, so this proves that
peer2 and peer3 got their blocks from peer0 (or peer1).

Change-Id: Ied4979dd5cfc24ef7b0d1bc092653e9f037a64f2
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Apr 22, 2017
1 parent 88ac3ba commit 9d04269
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 12 deletions.
29 changes: 28 additions & 1 deletion gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,17 +938,44 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
g.logger.Warning("Failed associating PKI-ID with certificate:", err)
}
g.logger.Info("Learned of a new certificate:", idMsg.Cert)

}
adapter := pull.PullAdapter{
Sndr: g.comm,
MemSvc: g.disc,
IdExtractor: pkiIDFromMsg,
MsgCons: certConsumer,
DigFilter: g.sameOrgOrOurOrgPullFilter,
}
return pull.NewPullMediator(conf, adapter)
}

func (g *gossipServiceImpl) sameOrgOrOurOrgPullFilter(msg proto.ReceivedMessage) func(string) bool {
peersOrg := g.secAdvisor.OrgByPeerIdentity(msg.GetConnectionInfo().Identity)
if len(peersOrg) == 0 {
g.logger.Warning("Failed determining organization of", msg.GetConnectionInfo())
return func(_ string) bool {
return false
}
}

// If the peer is from our org, gossip all identities
if bytes.Equal(g.selfOrg, peersOrg) {
return func(_ string) bool {
return true
}
}
return func(item string) bool {
pkiID := common.PKIidType(item)
msgsOrg := g.getOrgOfPeer(pkiID)
if len(msgsOrg) == 0 {
g.logger.Warning("Failed determining organization of", pkiID)
return false
}
// Peer from our org or identity from our org or identity from peer's org
return bytes.Equal(msgsOrg, g.selfOrg) || bytes.Equal(msgsOrg, peersOrg)
}
}

func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) {
pkiID := g.comm.GetPKIid()
stateInfMsg := &proto.StateInfo{
Expand Down
61 changes: 50 additions & 11 deletions gossip/gossip/orgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ package gossip
import (
"bytes"
"fmt"
"testing"
"time"

"sync"
"sync/atomic"
"testing"
"time"

"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/gossip/api"
Expand Down Expand Up @@ -271,6 +270,10 @@ func TestConfidentiality(t *testing.T) {
peersInOrg := 3
externalEndpointsInOrg := 2

// orgA: {12610, 12611, 12612}
// orgB: {12613, 12614, 12615}
// orgC: {12616, 12617, 12618}
// orgD: {12619, 12620, 12621}
orgs := []string{"A", "B", "C", "D"}
channels := []string{"C0", "C1", "C2", "C3"}
isOrgInChan := func(org string, channel string) bool {
Expand Down Expand Up @@ -335,15 +338,16 @@ func TestConfidentiality(t *testing.T) {
finished := int32(0)
var wg sync.WaitGroup

membershipMsgs := func(o interface{}) bool {
membershipAndIdentitiesMsgs := func(o interface{}) bool {
msg := o.(proto.ReceivedMessage).GetGossipMessage()
return msg.IsAliveMsg() || msg.GetMemRes() != nil
identitiesPull := msg.IsPullMsg() && msg.GetPullMsgType() == proto.PullMsgType_IDENTITY_MSG
return msg.IsAliveMsg() || msg.GetMemRes() != nil || identitiesPull
}
// Listen to all peers membership messages and forward them to the inspection channel
// where they will be inspected, and the test would fail if a confidentiality violation is found
for _, p := range peers {
wg.Add(1)
_, msgs := p.Accept(membershipMsgs, true)
_, msgs := p.Accept(membershipAndIdentitiesMsgs, true)
peerNetMember := p.(*gossipServiceImpl).selfNetworkMember()
targetORg := string(cs.OrgByPeerIdentity(api.PeerIdentityType(peerNetMember.InternalEndpoint)))
go func(targetOrg string, msgs <-chan proto.ReceivedMessage) {
Expand Down Expand Up @@ -380,11 +384,15 @@ func TestConfidentiality(t *testing.T) {
if isOrgInChan(org, ch) {
for _, p := range peers {
p.JoinChan(joinChanMsgsByChan[ch], common.ChainID(ch))
p.UpdateChannelMetadata([]byte{}, common.ChainID(ch))
}
}
}
}

// Sleep a bit, to let peers gossip with each other
time.Sleep(time.Second * 7)

assertMembership := func() bool {
for _, org := range orgs {
for i, p := range orgs2Peers[org] {
Expand Down Expand Up @@ -448,13 +456,44 @@ func extractOrgsFromMsg(msg *proto.GossipMessage, sec api.SecurityAdvisor) []str
if msg.IsAliveMsg() {
return []string{string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))}
}

orgs := map[string]struct{}{}
alive := msg.GetMemRes().Alive
dead := msg.GetMemRes().Dead
for _, envp := range append(alive, dead...) {
msg, _ := envp.ToGossipMessage()
orgs[string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))] = struct{}{}

if msg.IsPullMsg() {
if msg.IsDigestMsg() || msg.IsDataReq() {
var digests []string
if msg.IsDigestMsg() {
digests = msg.GetDataDig().Digests
} else {
digests = msg.GetDataReq().Digests
}

for _, dig := range digests {
org := sec.OrgByPeerIdentity(api.PeerIdentityType(dig))
orgs[string(org)] = struct{}{}
}
}

if msg.IsDataUpdate() {
for _, identityMsg := range msg.GetDataUpdate().Data {
gMsg, _ := identityMsg.ToGossipMessage()
id := string(gMsg.GetPeerIdentity().Cert)
org := sec.OrgByPeerIdentity(api.PeerIdentityType(id))
orgs[string(org)] = struct{}{}
}
}

}

if msg.GetMemRes() != nil {
alive := msg.GetMemRes().Alive
dead := msg.GetMemRes().Dead
for _, envp := range append(alive, dead...) {
msg, _ := envp.ToGossipMessage()
orgs[string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))] = struct{}{}
}
}

res := []string{}
for org := range orgs {
res = append(res, org)
Expand Down

0 comments on commit 9d04269

Please sign in to comment.