diff --git a/gossip/api/crypto.go b/gossip/api/crypto.go
index 3a10496061e..0036d46703a 100644
--- a/gossip/api/crypto.go
+++ b/gossip/api/crypto.go
@@ -88,7 +88,7 @@ func (pis PeerIdentitySet) ByOrg() map[string]PeerIdentitySet {
 	return m
 }
 
-// ByOrg sorts the PeerIdentitySet by PKI-IDs of its peers
+// ByID organizes the PeerIdentitySet into a map keyed by the PKI-IDs of its peers
 func (pis PeerIdentitySet) ByID() map[string]PeerIdentityInfo {
 	m := make(map[string]PeerIdentityInfo)
 	for _, id := range pis {
diff --git a/gossip/privdata/distributor.go b/gossip/privdata/distributor.go
index 23fc92cbf8b..b6b45928a52 100644
--- a/gossip/privdata/distributor.go
+++ b/gossip/privdata/distributor.go
@@ -171,7 +171,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
 		logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
 		dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
 		if err != nil {
-			return nil, errors.WithStack(err)
+			return nil, errors.WithMessage(err, fmt.Sprintf("could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName))
 		}
 		disseminationPlan = append(disseminationPlan, dPlan...)
 	}
@@ -209,8 +209,17 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 	m := pvtDataMsg.GetPrivateData().Payload
 
 	eligiblePeers := d.eligiblePeersOfChannel(routingFilter)
-	identitySets := d.identitiesOfEligiblePeers(eligiblePeers, colAP)
+
+	// With the shift to per peer dissemination in FAB-15389, we must first check
+	// that there are enough eligible peers to satisfy RequiredPeerCount.
+	if len(eligiblePeers) < colAP.RequiredPeerCount() {
+		return nil, errors.Errorf("required to disseminate to at least %d peers, but know of only %d eligible peers", colAP.RequiredPeerCount(), len(eligiblePeers))
+	}
+
+	// Group eligible peers by org so that we can disseminate across orgs first
+	identitySetsByOrg := d.identitiesOfEligiblePeersByOrg(eligiblePeers, colAP)
 
+	// peerEndpoints is used for dissemination debug logging only
 	peerEndpoints := map[string]string{}
 	for _, peer := range eligiblePeers {
 		epToAdd := peer.Endpoint
@@ -220,28 +229,38 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 		peerEndpoints[string(peer.PKIid)] = epToAdd
 	}
 
-	maximumPeerCount := colAP.MaximumPeerCount()
-	requiredPeerCount := colAP.RequiredPeerCount()
+	// Initialize maximumPeerRemainingCount and requiredPeerRemainingCount;
+	// these are decremented until we've selected enough peers for dissemination
+	maximumPeerRemainingCount := colAP.MaximumPeerCount()
+	requiredPeerRemainingCount := colAP.RequiredPeerCount()
 
-	remainingPeers := []api.PeerIdentityInfo{}
-	selectedPeerEndpoints := []string{}
+	remainingPeersAcrossOrgs := []api.PeerIdentityInfo{}
+	selectedPeerEndpointsForDebug := []string{}
 
 	rand.Seed(time.Now().Unix())
-	// Select one representative from each org
-	if maximumPeerCount > 0 {
-		for _, selectionPeers := range identitySets {
-			required := 1
-			if requiredPeerCount == 0 {
-				required = 0
+
+	// PHASE 1 - Select one peer from each eligible org
+	if maximumPeerRemainingCount > 0 {
+		for _, selectionPeersForOrg := range identitySetsByOrg {
+
+			// The first RequiredPeerCount peers selected are tagged as required (acksRequired=1) up front, before dissemination.
+			// TODO It would be better to attempt dissemination to MaxPeerCount first, and then verify that enough sends were acknowledged to meet RequiredPeerCount.
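+			// The required tag becomes MinAck in the SendCriteria built below; a send
+			// to a required peer must be acked within pushAckTimeout, otherwise
+			// dissemination (and with it the endorsement) fails.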
+			acksRequired := 1
+			if requiredPeerRemainingCount == 0 {
+				acksRequired = 0
 			}
-			selectedPeerIndex := rand.Intn(len(selectionPeers))
-			peer2SendPerOrg := selectionPeers[selectedPeerIndex]
-			selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2SendPerOrg.PKIId)])
+
+			selectedPeerIndex := rand.Intn(len(selectionPeersForOrg))
+			peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
+			selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
 			sc := gossip2.SendCriteria{
 				Timeout:    d.pushAckTimeout,
 				Channel:    gossipCommon.ChainID(d.chainID),
 				MaxPeers:   1,
-				MinAck:     required,
+				MinAck:     acksRequired,
 				IsEligible: func(member discovery.NetworkMember) bool {
 					return bytes.Equal(member.PKIid, peer2SendPerOrg.PKIId)
 				},
@@ -254,43 +273,42 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 				},
 			})
 
-			// Add unselected peers to remainingPeers
-			for i, peer := range selectionPeers {
+			// Add unselected peers to remainingPeersAcrossOrgs
+			for i, peer := range selectionPeersForOrg {
 				if i != selectedPeerIndex {
-					remainingPeers = append(remainingPeers, peer)
+					remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs, peer)
 				}
 			}
 
-			if requiredPeerCount > 0 {
-				requiredPeerCount--
+			if requiredPeerRemainingCount > 0 {
+				requiredPeerRemainingCount--
 			}
 
-			maximumPeerCount--
-			if maximumPeerCount == 0 {
+			maximumPeerRemainingCount--
+			if maximumPeerRemainingCount == 0 {
 				logger.Debug("MaximumPeerCount satisfied")
-				logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
+				logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
 				return disseminationPlan, nil
 			}
 		}
 	}
 
-	// criteria to select remaining peers to satisfy colAP.MaximumPeerCount() if there are still
-	// unselected peers remaining for dissemination
-	numPeersToSelect := maximumPeerCount
-	if len(remainingPeers) < maximumPeerCount {
-		numPeersToSelect = len(remainingPeers)
+	// PHASE 2 - Select additional peers to satisfy colAP.MaximumPeerCount() if there are still peers in the remainingPeersAcrossOrgs pool
+	numRemainingPeersToSelect := maximumPeerRemainingCount
+	if len(remainingPeersAcrossOrgs) < maximumPeerRemainingCount {
+		numRemainingPeersToSelect = len(remainingPeersAcrossOrgs)
 	}
-	if numPeersToSelect > 0 {
-		logger.Debugf("MaximumPeerCount not satisfied, selecting %d more peer(s) for dissemination", numPeersToSelect)
+	if numRemainingPeersToSelect > 0 {
+		logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
 	}
-	for maximumPeerCount > 0 && len(remainingPeers) > 0 {
+	for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 {
 		required := 1
-		if requiredPeerCount == 0 {
+		if requiredPeerRemainingCount == 0 {
 			required = 0
 		}
-		selectedPeerIndex := rand.Intn(len(remainingPeers))
-		peer2Send := remainingPeers[selectedPeerIndex]
-		selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2Send.PKIId)])
+		selectedPeerIndex := rand.Intn(len(remainingPeersAcrossOrgs))
+		peer2Send := remainingPeersAcrossOrgs[selectedPeerIndex]
+		selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2Send.PKIId)])
 		sc := gossip2.SendCriteria{
 			Timeout:    d.pushAckTimeout,
 			Channel:    gossipCommon.ChainID(d.chainID),
@@ -307,21 +325,22 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 				GossipMessage: proto2.Clone(pvtDataMsg.GossipMessage).(*proto.GossipMessage),
 			},
 		})
 
-		if requiredPeerCount > 0 {
-			requiredPeerCount--
+		if requiredPeerRemainingCount > 0 {
+			requiredPeerRemainingCount--
 		}
-		maximumPeerCount--
+		maximumPeerRemainingCount--
 
 		// remove the selected peer from remaining peers
-		remainingPeers = append(remainingPeers[:selectedPeerIndex], remainingPeers[selectedPeerIndex+1:]...)
+		remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs[:selectedPeerIndex], remainingPeersAcrossOrgs[selectedPeerIndex+1:]...)
 	}
 
-	logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
+	logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
 	return disseminationPlan, nil
 }
 
-func (d *distributorImpl) identitiesOfEligiblePeers(eligiblePeers []discovery.NetworkMember, colAP privdata.CollectionAccessPolicy) map[string]api.PeerIdentitySet {
+// identitiesOfEligiblePeersByOrg returns the identities of the peers eligible for the collection, as one PeerIdentitySet per org, in a map keyed by org ID
+func (d *distributorImpl) identitiesOfEligiblePeersByOrg(eligiblePeers []discovery.NetworkMember, colAP privdata.CollectionAccessPolicy) map[string]api.PeerIdentitySet {
 	return d.gossipAdapter.IdentityInfo().
 		Filter(func(info api.PeerIdentityInfo) bool {
 			for _, orgID := range colAP.MemberOrgs() {
diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go
index a98765f4250..eba46bd38df 100644
--- a/gossip/service/gossip_service.go
+++ b/gossip/service/gossip_service.go
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
 package service
 
 import (
+	"fmt"
 	"sync"
 
 	"github.com/hyperledger/fabric/common/metrics"
@@ -232,7 +233,8 @@ func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, p
 	}
 
 	if err := handler.distributor.Distribute(txID, privData, blkHt); err != nil {
-		logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
+		err := errors.WithMessage(err, fmt.Sprintf("failed to distribute private collection, txID %s, channel %s", txID, chainID))
+		logger.Error(err)
 		return err
 	}
 
diff --git a/integration/pvtdata/pvtdata_test.go b/integration/pvtdata/pvtdata_test.go
index eb4de57d58f..1c83e872f7a 100644
--- a/integration/pvtdata/pvtdata_test.go
+++ b/integration/pvtdata/pvtdata_test.go
@@ -70,6 +70,7 @@ var _ bool = Describe("PrivateData", func() {
 		JustBeforeEach(func() {
 			process, orderer, expectedPeers = startNetwork(network)
 
+			By("installing and instantiating chaincode on all peers")
 			chaincode := nwo.Chaincode{
 				Name:              "marblesp",
 				Version:           "1.0",
@@ -85,16 +86,48 @@ var _ bool = Describe("PrivateData", func() {
 			By("waiting for block to propagate")
 			waitUntilAllPeersSameLedgerHeight(network, expectedPeers, "testchannel",
 				getLedgerHeight(network, network.Peer("org1", "peer0"), "testchannel"))
+
 		})
 
 		AfterEach(func() {
 			testCleanup(testDir, network, process)
 		})
 
-		It("verifies private data was disseminated", func() {
-			By("verify access of initial setup")
+		It("verifies private data was disseminated (positive test) and that endorsement fails when the dissemination RequiredPeerCount cannot be met (negative test)", func() {
+
+			By("verify access of initial setup (positive test)")
 			verifyAccessInitialSetup(network)
+
+			// The integration test structure was refactored in later releases. This test is backported from master,
+			// so it does not necessarily align with the other integration tests in this file, which have not yet been refactored.
+			By("deploying chaincode with RequiredPeerCount greater than number of peers, endorsement will fail (negative test)")
+			testChaincodeHighRequiredPeerCount := nwo.Chaincode{
+				Name:              "marblespHighRequiredPeerCount",
+				Version:           "1.0",
+				Path:              "github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd",
+				Ctor:              `{"Args":["init"]}`,
+				Policy:            `OR ('Org1MSP.member','Org2MSP.member', 'Org3MSP.member')`,
+				CollectionsConfig: filepath.Join("testdata", "collection_configs", "collections_config8_high_requiredPeerCount.json"),
+			}
+
+			nwo.DeployChaincode(network, "testchannel", orderer, testChaincodeHighRequiredPeerCount)
+
+			// attempt to add a marble; endorsement must fail because dissemination cannot meet RequiredPeerCount
+			command := commands.ChaincodeInvoke{
+				ChannelID:     "testchannel",
+				Orderer:       network.OrdererAddress(orderer, nwo.ListenPort),
+				Name:          testChaincodeHighRequiredPeerCount.Name,
+				Ctor:          fmt.Sprintf(`{"Args":["initMarble","marble1","blue","35","tom","99"]}`),
+				PeerAddresses: []string{
+					network.PeerAddress(network.Peer("org1", "peer0"), nwo.ListenPort),
+				},
+				WaitForEvent: true,
+			}
+			expectedErrMsg := `Error: endorsement failure during invoke. response: status:500 message:"failed to distribute private collection`
+			invokeChaincodeExpectErr(network, network.Peer("org1", "peer0"), command, expectedErrMsg)
+
 		})
+
 	})
 
 	Describe("reconciliation and pulling", func() {
@@ -938,6 +971,13 @@ func invokeChaincode(n *nwo.Network, org string, peer string, ccname string, arg
 	Expect(sess.Err).To(gbytes.Say("Chaincode invoke successful."))
 }
 
+func invokeChaincodeExpectErr(n *nwo.Network, peer *nwo.Peer, command commands.ChaincodeInvoke, expectedErrMsg string) {
+	sess, err := n.PeerUserSession(peer, "User1", command)
+	Expect(err).NotTo(HaveOccurred())
+	Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(1))
+	Expect(sess.Err).To(gbytes.Say(expectedErrMsg))
+}
+
 func getLedgerHeight(n *nwo.Network, peer *nwo.Peer, channelName string) int {
 	sess, err := n.PeerUserSession(peer, "User1", commands.ChannelInfo{
 		ChannelID: channelName,
diff --git a/integration/pvtdata/testdata/collection_configs/collections_config8_high_requiredPeerCount.json b/integration/pvtdata/testdata/collection_configs/collections_config8_high_requiredPeerCount.json
new file mode 100644
index 00000000000..69cdc3439e6
--- /dev/null
+++ b/integration/pvtdata/testdata/collection_configs/collections_config8_high_requiredPeerCount.json
@@ -0,0 +1,18 @@
+[
+    {
+        "name": "collectionMarbles",
+        "policy": "OR('Org1MSP.member', 'Org2MSP.member')",
+        "requiredPeerCount": 10,
+        "maxPeerCount": 10,
+        "blockToLive": 1000000,
+        "memberOnlyRead": false
+    },
+    {
+        "name": "collectionMarblePrivateDetails",
+        "policy": "OR('Org2MSP.member', 'Org3MSP.member')",
+        "requiredPeerCount": 10,
+        "maxPeerCount": 10,
+        "blockToLive": 1000000,
+        "memberOnlyRead": false
+    }
+]
diff --git a/integration/sbe/testdata/collection_config.json b/integration/sbe/testdata/collection_config.json
index f2214a84f08..f6c2a8fd566 100644
--- a/integration/sbe/testdata/collection_config.json
+++ b/integration/sbe/testdata/collection_config.json
@@ -2,7 +2,7 @@
     {
         "name": "col",
         "policy": "OR('Org1MSP.peer', 'Org2MSP.peer')",
-        "requiredPeerCount": 2,
+        "requiredPeerCount": 1,
         "maxPeerCount": 2,
         "blockToLive":1000000
     }
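Note: the two-phase selection introduced in distributor.go can be read as a small standalone algorithm. The sketch below is a simplified model of just the peer-count logic — `peer` and `selectPeers` are names invented for this illustration, and the gossip SendCriteria/ack machinery is omitted — showing how the RequiredPeerCount precondition, the per-org first pass, and the cross-org top-up interact:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type peer struct {
	endpoint string
	org      string
}

// selectPeers mirrors the two-phase selection: one random peer per eligible
// org first (PHASE 1), then random picks from the leftover cross-org pool
// until maxPeerCount is satisfied (PHASE 2).
func selectPeers(byOrg map[string][]peer, requiredPeerCount, maxPeerCount int) ([]peer, error) {
	eligible := 0
	for _, peers := range byOrg {
		eligible += len(peers)
	}
	// precondition from the patch: fail fast when RequiredPeerCount is unreachable
	if eligible < requiredPeerCount {
		return nil, fmt.Errorf("required to disseminate to at least %d peers, but know of only %d eligible peers",
			requiredPeerCount, eligible)
	}

	var selected, remaining []peer

	// PHASE 1 - one peer from each eligible org
	for _, peers := range byOrg {
		if maxPeerCount == 0 {
			break
		}
		i := rand.Intn(len(peers))
		selected = append(selected, peers[i])
		// everyone not picked joins the cross-org pool for PHASE 2
		for j, p := range peers {
			if j != i {
				remaining = append(remaining, p)
			}
		}
		maxPeerCount--
	}

	// PHASE 2 - top up from the cross-org pool
	for maxPeerCount > 0 && len(remaining) > 0 {
		i := rand.Intn(len(remaining))
		selected = append(selected, remaining[i])
		remaining = append(remaining[:i], remaining[i+1:]...)
		maxPeerCount--
	}
	return selected, nil
}

func main() {
	rand.Seed(time.Now().Unix())
	byOrg := map[string][]peer{
		"Org1MSP": {{"peer0.org1", "Org1MSP"}, {"peer1.org1", "Org1MSP"}},
		"Org2MSP": {{"peer0.org2", "Org2MSP"}},
	}

	plan, err := selectPeers(byOrg, 1, 2) // succeeds: 3 eligible peers
	fmt.Println(plan, err)

	// requiredPeerCount 10 against 3 eligible peers fails up front, which is
	// what collections_config8_high_requiredPeerCount.json provokes in the test.
	_, err = selectPeers(byOrg, 10, 10)
	fmt.Println(err)
}
```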