From b36a66412717bccb65e8e5491a82619fb4c67341 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Wed, 22 Feb 2017 10:43:35 +0200 Subject: [PATCH] [FAB-2424] Enforce MSP channel validation in gossip Before this commit, peers of an organization could publish assertions for having joined a channel, and it was enough for them to obtain blocks from peers in the same organization. This commit integrates gossip with the MSP method (VerifyByChannel) that consults the MSP policies whether a certain peer is indeed eligible for receiving blocks for the channel. I changed the gossip tests and the channel tests to mock the method, and also added a test in the channel test that checks that even when we query the membership of a channel, it also calls VerifyByChannel. I also had to make the MembershipStore thread safe for the channel Signed-off-by: Yacov Manevich Change-Id: I0fccf98080d7e72d05e7e762244fe366e9f6e32a --- gossip/discovery/discovery_impl.go | 10 +-- gossip/gossip/channel/channel.go | 91 ++++++++++++++++++--------- gossip/gossip/channel/channel_test.go | 82 ++++++++++++++++++++---- gossip/gossip/chanstate.go | 10 +++ gossip/gossip/gossip_impl.go | 4 +- gossip/gossip/gossip_test.go | 77 ++++++++++++++++------- gossip/util/msgs.go | 43 ++++++++++--- gossip/util/msgs_test.go | 10 +-- 8 files changed, 241 insertions(+), 86 deletions(-) diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index dd90a7061b7..db7f99af2fe 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -75,8 +75,8 @@ type gossipDiscoveryImpl struct { deadLastTS map[string]*timestamp // H aliveLastTS map[string]*timestamp // V id2Member map[string]*NetworkMember // all known members - aliveMembership util.MembershipStore - deadMembership util.MembershipStore + aliveMembership *util.MembershipStore + deadMembership *util.MembershipStore bootstrapPeers []string @@ -98,8 +98,8 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS deadLastTS: make(map[string]*timestamp), aliveLastTS: make(map[string]*timestamp), id2Member: make(map[string]*NetworkMember), - aliveMembership: make(util.MembershipStore, 0), - deadMembership: make(util.MembershipStore, 0), + aliveMembership: util.NewMembershipStore(), + deadMembership: util.NewMembershipStore(), crypt: crypt, comm: comm, lock: &sync.RWMutex{}, @@ -208,7 +208,7 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) { d.lock.RLock() - n := len(d.aliveMembership) + n := d.aliveMembership.Size() k := peerNum if k > n { k = n diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index 6a819cd5160..9caddd04bec 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -62,9 +62,9 @@ type GossipChannel interface { // IsOrgInChannel returns whether the given organization is in the channel IsOrgInChannel(membersOrg api.OrgIdentityType) bool - // IsSubscribed returns whether the given member published - // its participation in the channel - IsSubscribed(member discovery.NetworkMember) bool + // EligibleForChannel returns whether the given member should get blocks + // for this channel + EligibleForChannel(member discovery.NetworkMember) bool // HandleMessage processes a message sent by a remote peer HandleMessage(proto.ReceivedMessage) @@ -106,7 +106,11 @@ type Adapter interface { OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType // GetOrgOfPeer returns the organization ID of a given peer PKI-ID - GetOrgOfPeer(common.PKIidType) api.OrgIdentityType + GetOrgOfPeer(pkiID common.PKIidType) api.OrgIdentityType + + // GetIdentityByPKIID returns an identity of a peer with a certain + // pkiID, or nil if not found + GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType } type gossipChannel struct { @@ -119,7 +123,7 @@ type gossipChannel struct { orgs []api.OrgIdentityType joinMsg api.JoinChannelMessage blockMsgStore msgstore.MessageStore - stateInfoMsgStore msgstore.MessageStore + stateInfoMsgStore *stateInfoCache leaderMsgStore msgstore.MessageStore chainID common.ChainID blocksPuller pull.Mediator @@ -138,7 +142,7 @@ type membershipFilter struct { func (mf *membershipFilter) GetMembership() []discovery.NetworkMember { var members []discovery.NetworkMember for _, mem := range mf.adapter.GetMembership() { - if mf.IsSubscribed(mem) { + if mf.EligibleForChannel(mem) { members = append(members, mem) } } @@ -166,7 +170,7 @@ func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adap gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage)) }) - gc.stateInfoMsgStore = NewStateInfoMessageStore() + gc.stateInfoMsgStore = newStateInfoCache() gc.blocksPuller = gc.createBlockPuller() gc.leaderMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) @@ -203,30 +207,23 @@ func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) { func (gc *gossipChannel) GetPeers() []discovery.NetworkMember { members := []discovery.NetworkMember{} - pkiID2NetMember := make(map[string]discovery.NetworkMember) for _, member := range gc.GetMembership() { - pkiID2NetMember[string(member.PKIid)] = member - } - - for _, o := range gc.stateInfoMsgStore.Get() { - stateInf := o.(*proto.SignedGossipMessage).GetStateInfo() - pkiID := stateInf.PkiID - if member, exists := pkiID2NetMember[string(pkiID)]; !exists { + if !gc.EligibleForChannel(member) { continue - } else { - member.Metadata = stateInf.Metadata - members = append(members, member) } + stateInf := gc.stateInfoMsgStore.MsgByID(member.PKIid) + if stateInf == nil { + continue + } + member.Metadata = stateInf.GetStateInfo().Metadata + members = append(members, member) } return members } func (gc *gossipChannel) requestStateInfo() { req := gc.createStateInfoRequest().NoopSign() - endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsSubscribed) - if len(endpoints) == 0 { - endpoints = filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan) - } + endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan) gc.Send(req, endpoints...) } @@ -296,19 +293,20 @@ func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool { return false } -// IsSubscribed returns whether the given member published -// its participation in the channel -func (gc *gossipChannel) IsSubscribed(member discovery.NetworkMember) bool { +// EligibleForChannel returns whether the given member should get blocks +// for this channel +func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool { if !gc.IsMemberInChan(member) { return false } - for _, o := range gc.stateInfoMsgStore.Get() { - m, isMsg := o.(*proto.SignedGossipMessage) - if isMsg && m.IsStateInfoMsg() && bytes.Equal(m.GetStateInfo().PkiID, member.PKIid) { - return true - } + + identity := gc.GetIdentityByPKIID(member.PKIid) + msg := gc.stateInfoMsgStore.MsgByID(member.PKIid) + if msg == nil || identity == nil { + return false } - return false + + return gc.mcs.VerifyByChannel(gc.chainID, identity, msg.Envelope.Signature, msg.Envelope.Payload) == nil } // AddToMsgStore adds a given GossipMessage to the message store @@ -412,6 +410,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) { return } if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BlockMessage { + if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetPKIID()}) { + gc.logger.Warning(msg.GetPKIID(), "isn't eligible for channel", gc.chainID) + return + } if m.IsDataUpdate() { for _, item := range m.GetDataUpdate().Data { gMsg, err := item.ToGossipMessage() @@ -563,3 +565,30 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) { func NewStateInfoMessageStore() msgstore.MessageStore { return msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) } + +func newStateInfoCache() *stateInfoCache { + return &stateInfoCache{ + MembershipStore: util.NewMembershipStore(), + MessageStore: NewStateInfoMessageStore(), + } +} + +// stateInfoCache is actually a messageStore +// that also indexes messages that are added +// so that they could be extracted later +type stateInfoCache struct { + *util.MembershipStore + msgstore.MessageStore +} + +// Add attempts to add the given message to the stateInfoCache, +// and if the message was added, also indexes it. +// Message must be a StateInfo message. +func (cache stateInfoCache) Add(msg *proto.SignedGossipMessage) bool { + added := cache.MessageStore.Add(msg) + pkiID := msg.GetStateInfo().PkiID + if added { + cache.MembershipStore.Put(pkiID, msg) + } + return added +} diff --git a/gossip/gossip/channel/channel_test.go b/gossip/gossip/channel/channel_test.go index 5d6ac17f4f3..53ad54345c0 100644 --- a/gossip/gossip/channel/channel_test.go +++ b/gossip/gossip/channel/channel_test.go @@ -17,6 +17,7 @@ limitations under the License. package channel import ( + "errors" "fmt" "strconv" "sync" @@ -24,8 +25,6 @@ import ( "testing" "time" - "errors" - "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" @@ -57,11 +56,12 @@ func init() { var ( // Organizations: {ORG1, ORG2} // Channel A: {ORG1} - channelA = common.ChainID("A") - orgInChannelA = api.OrgIdentityType("ORG1") - orgNotInChannelA = api.OrgIdentityType("ORG2") - pkiIDInOrg1 = common.PKIidType("pkiIDInOrg1") - pkiIDinOrg2 = common.PKIidType("pkiIDinOrg2") + channelA = common.ChainID("A") + orgInChannelA = api.OrgIdentityType("ORG1") + orgNotInChannelA = api.OrgIdentityType("ORG2") + pkiIDInOrg1 = common.PKIidType("pkiIDInOrg1") + pkiIDInOrg1ButNotEligible = common.PKIidType("pkiIDInOrg1ButNotEligible") + pkiIDinOrg2 = common.PKIidType("pkiIDinOrg2") ) type joinChanMsg struct { @@ -88,6 +88,7 @@ func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { } type cryptoService struct { + mocked bool mock.Mock } @@ -95,8 +96,12 @@ func (cs *cryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) commo panic("Should not be called in this test") } -func (cs *cryptoService) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityType, _, _ []byte) error { - panic("Should not be called in this test") +func (cs *cryptoService) VerifyByChannel(channel common.ChainID, identity api.PeerIdentityType, _, _ []byte) error { + if !cs.mocked { + return nil + } + args := cs.Called(identity) + return args.Get(0).(error) } func (cs *cryptoService) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error { @@ -199,10 +204,15 @@ func (ga *gossipAdapterMock) GetOrgOfPeer(PKIIID common.PKIidType) api.OrgIdenti return args.Get(0).(api.OrgIdentityType) } +func (ga *gossipAdapterMock) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType { + return api.PeerIdentityType(pkiID) +} + func configureAdapter(adapter *gossipAdapterMock, members ...discovery.NetworkMember) { adapter.On("GetConf").Return(conf) adapter.On("GetMembership").Return(members) adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(orgInChannelA) + adapter.On("GetOrgOfPeer", pkiIDInOrg1ButNotEligible).Return(orgInChannelA) adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA) adapter.On("GetOrgOfPeer", mock.Anything).Return(api.OrgIdentityType(nil)) } @@ -312,6 +322,8 @@ func TestChannelPeerNotInChannel(t *testing.T) { gossipMessagesSentFromChannel <- msg } // First, ensure it does that for pull messages from peers that are in the channel + // Let the peer first publish it is in the channel + gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1}) helloMsg := createHelloMsg(pkiIDInOrg1) helloMsg.On("Respond", mock.Anything).Run(messageRelayer) gc.HandleMessage(helloMsg) @@ -330,6 +342,23 @@ func TestChannelPeerNotInChannel(t *testing.T) { case <-time.After(time.Second * 1): } + // Now for a more advanced scenario- the peer claims to be in the right org, and also claims to be in the channel + // but the MSP declares it is not eligible for the channel + // pkiIDInOrg1ButNotEligible + gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible}) + cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible")) + cs.mocked = true + helloMsg = createHelloMsg(pkiIDInOrg1ButNotEligible) + helloMsg.On("Respond", mock.Anything).Run(messageRelayer) + gc.HandleMessage(helloMsg) + select { + case <-gossipMessagesSentFromChannel: + t.Fatal("Responded with digest, but shouldn't have since peer is not eligible for the channel") + case <-time.After(time.Second * 1): + } + + cs.Mock = mock.Mock{} + // Ensure we respond to a valid StateInfoRequest req := gc.(*gossipChannel).createStateInfoRequest() validReceivedMsg := &receivedMsg{ @@ -404,7 +433,7 @@ func TestChannelIsSubscribed(t *testing.T) { adapter.On("Send", mock.Anything, mock.Anything) adapter.On("DeMultiplex", mock.Anything) gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1}) - assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1})) + assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1})) } func TestChannelAddToMessageStore(t *testing.T) { @@ -456,7 +485,7 @@ func TestChannelAddToMessageStore(t *testing.T) { } gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1}) - assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1})) + assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1})) } func TestChannelBadBlocks(t *testing.T) { @@ -782,7 +811,38 @@ func TestChannelReconfigureChannel(t *testing.T) { t.Fatal("Responded with digest, but shouldn't have since peer is in ORG2 and its not in the channel") case <-time.After(time.Second * 1): } +} + +func TestChannelGetPeers(t *testing.T) { + t.Parallel() + // Scenario: We have a peer in an org, and the peer is notified that several peers + // exist, and some of them: + // (1) Join its channel, and are eligible for receiving blocks. + // (2) Join its channel, but are not eligible for receiving blocks (MSP doesn't allow this). + // (3) Say they join its channel, but are actually from an org that is not in the channel. + // The GetPeers query should only return peers that belong to the first group. + cs := &cryptoService{} + adapter := new(gossipAdapterMock) + adapter.On("Gossip", mock.Anything) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + members := []discovery.NetworkMember{ + {PKIid: pkiIDInOrg1}, + {PKIid: pkiIDInOrg1ButNotEligible}, + {PKIid: pkiIDinOrg2}, + } + configureAdapter(adapter, members...) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}) + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)}) + assert.Len(t, gc.GetPeers(), 1) + assert.Equal(t, pkiIDInOrg1, gc.GetPeers()[0].PKIid) + + gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible}) + cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible")) + cs.mocked = true + assert.Len(t, gc.GetPeers(), 0) } func createDataUpdateMsg(nonce uint64) *proto.SignedGossipMessage { diff --git a/gossip/gossip/chanstate.go b/gossip/gossip/chanstate.go index 42b305d2efb..4b0115802d4 100644 --- a/gossip/gossip/chanstate.go +++ b/gossip/gossip/chanstate.go @@ -113,3 +113,13 @@ func (ga *gossipAdapterImpl) OrgByPeerIdentity(identity api.PeerIdentityType) ap func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType { return ga.gossipServiceImpl.getOrgOfPeer(PKIID) } + +// GetIdentityByPKIID returns an identity of a peer with a certain +// pkiID, or nil if not found +func (ga *gossipAdapterImpl) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType { + identity, err := ga.idMapper.Get(pkiID) + if err != nil { + return nil + } + return identity +} diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 3952c751b4e..fc7c6997c41 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -433,7 +433,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { // Gossip blocks blocks, msgs = partitionMessages(isABlock, msgs) g.gossipInChan(blocks, func(gc channel.GossipChannel) filter.RoutingFilter { - return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg) + return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg) }) // Gossip StateInfo messages @@ -445,7 +445,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) { // Gossip Leadership messages leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs) g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter { - return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg) + return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg) }) // Gossip messages restricted to our org diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index 3f3030168d0..77a7c763ad1 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -18,6 +18,7 @@ package gossip import ( "bytes" + "errors" "fmt" "runtime" "strconv" @@ -87,6 +88,7 @@ func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { } type naiveCryptoService struct { + allowedPkiIDS map[string]struct{} } type orgCryptoService struct { @@ -106,8 +108,14 @@ func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error { // VerifyByChannel verifies a peer's signature on a message in the context // of a specific channel -func (*naiveCryptoService) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityType, _, _ []byte) error { - return nil +func (cs *naiveCryptoService) VerifyByChannel(_ common.ChainID, identity api.PeerIdentityType, _, _ []byte) error { + if cs.allowedPkiIDS == nil { + return nil + } + if _, allowed := cs.allowedPkiIDS[string(identity)]; allowed { + return nil + } + return errors.New("Forbidden") } func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { @@ -152,7 +160,7 @@ func bootPeers(portPrefix int, ids ...int) []string { return peers } -func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gossip { +func newGossipInstanceWithCustomMCS(portPrefix int, id int, maxMsgCount int, mcs api.MessageCryptoService, boot ...int) Gossip { port := id + portPrefix conf := &Config{ BindPort: port, @@ -171,14 +179,17 @@ func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gos PublishStateInfoInterval: time.Duration(1) * time.Second, RequestStateInfoInterval: time.Duration(1) * time.Second, } - cryptoService := &naiveCryptoService{} - idMapper := identity.NewIdentityMapper(cryptoService) - g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper, api.PeerIdentityType(conf.InternalEndpoint)) + idMapper := identity.NewIdentityMapper(mcs) + g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint)) return g } +func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gossip { + return newGossipInstanceWithCustomMCS(portPrefix, id, maxMsgCount, &naiveCryptoService{}, boot...) +} + func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot ...int) Gossip { port := id + portPrefix conf := &Config{ @@ -658,9 +669,27 @@ func TestDataLeakage(t *testing.T) { // Scenario: spawn some nodes and let them all // establish full membership. // Then, have half be in channel A and half be in channel B. - // Ensure nodes only get messages of their channels. + // However, make it so that only the first 3 from each channel + // are eligible to obtain blocks from the channels they're in. + // Ensure nodes only get messages of their channels and in case they + // are eligible for the channels. totalPeers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} // THIS MUST BE EVEN AND NOT ODD + // Peer0 and Peer5 disseminate blocks + // only 1,2 and 6,7 should get blocks. + + mcs := &naiveCryptoService{ + allowedPkiIDS: map[string]struct{}{ + // Channel A + "localhost:1610": {}, + "localhost:1611": {}, + "localhost:1612": {}, + // Channel B + "localhost:1615": {}, + "localhost:1616": {}, + "localhost:1617": {}, + }, + } stopped := int32(0) go waitForTestCompletion(&stopped, t) @@ -673,7 +702,7 @@ func TestDataLeakage(t *testing.T) { go func(i int) { totPeers := append([]int(nil), totalPeers[:i]...) bootPeers := append(totPeers, totalPeers[i+1:]...) - peers[i] = newGossipInstance(portPrefix, i, 100, bootPeers...) + peers[i] = newGossipInstanceWithCustomMCS(portPrefix, i, 100, mcs, bootPeers...) wg.Done() }(i) } @@ -683,26 +712,30 @@ func TestDataLeakage(t *testing.T) { channels := []common.ChainID{common.ChainID("A"), common.ChainID("B")} + channelAmetadata := []byte("some metadata of channel A") + channelBmetadata := []byte("some metadata on channel B") + for i, channel := range channels { for j := 0; j < (n / 2); j++ { instanceIndex := (n/2)*i + j peers[instanceIndex].JoinChan(&joinChanMsg{}, channel) + var metadata []byte + if i == 0 { + metadata = channelAmetadata + } else { + metadata = channelBmetadata + } + peers[instanceIndex].UpdateChannelMetadata(metadata, channel) t.Log(instanceIndex, "joined", string(channel)) } } - channelAmetadata := []byte("some metadata of channel A") - channelBmetadata := []byte("some metadata on channel B") - - peers[0].UpdateChannelMetadata(channelAmetadata, channels[0]) - peers[n/2].UpdateChannelMetadata(channelBmetadata, channels[1]) - - // Wait until all peers have at least 1 peer in the per-channel view + // Wait until all peers have other peers in the per-channel view seeChannelMetadata := func() bool { for i, channel := range channels { - for j := 1; j < (n / 2); j++ { + for j := 0; j < 3; j++ { instanceIndex := (n/2)*i + j - if len(peers[instanceIndex].PeersOfChannel(channel)) == 0 { + if len(peers[instanceIndex].PeersOfChannel(channel)) < 2 { return false } } @@ -711,12 +744,12 @@ func TestDataLeakage(t *testing.T) { } t1 := time.Now() waitUntilOrFail(t, seeChannelMetadata) - t.Log("Metadata sync took", time.Since(t1)) + t.Log("Metadata sync took", time.Since(t1)) for i, channel := range channels { - for j := 1; j < (n / 2); j++ { + for j := 0; j < 3; j++ { instanceIndex := (n/2)*i + j - assert.Len(t, peers[instanceIndex].PeersOfChannel(channel), 1) + assert.Len(t, peers[instanceIndex].PeersOfChannel(channel), 2) if i == 0 { assert.Equal(t, channelAmetadata, peers[instanceIndex].PeersOfChannel(channel)[0].Metadata) } else { @@ -727,9 +760,9 @@ func TestDataLeakage(t *testing.T) { gotMessages := func() { var wg sync.WaitGroup - wg.Add(n - 2) + wg.Add(4) for i, channel := range channels { - for j := 1; j < (n / 2); j++ { + for j := 1; j < 3; j++ { instanceIndex := (n/2)*i + j go func(instanceIndex int, channel common.ChainID) { incMsgChan, _ := peers[instanceIndex].Accept(acceptData, false) diff --git a/gossip/util/msgs.go b/gossip/util/msgs.go index cdc1ea46397..726b67144e4 100644 --- a/gossip/util/msgs.go +++ b/gossip/util/msgs.go @@ -17,37 +17,60 @@ limitations under the License. package util import ( + "sync" + "github.com/hyperledger/fabric/gossip/common" proto "github.com/hyperledger/fabric/protos/gossip" ) -type MembershipStore map[string]*proto.SignedGossipMessage +type MembershipStore struct { + m map[string]*proto.SignedGossipMessage + sync.RWMutex +} + +func NewMembershipStore() *MembershipStore { + return &MembershipStore{m: make(map[string]*proto.SignedGossipMessage)} +} // msgByID returns a message stored by a certain ID, or nil // if such an ID isn't found -func (m MembershipStore) MsgByID(pkiID common.PKIidType) *proto.SignedGossipMessage { - if msg, exists := m[string(pkiID)]; exists { +func (m *MembershipStore) MsgByID(pkiID common.PKIidType) *proto.SignedGossipMessage { + m.RLock() + defer m.RUnlock() + if msg, exists := m.m[string(pkiID)]; exists { return msg } return nil } +func (m *MembershipStore) Size() int { + m.RLock() + defer m.RUnlock() + return len(m.m) +} + // Put associates msg with the given pkiID -func (m MembershipStore) Put(pkiID common.PKIidType, msg *proto.SignedGossipMessage) { - m[string(pkiID)] = msg +func (m *MembershipStore) Put(pkiID common.PKIidType, msg *proto.SignedGossipMessage) { + m.Lock() + defer m.Unlock() + m.m[string(pkiID)] = msg } // Remove removes a message with a given pkiID -func (m MembershipStore) Remove(pkiID common.PKIidType) { - delete(m, string(pkiID)) +func (m *MembershipStore) Remove(pkiID common.PKIidType) { + m.Lock() + defer m.Unlock() + delete(m.m, string(pkiID)) } // ToSlice returns a slice backed by the elements // of the MembershipStore -func (m MembershipStore) ToSlice() []*proto.SignedGossipMessage { - members := make([]*proto.SignedGossipMessage, len(m)) +func (m *MembershipStore) ToSlice() []*proto.SignedGossipMessage { + m.RLock() + defer m.RUnlock() + members := make([]*proto.SignedGossipMessage, len(m.m)) i := 0 - for _, member := range m { + for _, member := range m.m { members[i] = member i++ } diff --git a/gossip/util/msgs_test.go b/gossip/util/msgs_test.go index 14f961cd027..01e2098897c 100644 --- a/gossip/util/msgs_test.go +++ b/gossip/util/msgs_test.go @@ -25,7 +25,7 @@ import ( ) func TestMembershipStore(t *testing.T) { - membershipStore := make(MembershipStore, 0) + membershipStore := NewMembershipStore() id1 := common.PKIidType("id1") id2 := common.PKIidType("id2") @@ -35,7 +35,7 @@ func TestMembershipStore(t *testing.T) { // Test initially created store is empty assert.Nil(t, membershipStore.MsgByID(id1)) - assert.Len(t, membershipStore, 0) + assert.Equal(t, membershipStore.Size(), 0) // Test put works as expected membershipStore.Put(id1, msg1) assert.NotNil(t, membershipStore.MsgByID(id1)) @@ -44,11 +44,11 @@ func TestMembershipStore(t *testing.T) { assert.Equal(t, msg1, membershipStore.MsgByID(id1)) assert.NotEqual(t, msg2, membershipStore.MsgByID(id1)) // Test capacity grows - assert.Len(t, membershipStore, 2) + assert.Equal(t, membershipStore.Size(), 2) // Test remove works membershipStore.Remove(id1) assert.Nil(t, membershipStore.MsgByID(id1)) - assert.Len(t, membershipStore, 1) + assert.Equal(t, membershipStore.Size(), 1) // Test returned instance is not a copy msg3 := &proto.SignedGossipMessage{GossipMessage: &proto.GossipMessage{}} msg3Clone := &proto.SignedGossipMessage{GossipMessage: &proto.GossipMessage{}} @@ -60,7 +60,7 @@ func TestMembershipStore(t *testing.T) { } func TestToSlice(t *testing.T) { - membershipStore := make(MembershipStore, 0) + membershipStore := NewMembershipStore() id1 := common.PKIidType("id1") id2 := common.PKIidType("id2") id3 := common.PKIidType("id3")