diff --git a/core/transientstore/store.go b/core/transientstore/store.go index a16e5643db5..d2b559d25f0 100644 --- a/core/transientstore/store.go +++ b/core/transientstore/store.go @@ -53,10 +53,16 @@ type Store interface { // GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private // RWSets persisted from different endorsers (via Gossip) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error) - // Purge removes private read-writes set generated by endorsers at block height lesser than + // PurgeByTxids removes private read-write set of a given set of transactions from the + // transient store + PurgeByTxids(txids []string) error + // PurgeByHeight removes private read-writes set generated by endorsers at block height lesser than // a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets - // that were generated at block height of maxBlockNumToRetain or higher. - Purge(maxBlockNumToRetain uint64) error + // that were generated at block height of maxBlockNumToRetain or higher. Though the private + // read-write sets stored in transient store is removed by coordinator using PurgebyTxids() + // after successful block commit, PurgeByHeight() is still required to remove orphan entries (as + // transaction that gets endorsed may not be submitted by the client for commit) + PurgeByHeight(maxBlockNumToRetain uint64) error // GetMinEndorsementBlkHt returns the lowest retained endorsement block height GetMinEndorsementBlkHt() (uint64, error) Shutdown() @@ -117,17 +123,35 @@ func (s *store) Persist(txid string, endorsementBlkHt uint64, // Due to the fact that the txid may have multiple private RWSets persisted from different // endorsers (via Gossip), we postfix an uuid with the txid to avoid collision. uuid := util.GenerateUUID() - compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt) + compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt) privateSimulationResultsBytes, err := proto.Marshal(privateSimulationResults) if err != nil { return err } - dbBatch.Put(compositeKey, privateSimulationResultsBytes) - - // Create compositeKey with appropriate prefix, endorsementBlkHt, txid, uuid & Store - // the compositeKey (purge index) a null byte as value. - compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, uuid) - dbBatch.Put(compositeKey, emptyValue) + dbBatch.Put(compositeKeyPvtRWSet, privateSimulationResultsBytes) + + // Create two index: (i) by txid, and (ii) by height + + // Create compositeKey for purge index by height with appropriate prefix, endorsementBlkHt, + // txid, uuid and store the compositeKey (purge index) with a null byte as value. Note that + // the purge index is used to remove orphan entries in the transient store (which are not removed + // by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction + // that gets endorsed but not submitted by the client for commit) + compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt, txid, uuid) + dbBatch.Put(compositeKeyPurgeIndexByHeight, emptyValue) + + // Create compositeKey for purge index by txid with appropriate prefix, txid, uuid, + // endorsementBlkHt and store the compositeKey (purge index) with a null byte as value. + // Though compositeKeyPvtRWSet itself can be used to purge private write set by txid, + // we create a separate composite key with a null byte as value. The reason is that + // if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write + // set associated with the key from db. Note that this purge index is used to remove non-orphan + // entries in the transient store and is used by PurgeTxids() + // Note: We can create compositeKeyPurgeIndexByTxid by just replacing the prefix of compositeKeyPvtRWSet + // with purgeIndexByTxidPrefix. For code readability and to be expressive, we use a + // createCompositeKeyForPurgeIndexByTxid() instead. + compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, endorsementBlkHt) + dbBatch.Put(compositeKeyPurgeIndexByTxid, emptyValue) return s.db.WriteBatch(dbBatch, true) } @@ -143,13 +167,56 @@ func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) return &RwsetScanner{txid, iter, filter}, nil } -// Purge removes private read-writes set generated by endorsers at block height lesser than -// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets -// that were generated at block height of maxBlockNumToRetain or higher. -func (s *store) Purge(maxBlockNumToRetain uint64) error { +// PurgeByTxids removes private read-write set of a given set of transactions from the +// transient store. PurgeByTxids() is expected to be called by coordinator after +// committing a block to ledger. +func (s *store) PurgeByTxids(txids []string) error { + dbBatch := leveldbhelper.NewUpdateBatch() + + for _, txid := range txids { + // Construct startKey and endKey to do an range query + startKey := createPurgeIndexByTxidRangeStartKey(txid) + endKey := createPurgeIndexByTxidRangeEndKey(txid) + + iter := s.db.GetIterator(startKey, endKey) + + // Get all txid and uuid from above result and remove it from transient store (both + // read/write set and the corresponding indexes. + for iter.Next() { + // For each entry, remove the private read-write set and correponding indexes + + // Remove private read-write set + compositeKeyPurgeIndexByTxid := iter.Key() + // Note: We can create compositeKeyPvtRWSet by just replacing the prefix of compositeKeyPurgeIndexByTxid + // with prwsetPrefix. For code readability and to be expressive, we split and create again. + uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByTxid(compositeKeyPurgeIndexByTxid) + compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt) + dbBatch.Delete(compositeKeyPvtRWSet) + + // Remove purge index -- purgeIndexByHeight + compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt, txid, uuid) + dbBatch.Delete(compositeKeyPurgeIndexByHeight) + + // Remove purge index -- purgeIndexByTxid + dbBatch.Delete(compositeKeyPurgeIndexByTxid) + } + iter.Release() + } + // If peer fails before/while writing the batch to golevelDB, these entries will be + // removed as per BTL policy later by PurgeByHeight() + return s.db.WriteBatch(dbBatch, true) +} + +// PurgeByHeight removes private read-writes set generated by endorsers at block height lesser than +// a given maxBlockNumToRetain. In other words, PurgeByHeight only retains private read-write sets +// that were generated at block height of maxBlockNumToRetain or higher. Though the private +// read-write sets stored in transient store is removed by coordinator using PurgebyTxids() +// after successful block commit, PurgeByHeight() is still required to remove orphan entries (as +// transaction that gets endorsed may not be submitted by the client for commit) +func (s *store) PurgeByHeight(maxBlockNumToRetain uint64) error { // Do a range query with 0 as startKey and maxBlockNumToRetain-1 as endKey - startKey := createEndorsementBlkHtRangeStartKey(0) - endKey := createEndorsementBlkHtRangeEndKey(maxBlockNumToRetain - 1) + startKey := createPurgeIndexByHeightRangeStartKey(0) + endKey := createPurgeIndexByHeightRangeEndKey(maxBlockNumToRetain - 1) iter := s.db.GetIterator(startKey, endKey) dbBatch := leveldbhelper.NewUpdateBatch() @@ -157,12 +224,23 @@ func (s *store) Purge(maxBlockNumToRetain uint64) error { // Get all txid and uuid from above result and remove it from transient store (both // read/write set and the corresponding index. for iter.Next() { - dbKey := iter.Key() - txid, uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey) - compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt) - dbBatch.Delete(compositeKey) - dbBatch.Delete(dbKey) + // For each entry, remove the private read-write set and correponding indexes + + // Remove private read-write set + compositeKeyPurgeIndexByHeight := iter.Key() + txid, uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByHeight(compositeKeyPurgeIndexByHeight) + compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt) + dbBatch.Delete(compositeKeyPvtRWSet) + + // Remove purge index -- purgeIndexByTxid + compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, endorsementBlkHt) + dbBatch.Delete(compositeKeyPurgeIndexByTxid) + + // Remove purge index -- purgeIndexByHeight + dbBatch.Delete(compositeKeyPurgeIndexByHeight) } + iter.Release() + return s.db.WriteBatch(dbBatch, true) } @@ -172,14 +250,15 @@ func (s *store) GetMinEndorsementBlkHt() (uint64, error) { // as 0 (i.e., endorsementBlkHt) and returns the first key which denotes // the lowest retained endorsement block height. An alternative approach // is to explicitly store the minEndorsementBlkHt in the transientStore. - startKey := createEndorsementBlkHtRangeStartKey(0) + startKey := createPurgeIndexByHeightRangeStartKey(0) iter := s.db.GetIterator(startKey, nil) // Fetch the minimum endorsement block height if iter.Next() { dbKey := iter.Key() - _, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey) + _, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByHeight(dbKey) return endorsementBlkHt, nil } + iter.Release() // Returning an error may not be the right thing to do here. May be // return a bool. -1 is not possible due to unsigned int as first // return value diff --git a/core/transientstore/store_helper.go b/core/transientstore/store_helper.go index b834515df8f..5e1bee89687 100644 --- a/core/transientstore/store_helper.go +++ b/core/transientstore/store_helper.go @@ -15,9 +15,10 @@ import ( ) var ( - prwsetPrefix = []byte("P")[0] // key prefix for storing private read-write set in transient store. - purgeIndexPrefix = []byte("I")[0] // key prefix for storing index on private read-write set using endorsement block height. - compositeKeySep = byte(0x00) + prwsetPrefix = []byte("P")[0] // key prefix for storing private read-write set in transient store. + purgeIndexByHeightPrefix = []byte("H")[0] // key prefix for storing index on private read-write set using endorsement block height. + purgeIndexByTxidPrefix = []byte("T")[0] // key prefix for storing index on private read-write set using txid + compositeKeySep = byte(0x00) ) // createCompositeKeyForPvtRWSet creates a key for storing private read-write set @@ -26,6 +27,26 @@ func createCompositeKeyForPvtRWSet(txid string, uuid string, endorsementBlkHt ui var compositeKey []byte compositeKey = append(compositeKey, prwsetPrefix) compositeKey = append(compositeKey, compositeKeySep) + compositeKey = append(compositeKey, createCompositeKeyWithoutPrefixForTxid(txid, uuid, endorsementBlkHt)...) + + return compositeKey +} + +// createCompositeKeyForPurgeIndexByTxid creates a key to index private read-write set based on +// txid such that purge based on txid can be achieved. The structure +// of the key is ~txid~uuid~endorsementBlkHt. +func createCompositeKeyForPurgeIndexByTxid(txid string, uuid string, endorsementBlkHt uint64) []byte { + var compositeKey []byte + compositeKey = append(compositeKey, purgeIndexByTxidPrefix) + compositeKey = append(compositeKey, compositeKeySep) + compositeKey = append(compositeKey, createCompositeKeyWithoutPrefixForTxid(txid, uuid, endorsementBlkHt)...) + + return compositeKey +} + +// createCompositeKeyWithoutPrefixForTxid creates a composite key of structure txid~uuid~endorsementBlkHt. +func createCompositeKeyWithoutPrefixForTxid(txid string, uuid string, endorsementBlkHt uint64) []byte { + var compositeKey []byte compositeKey = append(compositeKey, []byte(txid)...) compositeKey = append(compositeKey, compositeKeySep) compositeKey = append(compositeKey, []byte(uuid)...) @@ -35,12 +56,12 @@ func createCompositeKeyForPvtRWSet(txid string, uuid string, endorsementBlkHt ui return compositeKey } -// createCompositeKeyForPurgeIndex creates a key to index private read-write set based on +// createCompositeKeyForPurgeIndexByHeight creates a key to index private read-write set based on // endorsement block height such that purge based on block height can be achieved. The structure -// of the key is ~endorsementBlkHt~txid~uuid. -func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, uuid string) []byte { +// of the key is ~endorsementBlkHt~txid~uuid. +func createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt uint64, txid string, uuid string) []byte { var compositeKey []byte - compositeKey = append(compositeKey, purgeIndexPrefix) + compositeKey = append(compositeKey, purgeIndexByHeightPrefix) compositeKey = append(compositeKey, compositeKeySep) compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...) compositeKey = append(compositeKey, compositeKeySep) @@ -51,18 +72,21 @@ func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, uuid return compositeKey } -// splitCompositeKeyOfPvtRWSet splits the compositeKey (~txid~uuid~endorsementBlkHt) into endorserId and endorsementBlkHt. +// splitCompositeKeyOfPvtRWSet splits the compositeKey (~txid~uuid~endorsementBlkHt) +// into uuid and endorsementBlkHt. func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (uuid string, endorsementBlkHt uint64) { - compositeKey = compositeKey[2:] - firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep) - secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1 - uuid = string(compositeKey[firstSepIndex+1 : secondSepIndex]) - endorsementBlkHt, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:]) - return uuid, endorsementBlkHt + return splitCompositeKeyWithoutPrefixForTxid(compositeKey[2:]) } -// splitCompositeKeyOfPurgeIndex splits the compositeKey (~endorsementBlkHt~txid~uuid) into txid, uuid and endorsementBlkHt. -func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, uuid string, endorsementBlkHt uint64) { +// splitCompositeKeyOfPurgeIndexByTxid splits the compositeKey (~txid~uuid~endorsementBlkHt) +// into uuid and endorsementBlkHt. +func splitCompositeKeyOfPurgeIndexByTxid(compositeKey []byte) (uuid string, endorsementBlkHt uint64) { + return splitCompositeKeyWithoutPrefixForTxid(compositeKey[2:]) +} + +// splitCompositeKeyOfPurgeIndexByHeight splits the compositeKey (~endorsementBlkHt~txid~uuid) +// into txid, uuid and endorsementBlkHt. +func splitCompositeKeyOfPurgeIndexByHeight(compositeKey []byte) (txid string, uuid string, endorsementBlkHt uint64) { var n int endorsementBlkHt, n = util.DecodeOrderPreservingVarUint64(compositeKey[2:]) splits := bytes.Split(compositeKey[n+3:], []byte{compositeKeySep}) @@ -71,6 +95,17 @@ func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, uuid strin return } +// splitCompositeKeyWithoutPrefixForTxid splits the composite key txid~uuid~endorsementBlkHt into +// uuid and endorsementBlkHt +func splitCompositeKeyWithoutPrefixForTxid(compositeKey []byte) (uuid string, endorsementBlkHt uint64) { + // skip txid as all functions which requires split of composite key already has it + firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep) + secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1 + uuid = string(compositeKey[firstSepIndex+1 : secondSepIndex]) + endorsementBlkHt, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:]) + return +} + // createTxidRangeStartKey returns a startKey to do a range query on transient store using txid func createTxidRangeStartKey(txid string) []byte { var startKey []byte @@ -91,28 +126,50 @@ func createTxidRangeEndKey(txid string) []byte { return endKey } -// createEndorsementBlkHtRangeStartKey returns a startKey to do a range query on index stored in transient store +// createPurgeIndexByHeightRangeStartKey returns a startKey to do a range query on index stored in transient store // using endorsementBlkHt -func createEndorsementBlkHtRangeStartKey(endorsementBlkHt uint64) []byte { +func createPurgeIndexByHeightRangeStartKey(endorsementBlkHt uint64) []byte { var startKey []byte - startKey = append(startKey, purgeIndexPrefix) + startKey = append(startKey, purgeIndexByHeightPrefix) startKey = append(startKey, compositeKeySep) startKey = append(startKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...) startKey = append(startKey, compositeKeySep) return startKey } -// createEndorsementBlkHtRangeStartKey returns a endKey to do a range query on index stored in transient store +// createPurgeIndexByHeightRangeStartKey returns a endKey to do a range query on index stored in transient store // using endorsementBlkHt -func createEndorsementBlkHtRangeEndKey(endorsementBlkHt uint64) []byte { +func createPurgeIndexByHeightRangeEndKey(endorsementBlkHt uint64) []byte { var endKey []byte - endKey = append(endKey, purgeIndexPrefix) + endKey = append(endKey, purgeIndexByHeightPrefix) endKey = append(endKey, compositeKeySep) endKey = append(endKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...) endKey = append(endKey, byte(0xff)) return endKey } +// createPurgeIndexByTxidRangeStartKey returns a startKey to do a range query on index stored in transient store +// using txid +func createPurgeIndexByTxidRangeStartKey(txid string) []byte { + var startKey []byte + startKey = append(startKey, purgeIndexByTxidPrefix) + startKey = append(startKey, compositeKeySep) + startKey = append(startKey, []byte(txid)...) + startKey = append(startKey, compositeKeySep) + return startKey +} + +// createPurgeIndexByTxidRangeStartKey returns a endKey to do a range query on index stored in transient store +// using txid +func createPurgeIndexByTxidRangeEndKey(txid string) []byte { + var endKey []byte + endKey = append(endKey, purgeIndexByTxidPrefix) + endKey = append(endKey, compositeKeySep) + endKey = append(endKey, []byte(txid)...) + endKey = append(endKey, byte(0xff)) + return endKey +} + // GetTransientStorePath returns the filesystem path for temporarily storing the private rwset func GetTransientStorePath() string { sysPath := config.GetPath("peer.fileSystemPath") diff --git a/core/transientstore/store_test.go b/core/transientstore/store_test.go index 52f84e8ccd1..bd840f18fd1 100644 --- a/core/transientstore/store_test.go +++ b/core/transientstore/store_test.go @@ -39,8 +39,8 @@ func TestPurgeIndexKeyCodingEncoding(t *testing.T) { testCase := fmt.Sprintf("blkHt=%d,txid=%s,uuid=%s", blkHt, txid, uuid) t.Run(testCase, func(t *testing.T) { t.Logf("Running test case [%s]", testCase) - purgeIndexKey := createCompositeKeyForPurgeIndex(blkHt, txid, uuid) - txid1, uuid1, blkHt1 := splitCompositeKeyOfPurgeIndex(purgeIndexKey) + purgeIndexKey := createCompositeKeyForPurgeIndexByHeight(blkHt, txid, uuid) + txid1, uuid1, blkHt1 := splitCompositeKeyOfPurgeIndexByHeight(purgeIndexKey) assert.Equal(txid, txid1) assert.Equal(uuid, uuid1) assert.Equal(blkHt, blkHt1) @@ -121,7 +121,164 @@ func TestTransientStorePersistAndRetrieve(t *testing.T) { assert.Equal(endorsersResults, actualEndorsersResults) } -func TestStorePurge(t *testing.T) { +func TestTransientStorePurgeByTxids(t *testing.T) { + env := NewTestStoreEnv(t) + assert := assert.New(t) + + var txids []string + var endorsersResults []*EndorserPvtSimulationResults + + samplePvtRWSet := samplePvtData(t) + + // Create two private write set entry for txid-1 + txids = append(txids, "txid-1") + endorser0SimulationResults := &EndorserPvtSimulationResults{ + EndorsementBlockHeight: 10, + PvtSimulationResults: samplePvtRWSet, + } + endorsersResults = append(endorsersResults, endorser0SimulationResults) + + txids = append(txids, "txid-1") + endorser1SimulationResults := &EndorserPvtSimulationResults{ + EndorsementBlockHeight: 11, + PvtSimulationResults: samplePvtRWSet, + } + endorsersResults = append(endorsersResults, endorser1SimulationResults) + + // Create one private write set entry for txid-2 + txids = append(txids, "txid-2") + endorser2SimulationResults := &EndorserPvtSimulationResults{ + EndorsementBlockHeight: 11, + PvtSimulationResults: samplePvtRWSet, + } + endorsersResults = append(endorsersResults, endorser2SimulationResults) + + // Create three private write set entry for txid-3 + txids = append(txids, "txid-3") + endorser3SimulationResults := &EndorserPvtSimulationResults{ + EndorsementBlockHeight: 12, + PvtSimulationResults: samplePvtRWSet, + } + endorsersResults = append(endorsersResults, endorser3SimulationResults) + + txids = append(txids, "txid-3") + endorser4SimulationResults := &EndorserPvtSimulationResults{ + EndorsementBlockHeight: 12, + PvtSimulationResults: samplePvtRWSet, + } + endorsersResults = append(endorsersResults, endorser4SimulationResults) + + txids = append(txids, "txid-3") + endorser5SimulationResults := &EndorserPvtSimulationResults{ + EndorsementBlockHeight: 13, + PvtSimulationResults: samplePvtRWSet, + } + endorsersResults = append(endorsersResults, endorser5SimulationResults) + + var err error + for i := 0; i < len(txids); i++ { + err = env.TestStore.Persist(txids[i], endorsersResults[i].EndorsementBlockHeight, + endorsersResults[i].PvtSimulationResults) + assert.NoError(err) + } + + // Retrieve simulation results of txid-2 from store + iter, err := env.TestStore.GetTxPvtRWSetByTxid("txid-2", nil) + assert.NoError(err) + + // Expected results for txid-2 + var expectedEndorsersResults []*EndorserPvtSimulationResults + expectedEndorsersResults = append(expectedEndorsersResults, endorser2SimulationResults) + + // Check whether actual results and expected results are same + var actualEndorsersResults []*EndorserPvtSimulationResults + for true { + result, err := iter.Next() + assert.NoError(err) + if result == nil { + break + } + actualEndorsersResults = append(actualEndorsersResults, result) + } + iter.Close() + + // Note that the ordering of actualRes and expectedRes is dependent on the uuid. Hence, we are sorting + // expectedRes and actualRes. + sortResults(expectedEndorsersResults) + sortResults(actualEndorsersResults) + + assert.Equal(expectedEndorsersResults, actualEndorsersResults) + + // Remove all private write set of txid-2 and txid-3 + toRemoveTxids := []string{"txid-2", "txid-3"} + err = env.TestStore.PurgeByTxids(toRemoveTxids) + assert.NoError(err) + + for _, txid := range toRemoveTxids { + + // Check whether private write sets of txid-2 are removed + var expectedEndorsersResults *EndorserPvtSimulationResults + expectedEndorsersResults = nil + iter, err = env.TestStore.GetTxPvtRWSetByTxid(txid, nil) + assert.NoError(err) + // Should return nil, nil + result, err := iter.Next() + assert.NoError(err) + assert.Equal(expectedEndorsersResults, result) + } + + // Retrieve simulation results of txid-1 from store + iter, err = env.TestStore.GetTxPvtRWSetByTxid("txid-1", nil) + assert.NoError(err) + + // Expected results for txid-1 + expectedEndorsersResults = nil + expectedEndorsersResults = append(expectedEndorsersResults, endorser0SimulationResults) + expectedEndorsersResults = append(expectedEndorsersResults, endorser1SimulationResults) + + // Check whether actual results and expected results are same + actualEndorsersResults = nil + for true { + result, err := iter.Next() + assert.NoError(err) + if result == nil { + break + } + actualEndorsersResults = append(actualEndorsersResults, result) + } + iter.Close() + + // Note that the ordering of actualRes and expectedRes is dependent on the uuid. Hence, we are sorting + // expectedRes and actualRes. + sortResults(expectedEndorsersResults) + sortResults(actualEndorsersResults) + + assert.Equal(expectedEndorsersResults, actualEndorsersResults) + + toRemoveTxids = []string{"txid-1"} + err = env.TestStore.PurgeByTxids(toRemoveTxids) + assert.NoError(err) + + for _, txid := range toRemoveTxids { + + // Check whether private write sets of txid-1 are removed + var expectedEndorsersResults *EndorserPvtSimulationResults + expectedEndorsersResults = nil + iter, err = env.TestStore.GetTxPvtRWSetByTxid(txid, nil) + assert.NoError(err) + // Should return nil, nil + result, err := iter.Next() + assert.NoError(err) + assert.Equal(expectedEndorsersResults, result) + } + + // There should be no entries in the store + _, err = env.TestStore.GetMinEndorsementBlkHt() + assert.Equal(err, ErrStoreEmpty) + +} + +func TestTransientStorePurgeByHeight(t *testing.T) { env := NewTestStoreEnv(t) assert := assert.New(t) @@ -176,7 +333,7 @@ func TestStorePurge(t *testing.T) { // Retain results generate at block height greater than or equal to 12 minEndorsementBlkHtToRetain := uint64(12) - err = env.TestStore.Purge(minEndorsementBlkHtToRetain) + err = env.TestStore.PurgeByHeight(minEndorsementBlkHtToRetain) assert.NoError(err) // Retrieve simulation results of txid-1 from store @@ -216,7 +373,7 @@ func TestStorePurge(t *testing.T) { // Retain results generate at block height greater than or equal to 15 minEndorsementBlkHtToRetain = uint64(15) - err = env.TestStore.Purge(minEndorsementBlkHtToRetain) + err = env.TestStore.PurgeByHeight(minEndorsementBlkHtToRetain) assert.NoError(err) // There should be no entries in the store @@ -225,7 +382,7 @@ func TestStorePurge(t *testing.T) { // Retain results generate at block height greater than or equal to 15 minEndorsementBlkHtToRetain = uint64(15) - err = env.TestStore.Purge(minEndorsementBlkHtToRetain) + err = env.TestStore.PurgeByHeight(minEndorsementBlkHtToRetain) // Should not return any error assert.NoError(err)