Skip to content

Commit

Permalink
[FAB-2424] Enforce MSP channel validation in gossip
Browse files Browse the repository at this point in the history
Before this commit, peers of an organization could publish assertions
for having joined a channel, and it was enough for them to obtain blocks
from peers in the same organization.

This commit integrates gossip with the MSP method (VerifyByChannel) that consults
the MSP policies whether a certain peer is indeed eligible for receiving blocks
for the channel.

I changed the gossip tests and the channel tests to mock the method,
and also added a test in the channel test that checks that even when we query
the membership of a channel, it also calls VerifyByChannel.

I also had to make the MembershipStore thread safe for the channel

Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Change-Id: I0fccf98080d7e72d05e7e762244fe366e9f6e32a
  • Loading branch information
yacovm committed Feb 27, 2017
1 parent 7134f9f commit b36a664
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 86 deletions.
10 changes: 5 additions & 5 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type gossipDiscoveryImpl struct {
deadLastTS map[string]*timestamp // H
aliveLastTS map[string]*timestamp // V
id2Member map[string]*NetworkMember // all known members
aliveMembership util.MembershipStore
deadMembership util.MembershipStore
aliveMembership *util.MembershipStore
deadMembership *util.MembershipStore

bootstrapPeers []string

Expand All @@ -98,8 +98,8 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS
deadLastTS: make(map[string]*timestamp),
aliveLastTS: make(map[string]*timestamp),
id2Member: make(map[string]*NetworkMember),
aliveMembership: make(util.MembershipStore, 0),
deadMembership: make(util.MembershipStore, 0),
aliveMembership: util.NewMembershipStore(),
deadMembership: util.NewMembershipStore(),
crypt: crypt,
comm: comm,
lock: &sync.RWMutex{},
Expand Down Expand Up @@ -208,7 +208,7 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {

d.lock.RLock()

n := len(d.aliveMembership)
n := d.aliveMembership.Size()
k := peerNum
if k > n {
k = n
Expand Down
91 changes: 60 additions & 31 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type GossipChannel interface {
// IsOrgInChannel returns whether the given organization is in the channel
IsOrgInChannel(membersOrg api.OrgIdentityType) bool

// IsSubscribed returns whether the given member published
// its participation in the channel
IsSubscribed(member discovery.NetworkMember) bool
// EligibleForChannel returns whether the given member should get blocks
// for this channel
EligibleForChannel(member discovery.NetworkMember) bool

// HandleMessage processes a message sent by a remote peer
HandleMessage(proto.ReceivedMessage)
Expand Down Expand Up @@ -106,7 +106,11 @@ type Adapter interface {
OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType

// GetOrgOfPeer returns the organization ID of a given peer PKI-ID
GetOrgOfPeer(common.PKIidType) api.OrgIdentityType
GetOrgOfPeer(pkiID common.PKIidType) api.OrgIdentityType

// GetIdentityByPKIID returns an identity of a peer with a certain
// pkiID, or nil if not found
GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType
}

type gossipChannel struct {
Expand All @@ -119,7 +123,7 @@ type gossipChannel struct {
orgs []api.OrgIdentityType
joinMsg api.JoinChannelMessage
blockMsgStore msgstore.MessageStore
stateInfoMsgStore msgstore.MessageStore
stateInfoMsgStore *stateInfoCache
leaderMsgStore msgstore.MessageStore
chainID common.ChainID
blocksPuller pull.Mediator
Expand All @@ -138,7 +142,7 @@ type membershipFilter struct {
func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
var members []discovery.NetworkMember
for _, mem := range mf.adapter.GetMembership() {
if mf.IsSubscribed(mem) {
if mf.EligibleForChannel(mem) {
members = append(members, mem)
}
}
Expand Down Expand Up @@ -166,7 +170,7 @@ func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adap
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
})

gc.stateInfoMsgStore = NewStateInfoMessageStore()
gc.stateInfoMsgStore = newStateInfoCache()
gc.blocksPuller = gc.createBlockPuller()
gc.leaderMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})

Expand Down Expand Up @@ -203,30 +207,23 @@ func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
members := []discovery.NetworkMember{}

pkiID2NetMember := make(map[string]discovery.NetworkMember)
for _, member := range gc.GetMembership() {
pkiID2NetMember[string(member.PKIid)] = member
}

for _, o := range gc.stateInfoMsgStore.Get() {
stateInf := o.(*proto.SignedGossipMessage).GetStateInfo()
pkiID := stateInf.PkiID
if member, exists := pkiID2NetMember[string(pkiID)]; !exists {
if !gc.EligibleForChannel(member) {
continue
} else {
member.Metadata = stateInf.Metadata
members = append(members, member)
}
stateInf := gc.stateInfoMsgStore.MsgByID(member.PKIid)
if stateInf == nil {
continue
}
member.Metadata = stateInf.GetStateInfo().Metadata
members = append(members, member)
}
return members
}

func (gc *gossipChannel) requestStateInfo() {
req := gc.createStateInfoRequest().NoopSign()
endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsSubscribed)
if len(endpoints) == 0 {
endpoints = filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
}
endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
gc.Send(req, endpoints...)
}

Expand Down Expand Up @@ -296,19 +293,20 @@ func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool {
return false
}

// IsSubscribed returns whether the given member published
// its participation in the channel
func (gc *gossipChannel) IsSubscribed(member discovery.NetworkMember) bool {
// EligibleForChannel returns whether the given member should get blocks
// for this channel
func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
if !gc.IsMemberInChan(member) {
return false
}
for _, o := range gc.stateInfoMsgStore.Get() {
m, isMsg := o.(*proto.SignedGossipMessage)
if isMsg && m.IsStateInfoMsg() && bytes.Equal(m.GetStateInfo().PkiID, member.PKIid) {
return true
}

identity := gc.GetIdentityByPKIID(member.PKIid)
msg := gc.stateInfoMsgStore.MsgByID(member.PKIid)
if msg == nil || identity == nil {
return false
}
return false

return gc.mcs.VerifyByChannel(gc.chainID, identity, msg.Envelope.Signature, msg.Envelope.Payload) == nil
}

// AddToMsgStore adds a given GossipMessage to the message store
Expand Down Expand Up @@ -412,6 +410,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
return
}
if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BlockMessage {
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetPKIID()}) {
gc.logger.Warning(msg.GetPKIID(), "isn't eligible for channel", gc.chainID)
return
}
if m.IsDataUpdate() {
for _, item := range m.GetDataUpdate().Data {
gMsg, err := item.ToGossipMessage()
Expand Down Expand Up @@ -563,3 +565,30 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
func NewStateInfoMessageStore() msgstore.MessageStore {
return msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})
}

func newStateInfoCache() *stateInfoCache {
return &stateInfoCache{
MembershipStore: util.NewMembershipStore(),
MessageStore: NewStateInfoMessageStore(),
}
}

// stateInfoCache is actually a messageStore
// that also indexes messages that are added
// so that they could be extracted later
type stateInfoCache struct {
*util.MembershipStore
msgstore.MessageStore
}

// Add attempts to add the given message to the stateInfoCache,
// and if the message was added, also indexes it.
// Message must be a StateInfo message.
func (cache stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
added := cache.MessageStore.Add(msg)
pkiID := msg.GetStateInfo().PkiID
if added {
cache.MembershipStore.Put(pkiID, msg)
}
return added
}
82 changes: 71 additions & 11 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ limitations under the License.
package channel

import (
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

"errors"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
Expand Down Expand Up @@ -57,11 +56,12 @@ func init() {
var (
// Organizations: {ORG1, ORG2}
// Channel A: {ORG1}
channelA = common.ChainID("A")
orgInChannelA = api.OrgIdentityType("ORG1")
orgNotInChannelA = api.OrgIdentityType("ORG2")
pkiIDInOrg1 = common.PKIidType("pkiIDInOrg1")
pkiIDinOrg2 = common.PKIidType("pkiIDinOrg2")
channelA = common.ChainID("A")
orgInChannelA = api.OrgIdentityType("ORG1")
orgNotInChannelA = api.OrgIdentityType("ORG2")
pkiIDInOrg1 = common.PKIidType("pkiIDInOrg1")
pkiIDInOrg1ButNotEligible = common.PKIidType("pkiIDInOrg1ButNotEligible")
pkiIDinOrg2 = common.PKIidType("pkiIDinOrg2")
)

type joinChanMsg struct {
Expand All @@ -88,15 +88,20 @@ func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer {
}

type cryptoService struct {
mocked bool
mock.Mock
}

func (cs *cryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
panic("Should not be called in this test")
}

func (cs *cryptoService) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityType, _, _ []byte) error {
panic("Should not be called in this test")
func (cs *cryptoService) VerifyByChannel(channel common.ChainID, identity api.PeerIdentityType, _, _ []byte) error {
if !cs.mocked {
return nil
}
args := cs.Called(identity)
return args.Get(0).(error)
}

func (cs *cryptoService) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error {
Expand Down Expand Up @@ -199,10 +204,15 @@ func (ga *gossipAdapterMock) GetOrgOfPeer(PKIIID common.PKIidType) api.OrgIdenti
return args.Get(0).(api.OrgIdentityType)
}

func (ga *gossipAdapterMock) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
return api.PeerIdentityType(pkiID)
}

func configureAdapter(adapter *gossipAdapterMock, members ...discovery.NetworkMember) {
adapter.On("GetConf").Return(conf)
adapter.On("GetMembership").Return(members)
adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(orgInChannelA)
adapter.On("GetOrgOfPeer", pkiIDInOrg1ButNotEligible).Return(orgInChannelA)
adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA)
adapter.On("GetOrgOfPeer", mock.Anything).Return(api.OrgIdentityType(nil))
}
Expand Down Expand Up @@ -312,6 +322,8 @@ func TestChannelPeerNotInChannel(t *testing.T) {
gossipMessagesSentFromChannel <- msg
}
// First, ensure it does that for pull messages from peers that are in the channel
// Let the peer first publish it is in the channel
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1})
helloMsg := createHelloMsg(pkiIDInOrg1)
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
gc.HandleMessage(helloMsg)
Expand All @@ -330,6 +342,23 @@ func TestChannelPeerNotInChannel(t *testing.T) {
case <-time.After(time.Second * 1):
}

// Now for a more advanced scenario- the peer claims to be in the right org, and also claims to be in the channel
// but the MSP declares it is not eligible for the channel
// pkiIDInOrg1ButNotEligible
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible})
cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible"))
cs.mocked = true
helloMsg = createHelloMsg(pkiIDInOrg1ButNotEligible)
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
gc.HandleMessage(helloMsg)
select {
case <-gossipMessagesSentFromChannel:
t.Fatal("Responded with digest, but shouldn't have since peer is not eligible for the channel")
case <-time.After(time.Second * 1):
}

cs.Mock = mock.Mock{}

// Ensure we respond to a valid StateInfoRequest
req := gc.(*gossipChannel).createStateInfoRequest()
validReceivedMsg := &receivedMsg{
Expand Down Expand Up @@ -404,7 +433,7 @@ func TestChannelIsSubscribed(t *testing.T) {
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1})
assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
}

func TestChannelAddToMessageStore(t *testing.T) {
Expand Down Expand Up @@ -456,7 +485,7 @@ func TestChannelAddToMessageStore(t *testing.T) {
}

gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1})
assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
}

func TestChannelBadBlocks(t *testing.T) {
Expand Down Expand Up @@ -782,7 +811,38 @@ func TestChannelReconfigureChannel(t *testing.T) {
t.Fatal("Responded with digest, but shouldn't have since peer is in ORG2 and its not in the channel")
case <-time.After(time.Second * 1):
}
}

func TestChannelGetPeers(t *testing.T) {
t.Parallel()

// Scenario: We have a peer in an org, and the peer is notified that several peers
// exist, and some of them:
// (1) Join its channel, and are eligible for receiving blocks.
// (2) Join its channel, but are not eligible for receiving blocks (MSP doesn't allow this).
// (3) Say they join its channel, but are actually from an org that is not in the channel.
// The GetPeers query should only return peers that belong to the first group.
cs := &cryptoService{}
adapter := new(gossipAdapterMock)
adapter.On("Gossip", mock.Anything)
adapter.On("Send", mock.Anything, mock.Anything)
adapter.On("DeMultiplex", mock.Anything)
members := []discovery.NetworkMember{
{PKIid: pkiIDInOrg1},
{PKIid: pkiIDInOrg1ButNotEligible},
{PKIid: pkiIDinOrg2},
}
configureAdapter(adapter, members...)
gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
assert.Len(t, gc.GetPeers(), 1)
assert.Equal(t, pkiIDInOrg1, gc.GetPeers()[0].PKIid)

gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible})
cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible"))
cs.mocked = true
assert.Len(t, gc.GetPeers(), 0)
}

func createDataUpdateMsg(nonce uint64) *proto.SignedGossipMessage {
Expand Down
10 changes: 10 additions & 0 deletions gossip/gossip/chanstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,13 @@ func (ga *gossipAdapterImpl) OrgByPeerIdentity(identity api.PeerIdentityType) ap
func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
return ga.gossipServiceImpl.getOrgOfPeer(PKIID)
}

// GetIdentityByPKIID returns an identity of a peer with a certain
// pkiID, or nil if not found
func (ga *gossipAdapterImpl) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
identity, err := ga.idMapper.Get(pkiID)
if err != nil {
return nil
}
return identity
}
4 changes: 2 additions & 2 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
// Gossip blocks
blocks, msgs = partitionMessages(isABlock, msgs)
g.gossipInChan(blocks, func(gc channel.GossipChannel) filter.RoutingFilter {
return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg)
return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
})

// Gossip StateInfo messages
Expand All @@ -445,7 +445,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
// Gossip Leadership messages
leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs)
g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter {
return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg)
return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
})

// Gossip messages restricted to our org
Expand Down
Loading

0 comments on commit b36a664

Please sign in to comment.