From 9c58f135c6da7b5dfae4480c0df38279708ad197 Mon Sep 17 00:00:00 2001 From: senthil Date: Mon, 11 Sep 2017 20:23:32 +0530 Subject: [PATCH] [FAB-6074] rm endorserid from tstore APIs This CR replaces endorserId with UUID. Change-Id: I54105eb4f63658f49696a5697e9d8b157f10ede4 Signed-off-by: senthil --- core/transientstore/store.go | 70 +++++++------------- core/transientstore/store_helper.go | 40 +++++------ core/transientstore/store_test.go | 95 ++++++++++++++------------- gossip/privdata/coordinator.go | 36 ++++++++-- gossip/privdata/coordinator_test.go | 4 +- gossip/service/gossip_service_test.go | 4 +- gossip/service/integration_test.go | 5 +- gossip/state/state_test.go | 4 +- 8 files changed, 124 insertions(+), 134 deletions(-) diff --git a/core/transientstore/store.go b/core/transientstore/store.go index 87a626c0042..1988fa514e2 100644 --- a/core/transientstore/store.go +++ b/core/transientstore/store.go @@ -13,6 +13,7 @@ import ( "github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" + "github.com/hyperledger/fabric/common/util" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/pvtdatastorage" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -38,13 +39,10 @@ type StoreProvider interface { // the permanent storage or the pruning of some data items is enforced by the policy type Store interface { // Persist stores the private read-write set of a transaction in the transient store - Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error + Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error // GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private // RWSets persisted from different endorsers (via Gossip) - GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error) - // GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation - // performed by the peer itself - GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error) + GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*RwsetScanner, error) // Purge removes private read-writes set generated by endorsers at block height lesser than // a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets // that were generated at block height of maxBlockNumToRetain or higher. @@ -56,7 +54,6 @@ type Store interface { // EndorserPvtSimulationResults captures the deatils of the simulation results specific to an endorser type EndorserPvtSimulationResults struct { - EndorserID string EndorsementBlockHeight uint64 PvtSimulationResults *rwset.TxPvtReadWriteSet } @@ -78,7 +75,7 @@ type store struct { ledgerID string } -type rwsetScanner struct { +type RwsetScanner struct { txid string dbItr iterator.Iterator filter ledger.PvtNsCollFilter @@ -102,56 +99,38 @@ func (provider *storeProvider) Close() { } // Persist stores the private read-write set of a transaction in the transient store -func (s *store) Persist(txid string, endorserid string, - endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { +func (s *store) Persist(txid string, endorsementBlkHt uint64, + privateSimulationResults *rwset.TxPvtReadWriteSet) error { dbBatch := leveldbhelper.NewUpdateBatch() - // Create compositeKey with appropriate prefix, txid, endorserid and endorsementBlkHt - compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt) + // Create compositeKey with appropriate prefix, txid, uuid and endorsementBlkHt + // Due to the fact that the txid may have multiple private RWSets persisted from different + // endorsers (via Gossip), we postfix an uuid with the txid to avoid collision. + uuid := util.GenerateUUID() + compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt) privateSimulationResultsBytes, err := proto.Marshal(privateSimulationResults) if err != nil { return err } dbBatch.Put(compositeKey, privateSimulationResultsBytes) - // Create compositeKey with appropriate prefix, endorsementBlkHt, txid, endorserid & Store + // Create compositeKey with appropriate prefix, endorsementBlkHt, txid, uuid & Store // the compositeKey (purge index) a null byte as value. - compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, endorserid) + compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, uuid) dbBatch.Put(compositeKey, emptyValue) return s.db.WriteBatch(dbBatch, true) } // GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private -// RWSets persisted from different endorserids. Eventually, we may pass a set of collections name -// (filters) along with txid. -func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error) { +// RWSets persisted from different endorsers. +func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*RwsetScanner, error) { // Construct startKey and endKey to do an range query startKey := createTxidRangeStartKey(txid) endKey := createTxidRangeEndKey(txid) iter := s.db.GetIterator(startKey, endKey) - return &rwsetScanner{txid, iter, filter}, nil -} - -// GetSelfSimulatedTxPvtRWSetByTxid returns the private write set generated from the simulation performed -// by the peer itself. Eventually, we may pass a set of collections name (filters) along with txid. -func (s *store) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error) { - var err error - var itr *rwsetScanner - if itr, err = s.GetTxPvtRWSetByTxid(txid, nil); err != nil { - return nil, err - } - defer itr.Close() - var res *EndorserPvtSimulationResults - for { - if res, err = itr.Next(); res == nil || err != nil { - return nil, err - } - if selfSimulated(res) { - return res, nil - } - } + return &RwsetScanner{txid, iter, filter}, nil } // Purge removes private read-writes set generated by endorsers at block height lesser than @@ -165,12 +144,12 @@ func (s *store) Purge(maxBlockNumToRetain uint64) error { dbBatch := leveldbhelper.NewUpdateBatch() - // Get all txid and endorserid from above result and remove it from transient store (both + // Get all txid and uuid from above result and remove it from transient store (both // read/write set and the corresponding index. for iter.Next() { dbKey := iter.Key() - txid, endorserid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey) - compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt) + txid, uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey) + compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt) dbBatch.Delete(compositeKey) dbBatch.Delete(dbKey) } @@ -203,13 +182,13 @@ func (s *store) Shutdown() { // Next moves the iterator to the next key/value pair. // It returns whether the iterator is exhausted. -func (scanner *rwsetScanner) Next() (*EndorserPvtSimulationResults, error) { +func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) { if !scanner.dbItr.Next() { return nil, nil } dbKey := scanner.dbItr.Key() dbVal := scanner.dbItr.Value() - endorserid, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey) + _, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey) txPvtRWSet := &rwset.TxPvtReadWriteSet{} if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil { @@ -218,17 +197,12 @@ func (scanner *rwsetScanner) Next() (*EndorserPvtSimulationResults, error) { filteredTxPvtRWSet := pvtdatastorage.TrimPvtWSet(txPvtRWSet, scanner.filter) return &EndorserPvtSimulationResults{ - EndorserID: endorserid, EndorsementBlockHeight: endorsementBlkHt, PvtSimulationResults: filteredTxPvtRWSet, }, nil } // Close releases resource held by the iterator -func (scanner *rwsetScanner) Close() { +func (scanner *RwsetScanner) Close() { scanner.dbItr.Release() } - -func selfSimulated(pvtSimRes *EndorserPvtSimulationResults) bool { - return pvtSimRes.EndorserID == "" -} diff --git a/core/transientstore/store_helper.go b/core/transientstore/store_helper.go index a18aeba77f1..b834515df8f 100644 --- a/core/transientstore/store_helper.go +++ b/core/transientstore/store_helper.go @@ -1,17 +1,7 @@ /* -Copyright IBM Corp. 2017 All Rights Reserved. +Copyright IBM Corp. All Rights Reserved. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +SPDX-License-Identifier: Apache-2.0 */ package transientstore @@ -31,14 +21,14 @@ var ( ) // createCompositeKeyForPvtRWSet creates a key for storing private read-write set -// in the transient store. The structure of the key is ~txid~endorserid~endorsementBlkHt. -func createCompositeKeyForPvtRWSet(txid string, endorserid string, endorsementBlkHt uint64) []byte { +// in the transient store. The structure of the key is ~txid~uuid~endorsementBlkHt. +func createCompositeKeyForPvtRWSet(txid string, uuid string, endorsementBlkHt uint64) []byte { var compositeKey []byte compositeKey = append(compositeKey, prwsetPrefix) compositeKey = append(compositeKey, compositeKeySep) compositeKey = append(compositeKey, []byte(txid)...) compositeKey = append(compositeKey, compositeKeySep) - compositeKey = append(compositeKey, []byte(endorserid)...) + compositeKey = append(compositeKey, []byte(uuid)...) compositeKey = append(compositeKey, compositeKeySep) compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...) @@ -47,8 +37,8 @@ func createCompositeKeyForPvtRWSet(txid string, endorserid string, endorsementBl // createCompositeKeyForPurgeIndex creates a key to index private read-write set based on // endorsement block height such that purge based on block height can be achieved. The structure -// of the key is ~endorsementBlkHt~txid~endorserid. -func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, endorserid string) []byte { +// of the key is ~endorsementBlkHt~txid~uuid. +func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, uuid string) []byte { var compositeKey []byte compositeKey = append(compositeKey, purgeIndexPrefix) compositeKey = append(compositeKey, compositeKeySep) @@ -56,28 +46,28 @@ func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, endor compositeKey = append(compositeKey, compositeKeySep) compositeKey = append(compositeKey, []byte(txid)...) compositeKey = append(compositeKey, compositeKeySep) - compositeKey = append(compositeKey, []byte(endorserid)...) + compositeKey = append(compositeKey, []byte(uuid)...) return compositeKey } -// splitCompositeKeyOfPvtRWSet splits the compositeKey (~txid~endorserid~endorsementBlkHt) into endorserId and endorsementBlkHt. -func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (endorserid string, endorsementBlkHt uint64) { +// splitCompositeKeyOfPvtRWSet splits the compositeKey (~txid~uuid~endorsementBlkHt) into endorserId and endorsementBlkHt. +func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (uuid string, endorsementBlkHt uint64) { compositeKey = compositeKey[2:] firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep) secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1 - endorserid = string(compositeKey[firstSepIndex+1 : secondSepIndex]) + uuid = string(compositeKey[firstSepIndex+1 : secondSepIndex]) endorsementBlkHt, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:]) - return endorserid, endorsementBlkHt + return uuid, endorsementBlkHt } -// splitCompositeKeyOfPurgeIndex splits the compositeKey (~endorsementBlkHt~txid~endorserid) into txid, endorserid and endorsementBlkHt. -func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, endorserid string, endorsementBlkHt uint64) { +// splitCompositeKeyOfPurgeIndex splits the compositeKey (~endorsementBlkHt~txid~uuid) into txid, uuid and endorsementBlkHt. +func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, uuid string, endorsementBlkHt uint64) { var n int endorsementBlkHt, n = util.DecodeOrderPreservingVarUint64(compositeKey[2:]) splits := bytes.Split(compositeKey[n+3:], []byte{compositeKeySep}) txid = string(splits[0]) - endorserid = string(splits[1]) + uuid = string(splits[1]) return } diff --git a/core/transientstore/store_test.go b/core/transientstore/store_test.go index a785650a527..52f84e8ccd1 100644 --- a/core/transientstore/store_test.go +++ b/core/transientstore/store_test.go @@ -9,11 +9,14 @@ package transientstore import ( "fmt" "os" + "sort" "testing" "github.com/davecgh/go-spew/spew" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/spf13/viper" @@ -29,17 +32,17 @@ func TestPurgeIndexKeyCodingEncoding(t *testing.T) { assert := assert.New(t) blkHts := []uint64{0, 10, 20000} txids := []string{"txid", ""} - endorserids := []string{"endorserid", ""} + uuids := []string{"uuid", ""} for _, blkHt := range blkHts { for _, txid := range txids { - for _, endorserid := range endorserids { - testCase := fmt.Sprintf("blkHt=%d,txid=%s,endorserid=%s", blkHt, txid, endorserid) + for _, uuid := range uuids { + testCase := fmt.Sprintf("blkHt=%d,txid=%s,uuid=%s", blkHt, txid, uuid) t.Run(testCase, func(t *testing.T) { t.Logf("Running test case [%s]", testCase) - purgeIndexKey := createCompositeKeyForPurgeIndex(blkHt, txid, endorserid) - txid1, endorserid1, blkHt1 := splitCompositeKeyOfPurgeIndex(purgeIndexKey) + purgeIndexKey := createCompositeKeyForPurgeIndex(blkHt, txid, uuid) + txid1, uuid1, blkHt1 := splitCompositeKeyOfPurgeIndex(purgeIndexKey) assert.Equal(txid, txid1) - assert.Equal(endorserid, endorserid1) + assert.Equal(uuid, uuid1) assert.Equal(blkHt, blkHt1) }) } @@ -51,16 +54,16 @@ func TestRWSetKeyCodingEncoding(t *testing.T) { assert := assert.New(t) blkHts := []uint64{0, 10, 20000} txids := []string{"txid", ""} - endorserids := []string{"endorserid", ""} + uuids := []string{"uuid", ""} for _, blkHt := range blkHts { for _, txid := range txids { - for _, endorserid := range endorserids { - testCase := fmt.Sprintf("blkHt=%d,txid=%s,endorserid=%s", blkHt, txid, endorserid) + for _, uuid := range uuids { + testCase := fmt.Sprintf("blkHt=%d,txid=%s,uuid=%s", blkHt, txid, uuid) t.Run(testCase, func(t *testing.T) { t.Logf("Running test case [%s]", testCase) - rwsetKey := createCompositeKeyForPvtRWSet(txid, endorserid, blkHt) - endorserid1, blkHt1 := splitCompositeKeyOfPvtRWSet(rwsetKey) - assert.Equal(endorserid, endorserid1) + rwsetKey := createCompositeKeyForPvtRWSet(txid, uuid, blkHt) + uuid1, blkHt1 := splitCompositeKeyOfPvtRWSet(rwsetKey) + assert.Equal(uuid, uuid1) assert.Equal(blkHt, blkHt1) }) } @@ -79,7 +82,6 @@ func TestTransientStorePersistAndRetrieve(t *testing.T) { // Results produced by endorser 1 endorser0SimulationResults := &EndorserPvtSimulationResults{ - EndorserID: "endorser0", EndorsementBlockHeight: 10, PvtSimulationResults: samplePvtRWSet, } @@ -87,7 +89,6 @@ func TestTransientStorePersistAndRetrieve(t *testing.T) { // Results produced by endorser 2 endorser1SimulationResults := &EndorserPvtSimulationResults{ - EndorserID: "endorser1", EndorsementBlockHeight: 10, PvtSimulationResults: samplePvtRWSet, } @@ -96,8 +97,8 @@ func TestTransientStorePersistAndRetrieve(t *testing.T) { // Persist simulation results into store var err error for i := 0; i < len(endorsersResults); i++ { - err = env.TestStore.Persist(txid, endorsersResults[i].EndorserID, - endorsersResults[i].EndorsementBlockHeight, endorsersResults[i].PvtSimulationResults) + err = env.TestStore.Persist(txid, endorsersResults[i].EndorsementBlockHeight, + endorsersResults[i].PvtSimulationResults) assert.NoError(err) } @@ -115,28 +116,9 @@ func TestTransientStorePersistAndRetrieve(t *testing.T) { actualEndorsersResults = append(actualEndorsersResults, result) } iter.Close() + sortResults(endorsersResults) + sortResults(actualEndorsersResults) assert.Equal(endorsersResults, actualEndorsersResults) - - // No selfSimulated results for the txid. Result and err should be nil - endorserResults, err := env.TestStore.GetSelfSimulatedTxPvtRWSetByTxid(txid) - assert.NoError(err) - assert.Nil(endorserResults) - - // Self simulated results - selfSimulatedResults := &EndorserPvtSimulationResults{ - EndorserID: "", - EndorsementBlockHeight: 10, - PvtSimulationResults: samplePvtRWSet, - } - - // Persist self simulated results - err = env.TestStore.Persist(txid, selfSimulatedResults.EndorserID, - selfSimulatedResults.EndorsementBlockHeight, selfSimulatedResults.PvtSimulationResults) - assert.NoError(err) - - // Retrieve self simulated results - endorserResults, err = env.TestStore.GetSelfSimulatedTxPvtRWSetByTxid(txid) - assert.Equal(selfSimulatedResults, endorserResults) } func TestStorePurge(t *testing.T) { @@ -151,7 +133,6 @@ func TestStorePurge(t *testing.T) { // Results produced by endorser 1 endorser0SimulationResults := &EndorserPvtSimulationResults{ - EndorserID: "endorser0", EndorsementBlockHeight: 10, PvtSimulationResults: samplePvtRWSet, } @@ -159,7 +140,6 @@ func TestStorePurge(t *testing.T) { // Results produced by endorser 2 endorser1SimulationResults := &EndorserPvtSimulationResults{ - EndorserID: "endorser1", EndorsementBlockHeight: 11, PvtSimulationResults: samplePvtRWSet, } @@ -167,7 +147,6 @@ func TestStorePurge(t *testing.T) { // Results produced by endorser 3 endorser2SimulationResults := &EndorserPvtSimulationResults{ - EndorserID: "endorser2", EndorsementBlockHeight: 12, PvtSimulationResults: samplePvtRWSet, } @@ -175,7 +154,6 @@ func TestStorePurge(t *testing.T) { // Results produced by endorser 3 endorser3SimulationResults := &EndorserPvtSimulationResults{ - EndorserID: "endorser3", EndorsementBlockHeight: 12, PvtSimulationResults: samplePvtRWSet, } @@ -183,7 +161,6 @@ func TestStorePurge(t *testing.T) { // Results produced by endorser 3 endorser4SimulationResults := &EndorserPvtSimulationResults{ - EndorserID: "endorser4", EndorsementBlockHeight: 13, PvtSimulationResults: samplePvtRWSet, } @@ -192,8 +169,8 @@ func TestStorePurge(t *testing.T) { // Persist simulation results into store var err error for i := 0; i < 5; i++ { - err = env.TestStore.Persist(txid, endorsersResults[i].EndorserID, - endorsersResults[i].EndorsementBlockHeight, endorsersResults[i].PvtSimulationResults) + err = env.TestStore.Persist(txid, endorsersResults[i].EndorsementBlockHeight, + endorsersResults[i].PvtSimulationResults) assert.NoError(err) } @@ -223,6 +200,12 @@ func TestStorePurge(t *testing.T) { actualEndorsersResults = append(actualEndorsersResults, result) } iter.Close() + + // Note that the ordering of actualRes and expectedRes is dependent on the uuid. Hence, we are sorting + // expectedRes and actualRes. + sortResults(expectedEndorsersResults) + sortResults(actualEndorsersResults) + assert.Equal(expectedEndorsersResults, actualEndorsersResults) // Get the minimum retained endorsement block height @@ -258,7 +241,7 @@ func TestTransientStoreRetrievalWithFilter(t *testing.T) { testTxid := "testTxid" numEntries := 5 for i := 0; i < numEntries; i++ { - store.Persist(testTxid, "", uint64(i), samplePvtSimRes) + store.Persist(testTxid, uint64(i), samplePvtSimRes) } filter := ledger.NewPvtNsCollFilter() @@ -285,12 +268,32 @@ func TestTransientStoreRetrievalWithFilter(t *testing.T) { var expectedRes []*EndorserPvtSimulationResults for i := 0; i < numEntries; i++ { - expectedRes = append(expectedRes, &EndorserPvtSimulationResults{"", uint64(i), expectedSimulationRes}) + expectedRes = append(expectedRes, &EndorserPvtSimulationResults{uint64(i), expectedSimulationRes}) } + + // Note that the ordering of actualRes and expectedRes is dependent on the uuid. Hence, we are sorting + // expectedRes and actualRes. + sortResults(expectedRes) + sortResults(actualRes) assert.Equal(t, expectedRes, actualRes) t.Logf("Actual Res = %s", spew.Sdump(actualRes)) } +func sortResults(res []*EndorserPvtSimulationResults) { + // Results are sorted by ascending order of endorsement block height. When the endorsement block + // heights are same, we sort by comparing the hash of private write set. + var sortCondition = func(i, j int) bool { + if res[i].EndorsementBlockHeight == res[j].EndorsementBlockHeight { + res_i, _ := proto.Marshal(res[i].PvtSimulationResults) + res_j, _ := proto.Marshal(res[j].PvtSimulationResults) + // if hashes are same, any order would work. + return string(util.ComputeHash(res_i)) < string(util.ComputeHash(res_j)) + } + return res[i].EndorsementBlockHeight < res[j].EndorsementBlockHeight + } + sort.SliceStable(res, sortCondition) +} + func samplePvtData(t *testing.T) *rwset.TxPvtReadWriteSet { pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV} pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{ diff --git a/gossip/privdata/coordinator.go b/gossip/privdata/coordinator.go index 05e26f944da..72e1ba02e51 100644 --- a/gossip/privdata/coordinator.go +++ b/gossip/privdata/coordinator.go @@ -30,11 +30,10 @@ func init() { // TransientStore holds private data that the corresponding blocks haven't been committed yet into the ledger type TransientStore interface { // Persist stores the private read-write set of a transaction in the transient store - Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error - - // GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation - // performed by the peer itself - GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) + Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error + // GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private + // RWSets persisted from different endorsers (via Gossip) + GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*transientstore.RwsetScanner, error) } // PrivateDataDistributor distributes private data to peers @@ -83,7 +82,7 @@ func NewCoordinator(committer committer.Committer, store TransientStore) Coordin // to peers according policies that are derived from the given PolicyStore and PolicyParser func (c *coordinator) Distribute(privateData *rwset.TxPvtReadWriteSet, txID string, ps privdata.PolicyStore, pp privdata.PolicyParser) error { // TODO: also need to distribute the data... - return c.TransientStore.Persist(txID, "", 0, privateData) + return c.TransientStore.Persist(txID, 0, privateData) } // StoreBlock stores block with private data into the ledger @@ -155,10 +154,33 @@ func (c *coordinator) retrievePrivateData(block *common.Block) (map[uint64]*ledg if err != nil { return nil, err } - pvtEndorsement, err := c.TransientStore.GetSelfSimulatedTxPvtRWSetByTxid(chdr.TxId) + // TODO: For now, we assume that all peers have access to all collections. + // Once the peer to collection mapping is available via RSCC, + // we need to build the filter based on the ns/collections that this peer + // has access to and pass the filter to GetTxPvtRWSetByTxid instead of nil. + iter, err := c.TransientStore.GetTxPvtRWSetByTxid(chdr.TxId, nil) if err != nil { return nil, err } + + var pvtEndorsement *transientstore.EndorserPvtSimulationResults + + for { + pvtEndorsement, err = iter.Next() + if err != nil { + return nil, err + } + if pvtEndorsement == nil { + break + } + // TODO: When we introduce collection filters, we need to compare hashes + // and collect the correct private write set. For now, we assume that + // all entries are correct and each entry contains required collections. + // Hence, with the first entry, we can break this loop. + break + } + iter.Close() + if pvtEndorsement == nil { continue } diff --git a/gossip/privdata/coordinator_test.go b/gossip/privdata/coordinator_test.go index 8fbbcf6cbc8..a2885a22c6f 100644 --- a/gossip/privdata/coordinator_test.go +++ b/gossip/privdata/coordinator_test.go @@ -22,11 +22,11 @@ import ( type mockTransientStore struct { } -func (*mockTransientStore) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { +func (*mockTransientStore) Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { panic("implement me") } -func (*mockTransientStore) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { +func (*mockTransientStore) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*transientstore.RwsetScanner, error) { panic("implement me") } diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index 17bdce5938d..5c04eb683bc 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -47,11 +47,11 @@ func init() { type mockTransientStore struct { } -func (*mockTransientStore) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { +func (*mockTransientStore) Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { panic("implement me") } -func (*mockTransientStore) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { +func (*mockTransientStore) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*transientstore.RwsetScanner, error) { panic("implement me") } diff --git a/gossip/service/integration_test.go b/gossip/service/integration_test.go index 92d1a1b690f..3cf76c88cbc 100644 --- a/gossip/service/integration_test.go +++ b/gossip/service/integration_test.go @@ -16,6 +16,7 @@ import ( "github.com/hyperledger/fabric/core/deliverservice" "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" + "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/transientstore" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/election" @@ -27,11 +28,11 @@ import ( type transientStoreMock struct { } -func (*transientStoreMock) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { +func (*transientStoreMock) Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { panic("implement me") } -func (transientStoreMock) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { +func (transientStoreMock) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*transientstore.RwsetScanner, error) { panic("implement me") } diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 7861f60f09e..3d40f256181 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -168,11 +168,11 @@ func (node *peerNode) shutdown() { type mockTransientStore struct { } -func (*mockTransientStore) Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { +func (*mockTransientStore) Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error { panic("implement me") } -func (mockTransientStore) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*transientstore.EndorserPvtSimulationResults, error) { +func (mockTransientStore) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*transientstore.RwsetScanner, error) { panic("implement me") }