diff --git a/core/transientstore/store.go b/core/transientstore/store.go index 69e17011c5c..16a4535290e 100644 --- a/core/transientstore/store.go +++ b/core/transientstore/store.go @@ -316,6 +316,8 @@ func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) { return nil, err } + // trim the tx rwset based on the current collection filter, + // nil will be returned to filteredTxPvtRWSet if the transient store txid entry does not contain the data for the collection filteredTxPvtRWSet = trimPvtWSet(txPvtRWSetWithConfig.GetPvtRwset(), scanner.filter) configs, err := trimPvtCollectionConfigs(txPvtRWSetWithConfig.CollectionConfigs, scanner.filter) if err != nil { diff --git a/gossip/privdata/pvtdataprovider.go b/gossip/privdata/pvtdataprovider.go index 36d1c780717..06191bb697f 100644 --- a/gossip/privdata/pvtdataprovider.go +++ b/gossip/privdata/pvtdataprovider.go @@ -145,10 +145,10 @@ func (ec *eligibilityComputer) computeEligibility(mspID string, pvtdataToRetriev } return &pvtdataRetrievalInfo{ - sources: sources, - txns: txList, - eligibleMissingKeys: eligibleMissingKeys, - ineligibleMissingKeys: ineligibleMissingKeys, + sources: sources, + txns: txList, + remainingEligibleMissingKeys: eligibleMissingKeys, + ineligibleMissingKeys: ineligibleMissingKeys, }, nil } @@ -202,32 +202,48 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat pvtdata := make(rwsetByKeys) + //If there is no private data to retrieve for the block, skip all population attempts and return + if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 { + pdp.logger.Debugf("No eligible collection private write sets to fetch for block [%d]", pdp.blockNum) + retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo + retrievedPvtdata.blockPvtdata = pdp.prepareBlockPvtdata(pvtdata, pvtdataRetrievalInfo) + return retrievedPvtdata, nil + } + + fetchStats := &fetchStats{} + + totalEligibleMissingKeysToRetrieve := len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) + // POPULATE FROM CACHE pdp.populateFromCache(pvtdata, pvtdataRetrievalInfo, pvtdataToRetrieve) - if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 { - pdp.logger.Debug("No missing collection private write sets to fetch from transient store") + fetchStats.fromLocalCache = totalEligibleMissingKeysToRetrieve - len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) + + if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 { + pdp.logger.Infof("Successfully fetched all %d eligible collection private write sets for block [%d] %s", totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats) retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo retrievedPvtdata.blockPvtdata = pdp.prepareBlockPvtdata(pvtdata, pvtdataRetrievalInfo) return retrievedPvtdata, nil } // POPULATE FROM TRANSIENT STORE - pdp.logger.Debugf("Could not find all collection private write sets in cache for block [%d]", pdp.blockNum) - pdp.logger.Debugf("Fetching %d collection private write sets from transient store", len(pvtdataRetrievalInfo.eligibleMissingKeys)) + numRemainingToFetch := len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) pdp.populateFromTransientStore(pvtdata, pvtdataRetrievalInfo) - if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 { - pdp.logger.Debug("No missing collection private write sets to fetch from remote peers") + fetchStats.fromTransientStore = numRemainingToFetch - len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) + + if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 { + pdp.logger.Infof("Successfully fetched all %d eligible collection private write sets for block [%d] %s", totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats) retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo retrievedPvtdata.blockPvtdata = pdp.prepareBlockPvtdata(pvtdata, pvtdataRetrievalInfo) return retrievedPvtdata, nil } // POPULATE FROM REMOTE PEERS + numRemainingToFetch = len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) retryThresh := pdp.pullRetryThreshold pdp.logger.Debugf("Could not find all collection private write sets in local peer transient store for block [%d]", pdp.blockNum) - pdp.logger.Debugf("Fetching %d collection private write sets from remote peers for a maximum duration of %s", len(pvtdataRetrievalInfo.eligibleMissingKeys), retryThresh) + pdp.logger.Debugf("Fetching %d collection private write sets from remote peers for a maximum duration of %s", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys), retryThresh) startPull := time.Now() - for len(pvtdataRetrievalInfo.eligibleMissingKeys) > 0 && time.Since(startPull) < retryThresh { + for len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) > 0 && time.Since(startPull) < retryThresh { if needToRetry := pdp.populateFromRemotePeers(pvtdata, pvtdataRetrievalInfo); !needToRetry { break } @@ -237,11 +253,14 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat elapsedPull := int64(time.Since(startPull) / time.Millisecond) // duration in ms pdp.fetchDurationHistogram.Observe(time.Since(startPull).Seconds()) - if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 { + fetchStats.fromRemotePeer = numRemainingToFetch - len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) + + if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 { pdp.logger.Debugf("Fetched all missing collection private write sets from remote peers for block [%d] (%dms)", pdp.blockNum, elapsedPull) + pdp.logger.Infof("Successfully fetched all %d eligible collection private write sets for block [%d] %s", totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats) } else { - pdp.logger.Debugf("Could not fetch all missing collection private write sets from remote peers for block [%d]", - pdp.blockNum) + pdp.logger.Warningf("Could not fetch all %d eligible collection private write sets for block [%d] %s. Will commit block with missing private write sets:[%v]", + totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats, pvtdataRetrievalInfo.remainingEligibleMissingKeys) } retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo @@ -252,7 +271,7 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat // populateFromCache populates pvtdata with data fetched from cache and updates // pvtdataRetrievalInfo by removing missing data that was fetched from cache func (pdp *PvtdataProvider) populateFromCache(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo, pvtdataToRetrieve []*ledger.TxPvtdataInfo) { - pdp.logger.Debugf("Attempting to retrieve %d private write sets from cache.", len(pvtdataRetrievalInfo.eligibleMissingKeys)) + pdp.logger.Debugf("Attempting to retrieve %d private write sets from cache.", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)) for _, txPvtdata := range pdp.prefetchedPvtdata { txID := getTxIDBySeqInBlock(txPvtdata.SeqInBlock, pvtdataToRetrieve) @@ -271,14 +290,14 @@ func (pdp *PvtdataProvider) populateFromCache(pvtdata rwsetByKeys, pvtdataRetrie hash: hex.EncodeToString(commonutil.ComputeSHA256(col.Rwset)), } // skip if key not originally missing - if _, missing := pvtdataRetrievalInfo.eligibleMissingKeys[key]; !missing { + if _, missing := pvtdataRetrievalInfo.remainingEligibleMissingKeys[key]; !missing { pdp.logger.Warningf("Found extra data in prefetched:[%v]. Skipping.", key) continue } // populate the pvtdata with the RW set from the cache pvtdata[key] = col.Rwset // remove key from missing - delete(pvtdataRetrievalInfo.eligibleMissingKeys, key) + delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, key) } // iterate over collections in the namespace } // iterate over the namespaces in the WSet } // iterate over cached private data in the block @@ -287,10 +306,10 @@ func (pdp *PvtdataProvider) populateFromCache(pvtdata rwsetByKeys, pvtdataRetrie // populateFromTransientStore populates pvtdata with data fetched from transient store // and updates pvtdataRetrievalInfo by removing missing data that was fetched from transient store func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) { - pdp.logger.Debugf("Attempting to retrieve %d private write sets from transient store.", len(pvtdataRetrievalInfo.eligibleMissingKeys)) + pdp.logger.Debugf("Attempting to retrieve %d private write sets from transient store.", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)) // Put into pvtdata RW sets that are missing and found in the transient store - for k := range pvtdataRetrievalInfo.eligibleMissingKeys { + for k := range pvtdataRetrievalInfo.remainingEligibleMissingKeys { filter := ledger.NewPvtNsCollFilter() filter.Add(k.namespace, k.collection) iterator, err := pdp.transientStore.GetTxPvtRWSetByTxid(k.txID, filter) @@ -314,8 +333,9 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd continue } simRes := res.PvtSimulationResultsWithConfig + // simRes.PvtRwset will be nil if the transient store contains an entry for the txid but the entry does not contain the data for the collection if simRes.PvtRwset == nil { - pdp.logger.Warningf("The PvtRwset of PvtSimulationResultsWithConfig for txID [%s] is nil. Skipping.", k.txID) + pdp.logger.Debugf("The PvtRwset of PvtSimulationResultsWithConfig for txID [%s] is nil. Skipping.", k.txID) continue } for _, ns := range simRes.PvtRwset.NsPvtRwset { @@ -328,13 +348,14 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd hash: hex.EncodeToString(commonutil.ComputeSHA256(col.Rwset)), } // skip if not missing - if _, missing := pvtdataRetrievalInfo.eligibleMissingKeys[key]; !missing { + if _, missing := pvtdataRetrievalInfo.remainingEligibleMissingKeys[key]; !missing { continue } // populate the pvtdata with the RW set from the transient store + pdp.logger.Debugf("Found private data for key %v in transient store", key) pvtdata[key] = col.Rwset // remove key from missing - delete(pvtdataRetrievalInfo.eligibleMissingKeys, key) + delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, key) } // iterating over all collections } // iterating over all namespaces } // iterating over the TxPvtRWSet results @@ -344,11 +365,11 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd // populateFromRemotePeers populates pvtdata with data fetched from remote peers and updates // pvtdataRetrievalInfo by removing missing data that was fetched from remote peers func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) bool { - pdp.logger.Debugf("Attempting to retrieve %d private write sets from remote peers.", len(pvtdataRetrievalInfo.eligibleMissingKeys)) + pdp.logger.Debugf("Attempting to retrieve %d private write sets from remote peers.", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)) dig2src := make(map[pvtdatacommon.DigKey][]*peer.Endorsement) var skipped int - for k, v := range pvtdataRetrievalInfo.eligibleMissingKeys { + for k, v := range pvtdataRetrievalInfo.remainingEligibleMissingKeys { if v.invalid && pdp.skipPullingInvalidTransactions { pdp.logger.Debugf("Skipping invalid key [%v] because peer is configured to skip pulling rwsets of invalid transactions.", k) skipped++ @@ -387,7 +408,7 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata hash: hex.EncodeToString(commonutil.ComputeSHA256(rws)), } // skip if not missing - if _, missing := pvtdataRetrievalInfo.eligibleMissingKeys[key]; !missing { + if _, missing := pvtdataRetrievalInfo.remainingEligibleMissingKeys[key]; !missing { // key isn't missing and was never fetched earlier, log that it wasn't originally requested if _, exists := pvtdata[key]; !exists { pdp.logger.Debugf("Ignoring [%v] because it was never requested.", key) @@ -397,26 +418,26 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata // populate the pvtdata with the RW set from the remote peer pvtdata[key] = rws // remove key from missing - delete(pvtdataRetrievalInfo.eligibleMissingKeys, key) + delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, key) pdp.logger.Debugf("Fetched [%v]", key) } } // Iterate over purged data for _, dig := range fetchedData.PurgedElements { // delete purged key from missing keys - for missingPvtRWKey := range pvtdataRetrievalInfo.eligibleMissingKeys { + for missingPvtRWKey := range pvtdataRetrievalInfo.remainingEligibleMissingKeys { if missingPvtRWKey.namespace == dig.Namespace && missingPvtRWKey.collection == dig.Collection && missingPvtRWKey.seqInBlock == dig.SeqInBlock && missingPvtRWKey.txID == dig.TxId { - delete(pvtdataRetrievalInfo.eligibleMissingKeys, missingPvtRWKey) + delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, missingPvtRWKey) pdp.logger.Warningf("Missing key because was purged or will soon be purged, "+ "continue block commit without [%+v] in private rwset", missingPvtRWKey) } } } - return len(pvtdataRetrievalInfo.eligibleMissingKeys) > skipped + return len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) > skipped } // prepareBlockPvtdata consolidates the fetched private data as well as ineligible and eligible @@ -427,13 +448,6 @@ func (pdp *PvtdataProvider) prepareBlockPvtdata(pvtdata rwsetByKeys, pvtdataRetr MissingPvtData: make(ledger.TxMissingPvtDataMap), } - if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 { - pdp.logger.Infof("Successfully fetched all eligible collection private write sets for block [%d]", pdp.blockNum) - } else { - pdp.logger.Warningf("Could not fetch all missing eligible collection private write sets for block [%d]. Will commit block with missing private write sets:[%v]", - pdp.blockNum, pvtdataRetrievalInfo.eligibleMissingKeys) - } - for seqInBlock, nsRWS := range pvtdata.bySeqsInBlock() { // add all found pvtdata to blockPvtDataPvtdata for seqInBlock blockPvtdata.PvtData[seqInBlock] = &ledger.TxPvtData{ @@ -442,7 +456,7 @@ func (pdp *PvtdataProvider) prepareBlockPvtdata(pvtdata rwsetByKeys, pvtdataRetr } } - for key := range pvtdataRetrievalInfo.eligibleMissingKeys { + for key := range pvtdataRetrievalInfo.remainingEligibleMissingKeys { blockPvtdata.MissingPvtData.Add(key.seqInBlock, key.namespace, key.collection, true) } @@ -454,10 +468,10 @@ func (pdp *PvtdataProvider) prepareBlockPvtdata(pvtdata rwsetByKeys, pvtdataRetr } type pvtdataRetrievalInfo struct { - sources map[rwSetKey][]*peer.Endorsement - txns []string - eligibleMissingKeys rwsetKeys - ineligibleMissingKeys rwsetKeys + sources map[rwSetKey][]*peer.Endorsement + txns []string + remainingEligibleMissingKeys rwsetKeys + ineligibleMissingKeys rwsetKeys } // rwset types @@ -564,3 +578,11 @@ func endorsersFromEligibleOrgs(ns string, col string, endorsers []*peer.Endorsem } return res } + +type fetchStats struct { + fromLocalCache, fromTransientStore, fromRemotePeer int +} + +func (stats fetchStats) String() string { + return fmt.Sprintf("(%d from local cache, %d from transient store, %d from other peers)", stats.fromLocalCache, stats.fromTransientStore, stats.fromRemotePeer) +} diff --git a/gossip/privdata/pvtdataprovider_test.go b/gossip/privdata/pvtdataprovider_test.go index 6f50477cb56..a6e21a5a1c8 100644 --- a/gossip/privdata/pvtdataprovider_test.go +++ b/gossip/privdata/pvtdataprovider_test.go @@ -1237,6 +1237,15 @@ func TestRetrievedPvtdataPurgeBelowHeight(t *testing.T) { } } +func TestFetchStats(t *testing.T) { + fetchStats := fetchStats{ + fromLocalCache: 1, + fromTransientStore: 2, + fromRemotePeer: 3, + } + assert.Equal(t, "(1 from local cache, 2 from transient store, 3 from other peers)", fetchStats.String()) +} + func testRetrievePvtdataSuccess(t *testing.T, scenario string, ts testSupport,