diff --git a/gossip/api/crypto.go b/gossip/api/crypto.go index 6a54885c086..e4817ff5a89 100644 --- a/gossip/api/crypto.go +++ b/gossip/api/crypto.go @@ -91,7 +91,7 @@ func (pis PeerIdentitySet) ByOrg() map[string]PeerIdentitySet { return m } -// ByOrg sorts the PeerIdentitySet by PKI-IDs of its peers +// ByID sorts the PeerIdentitySet by PKI-IDs of its peers func (pis PeerIdentitySet) ByID() map[string]PeerIdentityInfo { m := make(map[string]PeerIdentityInfo) for _, id := range pis { diff --git a/gossip/privdata/distributor.go b/gossip/privdata/distributor.go index a21fb7db9e1..7fc8b7f0f68 100644 --- a/gossip/privdata/distributor.go +++ b/gossip/privdata/distributor.go @@ -181,7 +181,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, logger.Debugf("Computing dissemination plan for collection [%s]", collectionName) dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg) if err != nil { - return nil, errors.WithStack(err) + return nil, errors.WithMessagef(err, "could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName) } disseminationPlan = append(disseminationPlan, dPlan...) } @@ -219,8 +219,17 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces m := pvtDataMsg.GetPrivateData().Payload eligiblePeers := d.eligiblePeersOfChannel(routingFilter) - identitySets := d.identitiesOfEligiblePeers(eligiblePeers, colAP) + // With the shift to per peer dissemination in FAB-15389, we must first check + // that there are enough eligible peers to satisfy RequiredPeerCount. 
+ if len(eligiblePeers) < colAP.RequiredPeerCount() { + return nil, errors.Errorf("required to disseminate to at least %d peers, but know of only %d eligible peers", colAP.RequiredPeerCount(), len(eligiblePeers)) + } + + // Group eligible peers by org so that we can disseminate across orgs first + identitySetsByOrg := d.identitiesOfEligiblePeersByOrg(eligiblePeers, colAP) + + // peerEndpoints are used for dissemination debug only peerEndpoints := map[string]string{} for _, peer := range eligiblePeers { epToAdd := peer.Endpoint @@ -230,28 +239,35 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces peerEndpoints[string(peer.PKIid)] = epToAdd } - maximumPeerCount := colAP.MaximumPeerCount() - requiredPeerCount := colAP.RequiredPeerCount() + // Initialize maximumPeerRemainingCount and requiredPeerRemainingCount, + // these will be decremented until we've selected enough peers for dissemination + maximumPeerRemainingCount := colAP.MaximumPeerCount() + requiredPeerRemainingCount := colAP.RequiredPeerCount() - remainingPeers := []api.PeerIdentityInfo{} - selectedPeerEndpoints := []string{} + remainingPeersAcrossOrgs := []api.PeerIdentityInfo{} + selectedPeerEndpointsForDebug := []string{} rand.Seed(time.Now().Unix()) - // Select one representative from each org - if maximumPeerCount > 0 { - for _, selectionPeers := range identitySets { - required := 1 - if requiredPeerCount == 0 { - required = 0 + + // PHASE 1 - Select one peer from each eligible org + if maximumPeerRemainingCount > 0 { + for _, selectionPeersForOrg := range identitySetsByOrg { + + // Peers are tagged as a required peer (acksRequired=1) for RequiredPeerCount up front before dissemination. + // TODO It would be better to attempt dissemination to MaxPeerCount first, and then verify that enough sends were acknowledged to meet RequiredPeerCount.
+ acksRequired := 1 + if requiredPeerRemainingCount == 0 { + acksRequired = 0 } - selectedPeerIndex := rand.Intn(len(selectionPeers)) - peer2SendPerOrg := selectionPeers[selectedPeerIndex] - selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2SendPerOrg.PKIId)]) + + selectedPeerIndex := rand.Intn(len(selectionPeersForOrg)) + peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex] + selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)]) sc := gossipgossip.SendCriteria{ Timeout: d.pushAckTimeout, Channel: gossipCommon.ChannelID(d.chainID), MaxPeers: 1, - MinAck: required, + MinAck: acksRequired, IsEligible: func(member discovery.NetworkMember) bool { return bytes.Equal(member.PKIid, peer2SendPerOrg.PKIId) }, @@ -264,43 +280,42 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces }, }) - // Add unselected peers to remainingPeers - for i, peer := range selectionPeers { + // Add unselected peers to remainingPeersAcrossOrgs + for i, peer := range selectionPeersForOrg { if i != selectedPeerIndex { - remainingPeers = append(remainingPeers, peer) + remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs, peer) } } - if requiredPeerCount > 0 { - requiredPeerCount-- + if requiredPeerRemainingCount > 0 { + requiredPeerRemainingCount-- } - maximumPeerCount-- - if maximumPeerCount == 0 { + maximumPeerRemainingCount-- + if maximumPeerRemainingCount == 0 { logger.Debug("MaximumPeerCount satisfied") - logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints) + logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug) return disseminationPlan, nil } } } - // criteria to select remaining peers to satisfy colAP.MaximumPeerCount() if there are 
still - // unselected peers remaining for dissemination - numPeersToSelect := maximumPeerCount - if len(remainingPeers) < maximumPeerCount { - numPeersToSelect = len(remainingPeers) + // PHASE 2 - Select additional peers to satisfy colAP.MaximumPeerCount() if there are still peers in the remainingPeersAcrossOrgs pool + numRemainingPeersToSelect := maximumPeerRemainingCount + if len(remainingPeersAcrossOrgs) < maximumPeerRemainingCount { + numRemainingPeersToSelect = len(remainingPeersAcrossOrgs) } - if numPeersToSelect > 0 { - logger.Debugf("MaximumPeerCount not satisfied, selecting %d more peer(s) for dissemination", numPeersToSelect) + if numRemainingPeersToSelect > 0 { + logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect) } - for maximumPeerCount > 0 && len(remainingPeers) > 0 { + for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 { required := 1 - if requiredPeerCount == 0 { + if requiredPeerRemainingCount == 0 { required = 0 } - selectedPeerIndex := rand.Intn(len(remainingPeers)) - peer2Send := remainingPeers[selectedPeerIndex] - selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2Send.PKIId)]) + selectedPeerIndex := rand.Intn(len(remainingPeersAcrossOrgs)) + peer2Send := remainingPeersAcrossOrgs[selectedPeerIndex] + selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2Send.PKIId)]) sc := gossipgossip.SendCriteria{ Timeout: d.pushAckTimeout, Channel: gossipCommon.ChannelID(d.chainID), @@ -317,21 +332,22 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage), }, }) - if requiredPeerCount > 0 { - requiredPeerCount-- + if requiredPeerRemainingCount > 0 { + requiredPeerRemainingCount-- } - maximumPeerCount-- + maximumPeerRemainingCount-- // remove the selected 
peer from remaining peers - remainingPeers = append(remainingPeers[:selectedPeerIndex], remainingPeers[selectedPeerIndex+1:]...) + remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs[:selectedPeerIndex], remainingPeersAcrossOrgs[selectedPeerIndex+1:]...) } - logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints) + logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug) return disseminationPlan, nil } -func (d *distributorImpl) identitiesOfEligiblePeers(eligiblePeers []discovery.NetworkMember, colAP privdata.CollectionAccessPolicy) map[string]api.PeerIdentitySet { +// identitiesOfEligiblePeersByOrg returns the peers eligible for a collection (aka PeerIdentitySet) grouped in a hash map keyed by orgid +func (d *distributorImpl) identitiesOfEligiblePeersByOrg(eligiblePeers []discovery.NetworkMember, colAP privdata.CollectionAccessPolicy) map[string]api.PeerIdentitySet { return d.gossipAdapter.IdentityInfo(). 
Filter(func(info api.PeerIdentityInfo) bool { if _, ok := colAP.MemberOrgs()[string(info.Organization)]; ok { diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 7b7ebabc51e..955cb03f8e4 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -261,7 +261,8 @@ func (g *GossipService) DistributePrivateData(channelID string, txID string, pri } if err := handler.distributor.Distribute(txID, privData, blkHt); err != nil { - logger.Error("Failed to distributed private collection, txID", txID, "channel", channelID, "due to", err) + err := errors.WithMessagef(err, "failed to distribute private collection, txID %s, channel %s", txID, channelID) + logger.Error(err) return err } diff --git a/integration/pvtdata/pvtdata_test.go b/integration/pvtdata/pvtdata_test.go index 67ee95a334b..10ab101a5d2 100644 --- a/integration/pvtdata/pvtdata_test.go +++ b/integration/pvtdata/pvtdata_test.go @@ -85,7 +85,8 @@ var _ bool = Describe("PrivateData", func() { process, orderer = startNetwork(network) }) - It("disseminates private data per collections_config1", func() { + It("disseminates private data per collections_config1 (positive test) and collections_config8 (negative test)", func() { + By("deploying legacy chaincode and adding marble1") testChaincode := chaincode{ Chaincode: nwo.Chaincode{ @@ -108,6 +109,37 @@ var _ bool = Describe("PrivateData", func() { ) assertPvtdataPresencePerCollectionConfig1(network, testChaincode.Name, "marble1") + + By("deploying chaincode with RequiredPeerCount greater than number of peers, endorsement will fail") + testChaincodeHighRequiredPeerCount := chaincode{ + Chaincode: nwo.Chaincode{ + Name: "marblespHighRequiredPeerCount", + Version: "1.0", + Path: "github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd", + Ctor: `{"Args":["init"]}`, + Policy: `OR ('Org1MSP.member','Org2MSP.member', 'Org3MSP.member')`, + CollectionsConfig: 
collectionConfig("collections_config8_high_requiredPeerCount.json"), + }, + isLegacy: true, + } + deployChaincode(network, orderer, testChaincodeHighRequiredPeerCount) + + // attempt to add a marble with insufficient dissemination to meet RequiredPeerCount + marbleDetailsBase64 := base64.StdEncoding.EncodeToString([]byte(`{"name":"marble1", "color":"blue", "size":35, "owner":"tom", "price":99}`)) + + command := commands.ChaincodeInvoke{ + ChannelID: channelID, + Orderer: network.OrdererAddress(orderer, nwo.ListenPort), + Name: testChaincodeHighRequiredPeerCount.Name, + Ctor: fmt.Sprintf(`{"Args":["initMarble"]}`), + Transient: fmt.Sprintf(`{"marble":"%s"}`, marbleDetailsBase64), + PeerAddresses: []string{ + network.PeerAddress(network.Peer("Org1", "peer0"), nwo.ListenPort), + }, + WaitForEvent: true, + } + expectedErrMsg := `Error: endorsement failure during invoke. response: status:500 message:"error in simulation: failed to distribute private collection` + invokeChaincodeExpectErr(network, network.Peer("Org1", "peer0"), command, expectedErrMsg) }) When("collection config does not have maxPeerCount or requiredPeerCount", func() { @@ -771,7 +803,7 @@ var _ bool = Describe("PrivateData", func() { } peer1 := network.Peer("Org1", "peer0") expectedErrMsg := "tx creator does not have write access permission" - invokeChaincodeWithError(network, peer1, command, expectedErrMsg) + invokeChaincodeExpectErr(network, peer1, command, expectedErrMsg) assertMarbleAPIs() assertDeliverWithPrivateDataACLBehavior() @@ -803,7 +835,7 @@ var _ bool = Describe("PrivateData", func() { } peer1 := network.Peer("Org1", "peer0") expectedErrMsg := "tx creator does not have write access permission" - invokeChaincodeWithError(network, peer1, command, expectedErrMsg) + invokeChaincodeExpectErr(network, peer1, command, expectedErrMsg) assertMarbleAPIs() assertDeliverWithPrivateDataACLBehavior() @@ -1100,7 +1132,7 @@ func invokeChaincode(n *nwo.Network, peer *nwo.Peer, command commands.ChaincodeI 
Expect(sess.Err).To(gbytes.Say("Chaincode invoke successful.")) } -func invokeChaincodeWithError(n *nwo.Network, peer *nwo.Peer, command commands.ChaincodeInvoke, expectedErrMsg string) { +func invokeChaincodeExpectErr(n *nwo.Network, peer *nwo.Peer, command commands.ChaincodeInvoke, expectedErrMsg string) { sess, err := n.PeerUserSession(peer, "User1", command) Expect(err).NotTo(HaveOccurred()) Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(1)) diff --git a/integration/pvtdata/testdata/collection_configs/collections_config8_high_requiredPeerCount.json b/integration/pvtdata/testdata/collection_configs/collections_config8_high_requiredPeerCount.json new file mode 100644 index 00000000000..69cdc3439e6 --- /dev/null +++ b/integration/pvtdata/testdata/collection_configs/collections_config8_high_requiredPeerCount.json @@ -0,0 +1,18 @@ +[ + { + "name": "collectionMarbles", + "policy": "OR('Org1MSP.member', 'Org2MSP.member')", + "requiredPeerCount": 10, + "maxPeerCount": 10, + "blockToLive":1000000, + "memberOnlyRead": false + }, + { + "name": "collectionMarblePrivateDetails", + "policy": "OR('Org2MSP.member', 'Org3MSP.member')", + "requiredPeerCount": 10, + "maxPeerCount": 10, + "blockToLive":1000000, + "memberOnlyRead": false + } +] diff --git a/integration/sbe/testdata/collection_config.json b/integration/sbe/testdata/collection_config.json index f2214a84f08..f6c2a8fd566 100644 --- a/integration/sbe/testdata/collection_config.json +++ b/integration/sbe/testdata/collection_config.json @@ -2,7 +2,7 @@ { "name": "col", "policy": "OR('Org1MSP.peer', 'Org2MSP.peer')", - "requiredPeerCount": 2, + "requiredPeerCount": 1, "maxPeerCount": 2, "blockToLive":1000000 }