From 077126ef538d01fb254fb0e3e625c4b3e80dd6d1 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Sat, 15 Apr 2017 15:32:44 +0300 Subject: [PATCH] [FAB-2061] Gossip inter-org confidentiality - P1 This change set makes every 2 orgs (that share a channel), gossip only alive messages that that originated in one of the two organizations. The motivation is to prevent a third organization from knowing that the two orgs share a common channel (unless of course, they all share it). So, if we have the following situation: Channel C0: { orgA, orgB } Channel C1: { orgB, orgC } [ A ]--C0--[ B ]--C1--[ C ] Then orgA has no idea of orgC's existence and vice-versa. But, also if we have the following situation: Channel C0: { orgA, orgB } Channel C1: { orgA, orgC } Channel C2: { orgB, orgC } [ A ]-C0-[ B ] | / | / C1 C2 | / | / [ C ] Then, alive message from orgA, are never sent to orgB via orgC (and the same for orgB, orgC) Or, in other words: an Alive message m about a peer in org X is sent to org Y, only if X is either in X or in Y. In the next change set I'll do the same for identity messages (messages that contain certificates) Change-Id: I816e6b23050d320f8b99a64b5c84d619de54aaa8 Signed-off-by: Yacov Manevich --- gossip/filter/filter.go | 16 ++- gossip/gossip/gossip_impl.go | 53 ++++++- gossip/gossip/orgs_test.go | 268 +++++++++++++++++++++++++++++++++++ 3 files changed, 328 insertions(+), 9 deletions(-) diff --git a/gossip/filter/filter.go b/gossip/filter/filter.go index 255b5893444..3a21d324f65 100644 --- a/gossip/filter/filter.go +++ b/gossip/filter/filter.go @@ -27,6 +27,16 @@ import ( // selected for be given a message type RoutingFilter func(discovery.NetworkMember) bool +// SelectNonePolicy selects an empty set of members +var SelectNonePolicy = func(discovery.NetworkMember) bool { + return false +} + +// SelectAllPolicy selects all members given +var SelectAllPolicy = func(discovery.NetworkMember) bool { + return true +} + // CombineRoutingFilters returns the logical AND of given routing filters func CombineRoutingFilters(filters ...RoutingFilter) RoutingFilter { return func(member discovery.NetworkMember) bool { @@ -39,11 +49,11 @@ func CombineRoutingFilters(filters ...RoutingFilter) RoutingFilter { } } -// SelectPeers returns a slice of peers that match a list of routing filters -func SelectPeers(k int, peerPool []discovery.NetworkMember, filters ...RoutingFilter) []*comm.RemotePeer { +// SelectPeers returns a slice of peers that match the routing filter +func SelectPeers(k int, peerPool []discovery.NetworkMember, filter RoutingFilter) []*comm.RemotePeer { var filteredPeers []*comm.RemotePeer for _, peer := range peerPool { - if CombineRoutingFilters(filters...)(peer) { + if filter(peer) { filteredPeers = append(filteredPeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.PreferredEndpoint()}) } } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 5634ea7c11b..d58607eec7b 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -483,8 +483,13 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { } // Finally, gossip the remaining messages - peers2Send = filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership()) for _, msg := range msgs { + if !msg.IsAliveMsg() { + g.logger.Error("Unknown message type", msg) + continue + } + selectByOriginOrg := g.peersByOriginOrgPolicy(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId}) + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selectByOriginOrg) g.sendAndFilterSecrets(msg, peers2Send...) } } @@ -1021,12 +1026,20 @@ func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember } org := g.getOrgOfPeer(msg.GetAliveMsg().Membership.PkiId) if len(org) == 0 { - // Panic here, because we are somehow trying to send an AliveMessage - // without having its matching identity beforehand, and the message - // should have never reached this far- but should've been dropped - // at signature validation. - g.logger.Panic("Unable to determine org of message", msg.GossipMessage) + g.logger.Warning("Unable to determine org of message", msg.GossipMessage) + // Don't disseminate messages who's origin org is unknown + return false } + + // Target org and the message are from the same org + fromSameForeignOrg := bytes.Equal(remotePeerOrg, org) + // The message is from my org + fromMyOrg := bytes.Equal(g.selfOrg, org) + // Forward to target org only messages from our org, or from the target org itself. + if !(fromSameForeignOrg || fromMyOrg) { + return false + } + // Pass the alive message only if the alive message is in the same org as the remote peer // or the message has an external endpoint, and the remote peer also has one return bytes.Equal(org, remotePeerOrg) || msg.GetAliveMsg().Membership.Endpoint != "" && remotePeer.Endpoint != "" @@ -1038,6 +1051,34 @@ func (g *gossipServiceImpl) disclosurePolicy(remotePeer *discovery.NetworkMember } } +func (g *gossipServiceImpl) peersByOriginOrgPolicy(peer discovery.NetworkMember) filter.RoutingFilter { + peersOrg := g.getOrgOfPeer(peer.PKIid) + if len(peersOrg) == 0 { + g.logger.Warning("Unable to determine organization of peer", peer) + // Don't disseminate messages who's origin org is undetermined + return filter.SelectNonePolicy + } + + if bytes.Equal(g.selfOrg, peersOrg) { + // Disseminate messages from our org to all known organizations. + // IMPORTANT: Currently a peer cannot un-join a channel, so the only way + // of making gossip stop talking to an organization is by having the MSP + // refuse validating messages from it. + return filter.SelectAllPolicy + } + + // Else, select peers from the origin's organization, + // and also peers from our own organization + return func(member discovery.NetworkMember) bool { + memberOrg := g.getOrgOfPeer(member.PKIid) + if len(memberOrg) == 0 { + return false + } + isFromMyOrg := bytes.Equal(g.selfOrg, memberOrg) + return isFromMyOrg || bytes.Equal(memberOrg, peersOrg) + } +} + // partitionMessages receives a predicate and a slice of gossip messages // and returns a tuple of two slices: the messages that hold for the predicate // and the rest diff --git a/gossip/gossip/orgs_test.go b/gossip/gossip/orgs_test.go index 3de0647da11..b825c58e22c 100644 --- a/gossip/gossip/orgs_test.go +++ b/gossip/gossip/orgs_test.go @@ -22,11 +22,15 @@ import ( "testing" "time" + "sync" + "sync/atomic" + "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/discovery" "github.com/hyperledger/fabric/gossip/identity" + proto "github.com/hyperledger/fabric/protos/gossip" "github.com/stretchr/testify/assert" ) @@ -241,3 +245,267 @@ func TestMultipleOrgEndpointLeakage(t *testing.T) { } } } + +func TestConfidentiality(t *testing.T) { + t.Parallel() + // Scenario: create 4 organizations: {A, B, C, D}, each with 3 peers. + // Make only the first 2 peers have an external endpoint. + // Also, add the peers to the following channels: + // Channel C0: { orgA, orgB } + // Channel C1: { orgA, orgC } + // Channel C2: { orgB, orgC } + // Channel C3: { orgB, orgD } + // [ A ]-C0-[ B ]-C3-[ D ] + // | / + // | / + // C1 C2 + // | / + // | / + // [ C ] + // Subscribe to all membership messages for each peer, + // and fail the test if a message is sent to a peer in org X, + // from a peer in org Y about a peer in org Z not in {X, Y} + // or if any org other than orgB knows peers in orgD (and vice versa). + + portPrefix := 12610 + peersInOrg := 3 + externalEndpointsInOrg := 2 + + orgs := []string{"A", "B", "C", "D"} + channels := []string{"C0", "C1", "C2", "C3"} + isOrgInChan := func(org string, channel string) bool { + switch org { + case "A": + return channel == "C0" || channel == "C1" + case "B": + return channel == "C0" || channel == "C2" || channel == "C3" + case "C": + return channel == "C1" || channel == "C2" + case "D": + return channel == "C3" + } + + return false + } + + // Create the message crypto service + cs := &configurableCryptoService{m: make(map[string]api.OrgIdentityType)} + for i, org := range orgs { + for j := 0; j < peersInOrg; j++ { + port := portPrefix + i*peersInOrg + j + cs.putInOrg(port, org) + } + } + + var peers []Gossip + orgs2Peers := map[string][]Gossip{ + "A": {}, + "B": {}, + "C": {}, + "D": {}, + } + + anchorPeersByOrg := map[string]api.AnchorPeer{} + + for i, org := range orgs { + for j := 0; j < peersInOrg; j++ { + id := i*peersInOrg + j + port := id + portPrefix + endpoint := fmt.Sprintf("localhost:%d", port) + externalEndpoint := "" + if j < externalEndpointsInOrg { // The first peers of each org would have an external endpoint + externalEndpoint = endpoint + } + peer := newGossipInstanceWithExternalEndpoint(portPrefix, id, cs, externalEndpoint) + peers = append(peers, peer) + orgs2Peers[org] = append(orgs2Peers[org], peer) + // The first peer of the org will be used as the anchor peer + if j == 0 { + anchorPeersByOrg[org] = api.AnchorPeer{ + Host: "localhost", + Port: port, + } + } + } + } + + msgs2Inspect := make(chan *msg, 3000) + defer close(msgs2Inspect) + go inspectMsgs(t, msgs2Inspect, cs) + finished := int32(0) + var wg sync.WaitGroup + + membershipMsgs := func(o interface{}) bool { + msg := o.(proto.ReceivedMessage).GetGossipMessage() + return msg.IsAliveMsg() || msg.GetMemRes() != nil + } + // Listen to all peers membership messages and forward them to the inspection channel + // where they will be inspected, and the test would fail if a confidentiality violation is found + for _, p := range peers { + wg.Add(1) + _, msgs := p.Accept(membershipMsgs, true) + peerNetMember := p.(*gossipServiceImpl).selfNetworkMember() + targetORg := string(cs.OrgByPeerIdentity(api.PeerIdentityType(peerNetMember.InternalEndpoint))) + go func(targetOrg string, msgs <-chan proto.ReceivedMessage) { + defer wg.Done() + for receivedMsg := range msgs { + m := &msg{ + src: string(cs.OrgByPeerIdentity(receivedMsg.GetConnectionInfo().Identity)), + dst: targetORg, + GossipMessage: receivedMsg.GetGossipMessage().GossipMessage, + } + if atomic.LoadInt32(&finished) == int32(1) { + return + } + msgs2Inspect <- m + } + }(targetORg, msgs) + } + + // Now, construct the join channel messages + joinChanMsgsByChan := map[string]*joinChanMsg{} + for _, ch := range channels { + jcm := &joinChanMsg{members2AnchorPeers: map[string][]api.AnchorPeer{}} + for _, org := range orgs { + if isOrgInChan(org, ch) { + jcm.members2AnchorPeers[org] = append(jcm.members2AnchorPeers[org], anchorPeersByOrg[org]) + } + } + joinChanMsgsByChan[ch] = jcm + } + + // Next, make the peers join the channels + for org, peers := range orgs2Peers { + for _, ch := range channels { + if isOrgInChan(org, ch) { + for _, p := range peers { + p.JoinChan(joinChanMsgsByChan[ch], common.ChainID(ch)) + } + } + } + } + + assertMembership := func() bool { + for _, org := range orgs { + for i, p := range orgs2Peers[org] { + members := p.Peers() + expMemberSize := expectedMembershipSize(peersInOrg, externalEndpointsInOrg, org, i < externalEndpointsInOrg) + peerNetMember := p.(*gossipServiceImpl).selfNetworkMember() + membersCount := len(members) + if membersCount < expMemberSize { + return false + } + // Make sure no one knows too much + assert.True(t, membersCount <= expMemberSize, "%s knows too much (%d > %d) peers: %v", + membersCount, expMemberSize, peerNetMember.PKIid, members) + } + } + return true + } + + waitUntilOrFail(t, assertMembership) + stopPeers(peers) + wg.Wait() + atomic.StoreInt32(&finished, int32(1)) +} + +func expectedMembershipSize(peersInOrg, externalEndpointsInOrg int, org string, hasExternalEndpoint bool) int { + // x <-- peersInOrg + // y <-- externalEndpointsInOrg + // (x+2y)[ A ]-C0-[ B ]---C3--[ D ] (x+y) + // | /(x+3y) + // | / + // C1 C2 + // | / + // | / + // [ C ] x+2y + + m := map[string]func(x, y int) int{ + "A": func(x, y int) int { + return x + 2*y + }, + "B": func(x, y int) int { + return x + 3*y + }, + "C": func(x, y int) int { + return x + 2*y + }, + "D": func(x, y int) int { + return x + y + }, + } + + // If the peer doesn't have an external endpoint, + // it doesn't know peers from foreign organizations that have one + if !hasExternalEndpoint { + externalEndpointsInOrg = 0 + } + // Deduct 1 because the peer itself doesn't count + return m[org](peersInOrg, externalEndpointsInOrg) - 1 +} + +func extractOrgsFromMsg(msg *proto.GossipMessage, sec api.SecurityAdvisor) []string { + if msg.IsAliveMsg() { + return []string{string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))} + } + orgs := map[string]struct{}{} + alive := msg.GetMemRes().Alive + dead := msg.GetMemRes().Dead + for _, envp := range append(alive, dead...) { + msg, _ := envp.ToGossipMessage() + orgs[string(sec.OrgByPeerIdentity(api.PeerIdentityType(msg.GetAliveMsg().Membership.PkiId)))] = struct{}{} + } + res := []string{} + for org := range orgs { + res = append(res, org) + } + return res +} + +func inspectMsgs(t *testing.T, msgChan chan *msg, sec api.SecurityAdvisor) { + for msg := range msgChan { + // If the destination org is the same as the source org, + // the message can contain any organizations + if msg.src == msg.dst { + continue + } + // Else, it's a cross-organizational message. + // Denote src organization as s and dst organization as d. + // The total organizations of the message must be a subset of s U d. + orgs := extractOrgsFromMsg(msg.GossipMessage, sec) + s := []string{msg.src, msg.dst} + assert.True(t, isSubset(orgs, s), "%v isn't a subset of %v", orgs, s) + + // Ensure no one but B knows about D and vice versa + if msg.dst == "D" { + assert.NotContains(t, "A", orgs) + assert.NotContains(t, "C", orgs) + } + + if msg.dst == "A" || msg.dst == "C" { + assert.NotContains(t, "D", orgs) + } + } +} + +type msg struct { + src string + dst string + *proto.GossipMessage +} + +func isSubset(a []string, b []string) bool { + for _, s1 := range a { + found := false + for _, s2 := range b { + if s1 == s2 { + found = true + break + } + } + if !found { + return false + } + } + return true +}