[FAB-17523] Endorsing peer was not honoring RequiredPeerCount (hyperledger#716) (hyperledger#733)

If there were not enough known eligible peers to meet a private data
collection's RequiredPeerCount, endorsement was succeeding rather
than returning an error.

This was a regression introduced by FAB-15389 in v1.4.4.
FAB-15389 shifted entirely to per-peer dissemination, where required
peers are tagged up front before dissemination; however, there was
no overall check to ensure that RequiredPeerCount could be met. Prior
to FAB-15389, the check was handled deeper in the gossip dissemination
layer. Gossip still checks that each selected required peer
disseminates the private data.

This fix adds the overall RequiredPeerCount check for cases where
there are not enough known eligible peers.

Also, improvements are made to variable names, error messages, and
comments to make the dissemination code easier to understand.
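
For orientation, here is a minimal, self-contained Go sketch of the selection flow as it behaves after this fix, distilled from the distributor.go diff below. The function and variable names (pickPeersForDissemination, peersByOrg) are illustrative only, not the committed API, and the gossip SendCriteria/acknowledgement machinery is omitted.

package main

import (
    "fmt"
    "math/rand"
)

// pickPeersForDissemination mirrors the two-phase selection in
// disseminationPlanForMsg: fail fast if fewer eligible peers are known
// than requiredPeerCount (the check this commit adds), then pick one
// random peer per org, then top up from the cross-org leftovers until
// maxPeerCount is reached.
func pickPeersForDissemination(peersByOrg map[string][]string, requiredPeerCount, maxPeerCount int) ([]string, error) {
    eligible := 0
    for _, orgPeers := range peersByOrg {
        eligible += len(orgPeers)
    }
    // The fix: without this guard, a plan covering fewer than
    // requiredPeerCount peers was silently accepted.
    if eligible < requiredPeerCount {
        return nil, fmt.Errorf("required to disseminate to at least %d peers, but know of only %d eligible peers", requiredPeerCount, eligible)
    }

    var selected, remainingAcrossOrgs []string

    // PHASE 1 - one peer from each eligible org, so private data
    // crosses org boundaries before any org receives a second copy.
    for _, orgPeers := range peersByOrg {
        if maxPeerCount == 0 {
            return selected, nil
        }
        i := rand.Intn(len(orgPeers))
        selected = append(selected, orgPeers[i])
        for j, p := range orgPeers {
            if j != i {
                remainingAcrossOrgs = append(remainingAcrossOrgs, p)
            }
        }
        maxPeerCount--
    }

    // PHASE 2 - keep selecting from the leftover pool until
    // maxPeerCount is satisfied or no peers remain.
    for maxPeerCount > 0 && len(remainingAcrossOrgs) > 0 {
        i := rand.Intn(len(remainingAcrossOrgs))
        selected = append(selected, remainingAcrossOrgs[i])
        remainingAcrossOrgs = append(remainingAcrossOrgs[:i], remainingAcrossOrgs[i+1:]...)
        maxPeerCount--
    }
    return selected, nil
}

func main() {
    peers := map[string][]string{
        "Org1MSP": {"org1peer0", "org1peer1"},
        "Org2MSP": {"org2peer0"},
    }
    fmt.Println(pickPeersForDissemination(peers, 2, 3)) // succeeds
    _, err := pickPeersForDissemination(peers, 10, 10)  // the bug scenario
    fmt.Println(err) // now an error instead of silent success
}

The error string matches the one the new check returns in distributor.go.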

Signed-off-by: David Enyeart <enyeart@us.ibm.com>
denyeart authored and C0rWin committed Sep 15, 2020
1 parent e3b38b1 commit d13ec6e
Showing 6 changed files with 122 additions and 47 deletions.
2 changes: 1 addition & 1 deletion gossip/api/crypto.go

@@ -88,7 +88,7 @@ func (pis PeerIdentitySet) ByOrg() map[string]PeerIdentitySet {
     return m
 }
 
-// ByOrg sorts the PeerIdentitySet by PKI-IDs of its peers
+// ByID sorts the PeerIdentitySet by PKI-IDs of its peers
 func (pis PeerIdentitySet) ByID() map[string]PeerIdentityInfo {
     m := make(map[string]PeerIdentityInfo)
     for _, id := range pis {
100 changes: 58 additions & 42 deletions gossip/privdata/distributor.go

@@ -171,7 +171,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
         logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
         dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
         if err != nil {
-            return nil, errors.WithStack(err)
+            return nil, errors.WithMessage(err, fmt.Sprint("could not build private data dissemination plan for chaincode ", namespace, " and collection ", collectionName))
         }
         disseminationPlan = append(disseminationPlan, dPlan...)
     }
@@ -209,8 +209,17 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
     m := pvtDataMsg.GetPrivateData().Payload
 
     eligiblePeers := d.eligiblePeersOfChannel(routingFilter)
-    identitySets := d.identitiesOfEligiblePeers(eligiblePeers, colAP)
 
+    // With the shift to per peer dissemination in FAB-15389, we must first check
+    // that there are enough eligible peers to satisfy RequiredPeerCount.
+    if len(eligiblePeers) < colAP.RequiredPeerCount() {
+        return nil, errors.Errorf("required to disseminate to at least %d peers, but know of only %d eligible peers", colAP.RequiredPeerCount(), len(eligiblePeers))
+    }
+
+    // Group eligible peers by org so that we can disseminate across orgs first
+    identitySetsByOrg := d.identitiesOfEligiblePeersByOrg(eligiblePeers, colAP)
+
+    // peerEndpoints are used for dissemination debug only
     peerEndpoints := map[string]string{}
     for _, peer := range eligiblePeers {
         epToAdd := peer.Endpoint
@@ -220,28 +229,35 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
         peerEndpoints[string(peer.PKIid)] = epToAdd
     }
 
-    maximumPeerCount := colAP.MaximumPeerCount()
-    requiredPeerCount := colAP.RequiredPeerCount()
+    // Initialize maximumPeerRemainingCount and requiredPeerRemainingCount,
+    // these will be decremented until we've selected enough peers for dissemination
+    maximumPeerRemainingCount := colAP.MaximumPeerCount()
+    requiredPeerRemainingCount := colAP.RequiredPeerCount()
 
-    remainingPeers := []api.PeerIdentityInfo{}
-    selectedPeerEndpoints := []string{}
+    remainingPeersAcrossOrgs := []api.PeerIdentityInfo{}
+    selectedPeerEndpointsForDebug := []string{}
 
     rand.Seed(time.Now().Unix())
-    // Select one representative from each org
-    if maximumPeerCount > 0 {
-        for _, selectionPeers := range identitySets {
-            required := 1
-            if requiredPeerCount == 0 {
-                required = 0
+
+    // PHASE 1 - Select one peer from each eligible org
+    if maximumPeerRemainingCount > 0 {
+        for _, selectionPeersForOrg := range identitySetsByOrg {
+
+            // Peers are tagged as a required peer (acksRequired=1) for RequiredPeerCount up front before dissemination.
+            // TODO It would be better to attempt dissemination to MaxPeerCount first, and then verify that enough sends were acknowledged to meet RequiredPeerCount.
+            acksRequired := 1
+            if requiredPeerRemainingCount == 0 {
+                acksRequired = 0
             }
-            selectedPeerIndex := rand.Intn(len(selectionPeers))
-            peer2SendPerOrg := selectionPeers[selectedPeerIndex]
-            selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2SendPerOrg.PKIId)])
+
+            selectedPeerIndex := rand.Intn(len(selectionPeersForOrg))
+            peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
+            selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
             sc := gossip2.SendCriteria{
                 Timeout:    d.pushAckTimeout,
                 Channel:    gossipCommon.ChainID(d.chainID),
                 MaxPeers:   1,
-                MinAck:     required,
+                MinAck:     acksRequired,
                 IsEligible: func(member discovery.NetworkMember) bool {
                     return bytes.Equal(member.PKIid, peer2SendPerOrg.PKIId)
                 },
@@ -254,43 +270,42 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
                 },
             })
 
-            // Add unselected peers to remainingPeers
-            for i, peer := range selectionPeers {
+            // Add unselected peers to remainingPeersAcrossOrgs
+            for i, peer := range selectionPeersForOrg {
                 if i != selectedPeerIndex {
-                    remainingPeers = append(remainingPeers, peer)
+                    remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs, peer)
                 }
             }
 
-            if requiredPeerCount > 0 {
-                requiredPeerCount--
+            if requiredPeerRemainingCount > 0 {
+                requiredPeerRemainingCount--
             }
 
-            maximumPeerCount--
-            if maximumPeerCount == 0 {
+            maximumPeerRemainingCount--
+            if maximumPeerRemainingCount == 0 {
                 logger.Debug("MaximumPeerCount satisfied")
-                logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
+                logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
                 return disseminationPlan, nil
             }
         }
     }
 
-    // criteria to select remaining peers to satisfy colAP.MaximumPeerCount() if there are still
-    // unselected peers remaining for dissemination
-    numPeersToSelect := maximumPeerCount
-    if len(remainingPeers) < maximumPeerCount {
-        numPeersToSelect = len(remainingPeers)
+    // PHASE 2 - Select additional peers to satisfy colAP.MaximumPeerCount() if there are still peers in the remainingPeersAcrossOrgs pool
+    numRemainingPeersToSelect := maximumPeerRemainingCount
+    if len(remainingPeersAcrossOrgs) < maximumPeerRemainingCount {
+        numRemainingPeersToSelect = len(remainingPeersAcrossOrgs)
     }
-    if numPeersToSelect > 0 {
-        logger.Debugf("MaximumPeerCount not satisfied, selecting %d more peer(s) for dissemination", numPeersToSelect)
+    if numRemainingPeersToSelect > 0 {
+        logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
     }
-    for maximumPeerCount > 0 && len(remainingPeers) > 0 {
+    for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 {
         required := 1
-        if requiredPeerCount == 0 {
+        if requiredPeerRemainingCount == 0 {
             required = 0
         }
-        selectedPeerIndex := rand.Intn(len(remainingPeers))
-        peer2Send := remainingPeers[selectedPeerIndex]
-        selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2Send.PKIId)])
+        selectedPeerIndex := rand.Intn(len(remainingPeersAcrossOrgs))
+        peer2Send := remainingPeersAcrossOrgs[selectedPeerIndex]
+        selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2Send.PKIId)])
         sc := gossip2.SendCriteria{
             Timeout: d.pushAckTimeout,
             Channel: gossipCommon.ChainID(d.chainID),
@@ -307,21 +322,22 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
                 GossipMessage: proto2.Clone(pvtDataMsg.GossipMessage).(*proto.GossipMessage),
             },
         })
-        if requiredPeerCount > 0 {
-            requiredPeerCount--
+        if requiredPeerRemainingCount > 0 {
+            requiredPeerRemainingCount--
         }
 
-        maximumPeerCount--
+        maximumPeerRemainingCount--
 
         // remove the selected peer from remaining peers
-        remainingPeers = append(remainingPeers[:selectedPeerIndex], remainingPeers[selectedPeerIndex+1:]...)
+        remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs[:selectedPeerIndex], remainingPeersAcrossOrgs[selectedPeerIndex+1:]...)
     }
 
-    logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
+    logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
     return disseminationPlan, nil
 }
 
-func (d *distributorImpl) identitiesOfEligiblePeers(eligiblePeers []discovery.NetworkMember, colAP privdata.CollectionAccessPolicy) map[string]api.PeerIdentitySet {
+// identitiesOfEligiblePeersByOrg returns the peers eligible for a collection (aka PeerIdentitySet) grouped in a hash map keyed by orgid
+func (d *distributorImpl) identitiesOfEligiblePeersByOrg(eligiblePeers []discovery.NetworkMember, colAP privdata.CollectionAccessPolicy) map[string]api.PeerIdentitySet {
     return d.gossipAdapter.IdentityInfo().
         Filter(func(info api.PeerIdentityInfo) bool {
             for _, orgID := range colAP.MemberOrgs() {
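
Note how the up-front tagging described in the commit message surfaces in this diff: while requiredPeerRemainingCount is still positive, each selected peer's SendCriteria gets a MinAck of 1 (acksRequired), so gossip itself still verifies that every tagged required peer acknowledges the push. The new length check only guarantees that enough known eligible peers exist to tag in the first place.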
4 changes: 3 additions & 1 deletion gossip/service/gossip_service.go

@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
 package service
 
 import (
+    "fmt"
     "sync"
 
     "github.com/hyperledger/fabric/common/metrics"

@@ -232,7 +233,8 @@ func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, p
     }
 
     if err := handler.distributor.Distribute(txID, privData, blkHt); err != nil {
-        logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
+        err := errors.WithMessage(err, fmt.Sprint("failed to distribute private collection, txID ", txID, ", channel ", chainID))
+        logger.Error(err)
         return err
     }
 
43 changes: 41 additions & 2 deletions integration/pvtdata/pvtdata_test.go

@@ -70,6 +70,7 @@ var _ bool = Describe("PrivateData", func() {
 
     JustBeforeEach(func() {
         process, orderer, expectedPeers = startNetwork(network)
+
        By("installing and instantiating chaincode on all peers")
        chaincode := nwo.Chaincode{
            Name: "marblesp",
@@ -85,16 +86,47 @@ var _ bool = Describe("PrivateData", func() {
 
        By("waiting for block to propagate")
        waitUntilAllPeersSameLedgerHeight(network, expectedPeers, "testchannel", getLedgerHeight(network, network.Peer("org1", "peer0"), "testchannel"))
+
     })
 
     AfterEach(func() {
        testCleanup(testDir, network, process)
     })
 
-    It("verifies private data was disseminated", func() {
-       By("verify access of initial setup")
+    It("verifies private data was disseminated (positive test) and verifies endorsement fails when dissemination RequiredPeerCount not met (negative test)", func() {
+
+       By("verify access of initial setup (positive test)")
        verifyAccessInitialSetup(network)
+
+       // Integration test structure refactored in future releases. This is a backported integration test from master,
+       // hence it doesn't necessarily align with other integration tests in this file that have not yet been refactored.
+       By("deploying chaincode with RequiredPeerCount greater than number of peers, endorsement will fail (negative test)")
+       testChaincodeHighRequiredPeerCount := nwo.Chaincode{
+           Name:              "marblespHighRequiredPeerCount",
+           Version:           "1.0",
+           Path:              "github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd",
+           Ctor:              `{"Args":["init"]}`,
+           Policy:            `OR ('Org1MSP.member','Org2MSP.member', 'Org3MSP.member')`,
+           CollectionsConfig: filepath.Join("testdata", "collection_configs", "collections_config8_high_requiredPeerCount.json")}
+
+       nwo.DeployChaincode(network, "testchannel", orderer, testChaincodeHighRequiredPeerCount)
+
+       // attempt to add a marble with insufficient dissemination to meet RequiredPeerCount
+       command := commands.ChaincodeInvoke{
+           ChannelID: "testchannel",
+           Orderer:   network.OrdererAddress(orderer, nwo.ListenPort),
+           Name:      testChaincodeHighRequiredPeerCount.Name,
+           Ctor:      fmt.Sprintf(`{"Args":["initMarble","marble1","blue","35","tom","99"]}`),
+           PeerAddresses: []string{
+               network.PeerAddress(network.Peer("org1", "peer0"), nwo.ListenPort),
+           },
+           WaitForEvent: true,
+       }
+       expectedErrMsg := `Error: endorsement failure during invoke. response: status:500 message:"failed to distribute private collection`
+       invokeChaincodeExpectErr(network, network.Peer("org1", "peer0"), command, expectedErrMsg)
+
     })
 
 })
 
 Describe("reconciliation and pulling", func() {
@@ -938,6 +970,13 @@ func invokeChaincode(n *nwo.Network, org string, peer string, ccname string, arg
     Expect(sess.Err).To(gbytes.Say("Chaincode invoke successful."))
 }
 
+func invokeChaincodeExpectErr(n *nwo.Network, peer *nwo.Peer, command commands.ChaincodeInvoke, expectedErrMsg string) {
+    sess, err := n.PeerUserSession(peer, "User1", command)
+    Expect(err).NotTo(HaveOccurred())
+    Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(1))
+    Expect(sess.Err).To(gbytes.Say(expectedErrMsg))
+}
+
 func getLedgerHeight(n *nwo.Network, peer *nwo.Peer, channelName string) int {
     sess, err := n.PeerUserSession(peer, "User1", commands.ChannelInfo{
         ChannelID: channelName,
18 changes: 18 additions & 0 deletions integration/pvtdata/testdata/collection_configs/collections_config8_high_requiredPeerCount.json

@@ -0,0 +1,18 @@
+[
+    {
+        "name": "collectionMarbles",
+        "policy": "OR('Org1MSP.member', 'Org2MSP.member')",
+        "requiredPeerCount": 10,
+        "maxPeerCount": 10,
+        "blockToLive":1000000,
+        "memberOnlyRead": false
+    },
+    {
+        "name": "collectionMarblePrivateDetails",
+        "policy": "OR('Org2MSP.member', 'Org3MSP.member')",
+        "requiredPeerCount": 10,
+        "maxPeerCount": 10,
+        "blockToLive":1000000,
+        "memberOnlyRead": false
+    }
+]
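
Both collections in this new test config demand a requiredPeerCount of 10, deliberately greater than the number of peers the test network runs, which drives the negative test above into the new error path: the peer CLI surfaces it as the status:500 "failed to distribute private collection" endorsement failure the test expects.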
2 changes: 1 addition & 1 deletion integration/sbe/testdata/collection_config.json

@@ -2,7 +2,7 @@
     {
         "name": "col",
         "policy": "OR('Org1MSP.peer', 'Org2MSP.peer')",
-        "requiredPeerCount": 2,
+        "requiredPeerCount": 1,
         "maxPeerCount": 2,
         "blockToLive":1000000
     }
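
The SBE collection config is relaxed from requiredPeerCount 2 to 1, presumably because that test endorses against a single peer and would otherwise trip the newly enforced dissemination check.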
