diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go index 659cffb0159..fa9ea1f195d 100644 --- a/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go +++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go @@ -199,9 +199,12 @@ func (h HashedUpdateBatch) ToCompositeKeyMap() map[HashedCompositeKey]*statedb.V return m } +// PvtdataCompositeKeyMap is a map of PvtdataCompositeKey to VersionedValue +type PvtdataCompositeKeyMap map[PvtdataCompositeKey]*statedb.VersionedValue + // ToCompositeKeyMap rearranges the update batch data in the form of a single map -func (p PvtUpdateBatch) ToCompositeKeyMap() map[PvtdataCompositeKey]*statedb.VersionedValue { - m := make(map[PvtdataCompositeKey]*statedb.VersionedValue) +func (p PvtUpdateBatch) ToCompositeKeyMap() PvtdataCompositeKeyMap { + m := make(PvtdataCompositeKeyMap) for ns, nsBatch := range p.UpdateMap { for _, coll := range nsBatch.GetCollectionNames() { for key, vv := range nsBatch.GetUpdates(coll) { diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go index 1d1436cb575..4b412f28360 100644 --- a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go @@ -41,6 +41,8 @@ type expiryKeeper interface { updateBookkeeping(toTrack []*expiryInfo, toClear []*expiryInfoKey) error // retrieve returns the keys info that are supposed to be expired by the given block number retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error) + // retrieveByExpiryKey retrieves the expiryInfo for given expiryKey + retrieveByExpiryKey(expiryKey *expiryInfoKey) (*expiryInfo, error) } func newExpiryKeeper(ledgerid string, provider bookkeeping.Provider) expiryKeeper { @@ -95,6 +97,15 @@ func (ek *expKeeper) retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error) { return listExpinfo, nil } +func (ek *expKeeper) retrieveByExpiryKey(expiryKey *expiryInfoKey) (*expiryInfo, error) { + key := encodeExpiryInfoKey(expiryKey) + value, err := ek.db.Get(key) + if err != nil { + return nil, err + } + return decodeExpiryInfo(key, value) +} + func encodeKV(expinfo *expiryInfo) (key []byte, value []byte, err error) { key = encodeExpiryInfoKey(expinfo.expiryInfoKey) value, err = encodeExpiryInfoValue(expinfo.pvtdataKeys) diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_schedule_builder_test.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_schedule_builder_test.go index 360e9bdd8cc..83fd88fde54 100644 --- a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_schedule_builder_test.go +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_schedule_builder_test.go @@ -107,7 +107,7 @@ func TestBuildExpiryScheduleWithMissingPvtdata(t *testing.T) { } func putPvtAndHashUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) { - updates.PvtUpdates.Put(ns, coll, key, value, ver) + putPvtUpdates(updates, ns, coll, key, value, ver) putHashUpdates(updates, ns, coll, key, value, ver) } @@ -120,6 +120,10 @@ func putHashUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key stri updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver) } +func putPvtUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) { + updates.PvtUpdates.Put(ns, coll, key, value, ver) +} + func deleteHashUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key string, ver *version.Height) { updates.HashUpdates.Delete(ns, coll, util.ComputeStringHash(key), ver) } diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go index e932fab36e8..65ef319b0f9 100644 --- a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go @@ -15,6 +15,7 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" + "github.com/hyperledger/fabric/core/ledger/util" ) // PurgeMgr manages purging of the expired pvtdata @@ -27,6 +28,8 @@ type PurgeMgr interface { DeleteExpiredAndUpdateBookkeeping( pvtUpdates *privacyenabledstate.PvtUpdateBatch, hashedUpdates *privacyenabledstate.HashedUpdateBatch) error + // UpdateBookkeepingForPvtDataOfOldBlocks updates the existing expiry entries in the bookkeeper with the given pvtUpdates + UpdateBookkeepingForPvtDataOfOldBlocks(pvtUpdates *privacyenabledstate.PvtUpdateBatch) error // BlockCommitDone is a callback to the PurgeMgr when the block is committed to the ledger BlockCommitDone() error } @@ -37,7 +40,7 @@ type keyAndVersion struct { purgeKeyOnly bool } -type expiryInfoMap map[*privacyenabledstate.HashedCompositeKey]*keyAndVersion +type expiryInfoMap map[privacyenabledstate.HashedCompositeKey]*keyAndVersion type workingset struct { toPurge expiryInfoMap @@ -86,6 +89,28 @@ func (p *purgeMgr) WaitForPrepareToFinish() { p.lock.Unlock() } +func (p *purgeMgr) UpdateBookkeepingForPvtDataOfOldBlocks(pvtUpdates *privacyenabledstate.PvtUpdateBatch) error { + builder := newExpiryScheduleBuilder(p.btlPolicy) + pvtUpdateCompositeKeyMap := pvtUpdates.ToCompositeKeyMap() + for k, vv := range pvtUpdateCompositeKeyMap { + builder.add(k.Namespace, k.CollectionName, k.Key, util.ComputeStringHash(k.Key), vv) + } + + var updatedList []*expiryInfo + for _, toAdd := range builder.getExpiryInfo() { + toUpdate, err := p.expKeeper.retrieveByExpiryKey(toAdd.expiryInfoKey) + if err != nil { + return err + } + // Though we could update the existing entry (as there should be one due + // to only the keyHash of this pvtUpdateKey), for simplicity and to be less + // expensive, we append a new entry + toUpdate.pvtdataKeys.addAll(toAdd.pvtdataKeys) + updatedList = append(updatedList, toUpdate) + } + return p.expKeeper.updateBookkeeping(updatedList, nil) +} + // DeleteExpiredAndUpdateBookkeeping implements function in the interface 'PurgeMgr' func (p *purgeMgr) DeleteExpiredAndUpdateBookkeeping( pvtUpdates *privacyenabledstate.PvtUpdateBatch, @@ -216,18 +241,18 @@ func (p *purgeMgr) preloadCommittedVersionsInCache(expInfoMap expiryInfoMap) { } var hashedKeys []*privacyenabledstate.HashedCompositeKey for k := range expInfoMap { - hashedKeys = append(hashedKeys, k) + hashedKeys = append(hashedKeys, &k) } p.db.LoadCommittedVersionsOfPubAndHashedKeys(nil, hashedKeys) } func transformToExpiryInfoMap(expiryInfo []*expiryInfo) expiryInfoMap { - var expinfoMap expiryInfoMap = make(map[*privacyenabledstate.HashedCompositeKey]*keyAndVersion) + expinfoMap := make(expiryInfoMap) for _, expinfo := range expiryInfo { for ns, colls := range expinfo.pvtdataKeys.Map { for coll, keysAndHashes := range colls.Map { for _, keyAndHash := range keysAndHashes.List { - compositeKey := &privacyenabledstate.HashedCompositeKey{Namespace: ns, CollectionName: coll, KeyHash: string(keyAndHash.Hash)} + compositeKey := privacyenabledstate.HashedCompositeKey{Namespace: ns, CollectionName: coll, KeyHash: string(keyAndHash.Hash)} expinfoMap[compositeKey] = &keyAndVersion{key: keyAndHash.Key, committingBlock: expinfo.expiryInfoKey.committingBlk} } } diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr_test.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr_test.go index e7a5e8b4275..b528f803dac 100644 --- a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr_test.go +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr_test.go @@ -100,6 +100,63 @@ func testPurgeMgr(t *testing.T, dbEnv privacyenabledstate.TestEnv) { testHelper.checkPvtdataDoesNotExist("ns1", "coll4", "pvtkey4") } +func TestPurgeMgrForCommittingPvtDataOfOldBlocks(t *testing.T) { + dbEnvs := []privacyenabledstate.TestEnv{ + &privacyenabledstate.LevelDBCommonStorageTestEnv{}, + &privacyenabledstate.CouchDBCommonStorageTestEnv{}, + } + for _, dbEnv := range dbEnvs { + t.Run(dbEnv.GetName(), func(t *testing.T) { testPurgeMgrForCommittingPvtDataOfOldBlocks(t, dbEnv) }) + } +} + +func testPurgeMgrForCommittingPvtDataOfOldBlocks(t *testing.T, dbEnv privacyenabledstate.TestEnv) { + ledgerid := "testledger-purge-mgr-pvtdata-oldblocks" + btlPolicy := btltestutil.SampleBTLPolicy( + map[[2]string]uint64{ + {"ns1", "coll1"}: 1, + }, + ) + + testHelper := &testHelper{} + testHelper.init(t, ledgerid, btlPolicy, dbEnv) + defer testHelper.cleanup() + + // committing block 1 + block1Updates := privacyenabledstate.NewUpdateBatch() + // pvt data pvtkey1 is missing but the pvtkey2 is present. + // pvtkey1 and pvtkey2 both would get expired and purged while committing block 3 + putHashUpdates(block1Updates, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1-1"), version.NewHeight(1, 1)) + putPvtAndHashUpdates(t, block1Updates, "ns1", "coll1", "pvtkey2", []byte("pvtvalue1-2"), version.NewHeight(1, 1)) + testHelper.commitUpdatesForTesting(1, block1Updates) + + // pvtkey1 should not exist but pvtkey2 should exist + testHelper.checkOnlyPvtKeyDoesNotExist("ns1", "coll1", "pvtkey1") + testHelper.checkPvtdataExists("ns1", "coll1", "pvtkey2", []byte("pvtvalue1-2")) + + // committing block 2 + block2Updates := privacyenabledstate.NewUpdateBatch() + testHelper.commitUpdatesForTesting(2, block2Updates) + + // Commit pvtkey1 via commit of missing data and this should be added to toPurge list as it + // should be removed while committing block 3 + block1PvtData := privacyenabledstate.NewUpdateBatch() + putPvtUpdates(block1PvtData, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1-1"), version.NewHeight(1, 1)) + testHelper.commitPvtDataOfOldBlocksForTesting(block1PvtData) + + // both pvtkey1 and pvtkey1 should exist + testHelper.checkPvtdataExists("ns1", "coll1", "pvtkey1", []byte("pvtvalue1-1")) + testHelper.checkPvtdataExists("ns1", "coll1", "pvtkey2", []byte("pvtvalue1-2")) + + // committing block 3 + block3Updates := privacyenabledstate.NewUpdateBatch() + testHelper.commitUpdatesForTesting(3, block3Updates) + + // both pvtkey1 and pvtkey1 should not exist + testHelper.checkPvtdataDoesNotExist("ns1", "coll1", "pvtkey1") + testHelper.checkPvtdataDoesNotExist("ns1", "coll1", "pvtkey2") +} + func TestKeyUpdateBeforeExpiryBlock(t *testing.T) { dbEnv := &privacyenabledstate.LevelDBCommonStorageTestEnv{} ledgerid := "testledger-perge-mgr" @@ -222,8 +279,9 @@ type testHelper struct { bookkeepingEnv *bookkeeping.TestEnv dbEnv privacyenabledstate.TestEnv - db privacyenabledstate.DB - purgeMgr PurgeMgr + db privacyenabledstate.DB + purgeMgr PurgeMgr + purgerUsedOnce bool } func (h *testHelper) init(t *testing.T, ledgerid string, btlPolicy pvtdatapolicy.BTLPolicy, dbEnv privacyenabledstate.TestEnv) { @@ -251,6 +309,11 @@ func (h *testHelper) commitUpdatesForTesting(blkNum uint64, updates *privacyenab h.purgeMgr.BlockCommitDone() } +func (h *testHelper) commitPvtDataOfOldBlocksForTesting(updates *privacyenabledstate.UpdateBatch) { + assert.NoError(h.t, h.purgeMgr.UpdateBookkeepingForPvtDataOfOldBlocks(updates.PvtUpdates)) + assert.NoError(h.t, h.db.ApplyPrivacyAwareUpdates(updates, nil)) +} + func (h *testHelper) checkPvtdataExists(ns, coll, key string, value []byte) { vv, _ := h.fetchPvtdataFronDB(ns, coll, key) vv, hashVersion := h.fetchPvtdataFronDB(ns, coll, key) @@ -272,6 +335,12 @@ func (h *testHelper) checkOnlyPvtKeyExists(ns, coll, key string, value []byte) { assert.Equal(h.t, value, vv.Value) } +func (h *testHelper) checkOnlyPvtKeyDoesNotExist(ns, coll, key string) { + kv, err := h.db.GetPrivateData(ns, coll, key) + assert.Nil(h.t, err) + assert.Nil(h.t, kv) +} + func (h *testHelper) checkOnlyKeyHashExists(ns, coll, key string) { vv, hashVersion := h.fetchPvtdataFronDB(ns, coll, key) assert.Nil(h.t, vv) diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go index 222ea88e38f..738eb052a35 100644 --- a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go @@ -6,6 +6,16 @@ SPDX-License-Identifier: Apache-2.0 package pvtstatepurgemgmt +func (pvtdataKeys *PvtdataKeys) addAll(toAdd *PvtdataKeys) { + for ns, colls := range toAdd.Map { + for coll, keysAndHashes := range colls.Map { + for _, k := range keysAndHashes.List { + pvtdataKeys.add(ns, coll, k.Key, k.Hash) + } + } + } +} + func (pvtdataKeys *PvtdataKeys) add(ns string, coll string, key string, keyhash []byte) { colls := pvtdataKeys.getOrCreateCollections(ns) keysAndHashes := colls.getOrCreateKeysAndHashes(coll) diff --git a/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go b/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go index 64feecf45a6..01f25a87f50 100644 --- a/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go +++ b/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go @@ -947,3 +947,22 @@ func TestItrWithoutClose(t *testing.T, itr statedb.ResultsIterator, expectedKeys assert.NoError(t, err, "An unexpected error was thrown during iterator Next()") assert.Nil(t, queryResult) } + +func TestApplyUpdatesWithNilHeight(t *testing.T, dbProvider statedb.VersionedDBProvider) { + db, err := dbProvider.GetDBHandle("test-apply-updates-with-nil-height") + assert.NoError(t, err) + + batch1 := statedb.NewUpdateBatch() + batch1.Put("ns", "key1", []byte("value1"), version.NewHeight(1, 4)) + savePoint := version.NewHeight(1, 5) + assert.NoError(t, db.ApplyUpdates(batch1, savePoint)) + + batch2 := statedb.NewUpdateBatch() + batch2.Put("ns", "key1", []byte("value2"), version.NewHeight(1, 1)) + assert.NoError(t, db.ApplyUpdates(batch2, nil)) + + ht, err := db.GetLatestSavePoint() + assert.NoError(t, err) + assert.Equal(t, savePoint, ht) // savepoint should still be what was set with batch1 + // (because batch2 calls ApplyUpdates with savepoint as nil) +} diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index 17b6da13f5b..da6b01a8bd8 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -489,6 +489,14 @@ func (vdb *VersionedDB) ensureFullCommitAndRecordSavepoint(height *version.Heigh if err := vdb.ensureFullCommit(dbs); err != nil { return err } + + // If a given height is nil, it denotes that we are committing pvt data of old blocks. + // In this case, we should not store a savepoint for recovery. The lastUpdatedOldBlockList + // in the pvtstore acts as a savepoint for pvt data. + if height == nil { + return nil + } + // construct savepoint document and save savepointCouchDoc, err := encodeSavepoint(height) if err != nil { diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go index ed1f24b4150..7f8dce14c84 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go @@ -686,3 +686,9 @@ func TestPaginatedQueryValidation(t *testing.T) { assert.Error(t, err, "An should have been thrown for an invalid options") } + +func TestApplyUpdatesWithNilHeight(t *testing.T) { + env := NewTestVDBEnv(t) + defer env.Cleanup() + commontests.TestApplyUpdatesWithNilHeight(t, env.DBProvider) +} diff --git a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go index a88290277b9..043f8dda8a0 100644 --- a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go +++ b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go @@ -185,7 +185,13 @@ func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version } } } - dbBatch.Put(savePointKey, height.ToBytes()) + // Record a savepoint at a given height + // If a given height is nil, it denotes that we are committing pvt data of old blocks. + // In this case, we should not store a savepoint for recovery. The lastUpdatedOldBlockList + // in the pvtstore acts as a savepoint for pvt data. + if height != nil { + dbBatch.Put(savePointKey, height.ToBytes()) + } // Setting snyc to true as a precaution, false may be an ok optimization after further testing. if err := vdb.db.WriteBatch(dbBatch, true); err != nil { return err diff --git a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go index 16fc1d390e9..e91b9146491 100644 --- a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go @@ -130,3 +130,9 @@ func TestPaginatedRangeQuery(t *testing.T) { defer env.Cleanup() commontests.TestPaginatedRangeQuery(t, env.DBProvider) } + +func TestApplyUpdatesWithNilHeight(t *testing.T) { + env := NewTestVDBEnv(t) + defer env.Cleanup() + commontests.TestApplyUpdatesWithNilHeight(t, env.DBProvider) +} diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go index e46279dc281..a8ee4ea5427 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go @@ -399,7 +399,7 @@ func (txmgr *LockBasedTxMgr) Commit() error { } defer func() { txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.current.blockNum() + 1) - logger.Debugf("Cleared version cache and launched the background routine for preparing keys to purge with the next block") + logger.Debugf("launched the background routine for preparing keys to purge with the next block") txmgr.reset() }() @@ -423,7 +423,11 @@ func (txmgr *LockBasedTxMgr) Commit() error { } // only during the exclusive lock duration, we should clear the cache as the cache is being // used by the old pvtData committer as well - txmgr.clearCache() + txmgr.clearCache() // note that we should clear the cache before calling + // PrepareForExpiringKeys as it uses the cache as well. To be precise, + // we should not clear the cache until PrepareForExpiringKeys completes + // the task. + logger.Debugf("cleared cached") txmgr.commitRWLock.Unlock() // EXCLUSIVE LOCK ENDS logger.Debugf("Updates committed to state database")