Skip to content

Commit

Permalink
recon: update BTL bookkeeping managed by purge mgr
Browse files Browse the repository at this point in the history
Before we commit the pvtData of old blocks to the stateDB,
we must update the BTL bookkeeping accordingly by creating
appropriate expiryEntries.

FAB-12888 #done

Change-Id: Id16b69e5a2e0d59c2497ddad7fda3d82ed23e0a5
Signed-off-by: senthil <cendhu@gmail.com>
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
cendhu authored and manish-sethi committed Nov 20, 2018
1 parent 6bf6ead commit e00dfcb
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 12 deletions.
7 changes: 5 additions & 2 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,12 @@ func (h HashedUpdateBatch) ToCompositeKeyMap() map[HashedCompositeKey]*statedb.V
return m
}

// PvtdataCompositeKeyMap is a map of PvtdataCompositeKey to VersionedValue
type PvtdataCompositeKeyMap map[PvtdataCompositeKey]*statedb.VersionedValue

// ToCompositeKeyMap rearranges the update batch data in the form of a single map
func (p PvtUpdateBatch) ToCompositeKeyMap() map[PvtdataCompositeKey]*statedb.VersionedValue {
m := make(map[PvtdataCompositeKey]*statedb.VersionedValue)
func (p PvtUpdateBatch) ToCompositeKeyMap() PvtdataCompositeKeyMap {
m := make(PvtdataCompositeKeyMap)
for ns, nsBatch := range p.UpdateMap {
for _, coll := range nsBatch.GetCollectionNames() {
for key, vv := range nsBatch.GetUpdates(coll) {
Expand Down
11 changes: 11 additions & 0 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type expiryKeeper interface {
updateBookkeeping(toTrack []*expiryInfo, toClear []*expiryInfoKey) error
// retrieve returns the keys info that are supposed to be expired by the given block number
retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error)
// retrieveByExpiryKey retrieves the expiryInfo for given expiryKey
retrieveByExpiryKey(expiryKey *expiryInfoKey) (*expiryInfo, error)
}

func newExpiryKeeper(ledgerid string, provider bookkeeping.Provider) expiryKeeper {
Expand Down Expand Up @@ -95,6 +97,15 @@ func (ek *expKeeper) retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error) {
return listExpinfo, nil
}

func (ek *expKeeper) retrieveByExpiryKey(expiryKey *expiryInfoKey) (*expiryInfo, error) {
key := encodeExpiryInfoKey(expiryKey)
value, err := ek.db.Get(key)
if err != nil {
return nil, err
}
return decodeExpiryInfo(key, value)
}

func encodeKV(expinfo *expiryInfo) (key []byte, value []byte, err error) {
key = encodeExpiryInfoKey(expinfo.expiryInfoKey)
value, err = encodeExpiryInfoValue(expinfo.pvtdataKeys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestBuildExpiryScheduleWithMissingPvtdata(t *testing.T) {
}

func putPvtAndHashUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
putPvtUpdates(updates, ns, coll, key, value, ver)
putHashUpdates(updates, ns, coll, key, value, ver)
}

Expand All @@ -120,6 +120,10 @@ func putHashUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key stri
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
}

func putPvtUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
}

func deleteHashUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key string, ver *version.Height) {
updates.HashUpdates.Delete(ns, coll, util.ComputeStringHash(key), ver)
}
33 changes: 29 additions & 4 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/util"
)

// PurgeMgr manages purging of the expired pvtdata
Expand All @@ -27,6 +28,8 @@ type PurgeMgr interface {
DeleteExpiredAndUpdateBookkeeping(
pvtUpdates *privacyenabledstate.PvtUpdateBatch,
hashedUpdates *privacyenabledstate.HashedUpdateBatch) error
// UpdateBookkeepingForPvtDataOfOldBlocks updates the existing expiry entries in the bookkeeper with the given pvtUpdates
UpdateBookkeepingForPvtDataOfOldBlocks(pvtUpdates *privacyenabledstate.PvtUpdateBatch) error
// BlockCommitDone is a callback to the PurgeMgr when the block is committed to the ledger
BlockCommitDone() error
}
Expand All @@ -37,7 +40,7 @@ type keyAndVersion struct {
purgeKeyOnly bool
}

type expiryInfoMap map[*privacyenabledstate.HashedCompositeKey]*keyAndVersion
type expiryInfoMap map[privacyenabledstate.HashedCompositeKey]*keyAndVersion

type workingset struct {
toPurge expiryInfoMap
Expand Down Expand Up @@ -86,6 +89,28 @@ func (p *purgeMgr) WaitForPrepareToFinish() {
p.lock.Unlock()
}

func (p *purgeMgr) UpdateBookkeepingForPvtDataOfOldBlocks(pvtUpdates *privacyenabledstate.PvtUpdateBatch) error {
builder := newExpiryScheduleBuilder(p.btlPolicy)
pvtUpdateCompositeKeyMap := pvtUpdates.ToCompositeKeyMap()
for k, vv := range pvtUpdateCompositeKeyMap {
builder.add(k.Namespace, k.CollectionName, k.Key, util.ComputeStringHash(k.Key), vv)
}

var updatedList []*expiryInfo
for _, toAdd := range builder.getExpiryInfo() {
toUpdate, err := p.expKeeper.retrieveByExpiryKey(toAdd.expiryInfoKey)
if err != nil {
return err
}
// Though we could update the existing entry (as there should be one due
// to only the keyHash of this pvtUpdateKey), for simplicity and to be less
// expensive, we append a new entry
toUpdate.pvtdataKeys.addAll(toAdd.pvtdataKeys)
updatedList = append(updatedList, toUpdate)
}
return p.expKeeper.updateBookkeeping(updatedList, nil)
}

// DeleteExpiredAndUpdateBookkeeping implements function in the interface 'PurgeMgr'
func (p *purgeMgr) DeleteExpiredAndUpdateBookkeeping(
pvtUpdates *privacyenabledstate.PvtUpdateBatch,
Expand Down Expand Up @@ -216,18 +241,18 @@ func (p *purgeMgr) preloadCommittedVersionsInCache(expInfoMap expiryInfoMap) {
}
var hashedKeys []*privacyenabledstate.HashedCompositeKey
for k := range expInfoMap {
hashedKeys = append(hashedKeys, k)
hashedKeys = append(hashedKeys, &k)
}
p.db.LoadCommittedVersionsOfPubAndHashedKeys(nil, hashedKeys)
}

func transformToExpiryInfoMap(expiryInfo []*expiryInfo) expiryInfoMap {
var expinfoMap expiryInfoMap = make(map[*privacyenabledstate.HashedCompositeKey]*keyAndVersion)
expinfoMap := make(expiryInfoMap)
for _, expinfo := range expiryInfo {
for ns, colls := range expinfo.pvtdataKeys.Map {
for coll, keysAndHashes := range colls.Map {
for _, keyAndHash := range keysAndHashes.List {
compositeKey := &privacyenabledstate.HashedCompositeKey{Namespace: ns, CollectionName: coll, KeyHash: string(keyAndHash.Hash)}
compositeKey := privacyenabledstate.HashedCompositeKey{Namespace: ns, CollectionName: coll, KeyHash: string(keyAndHash.Hash)}
expinfoMap[compositeKey] = &keyAndVersion{key: keyAndHash.Key, committingBlock: expinfo.expiryInfoKey.committingBlk}
}
}
Expand Down
73 changes: 71 additions & 2 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,63 @@ func testPurgeMgr(t *testing.T, dbEnv privacyenabledstate.TestEnv) {
testHelper.checkPvtdataDoesNotExist("ns1", "coll4", "pvtkey4")
}

func TestPurgeMgrForCommittingPvtDataOfOldBlocks(t *testing.T) {
dbEnvs := []privacyenabledstate.TestEnv{
&privacyenabledstate.LevelDBCommonStorageTestEnv{},
&privacyenabledstate.CouchDBCommonStorageTestEnv{},
}
for _, dbEnv := range dbEnvs {
t.Run(dbEnv.GetName(), func(t *testing.T) { testPurgeMgrForCommittingPvtDataOfOldBlocks(t, dbEnv) })
}
}

func testPurgeMgrForCommittingPvtDataOfOldBlocks(t *testing.T, dbEnv privacyenabledstate.TestEnv) {
ledgerid := "testledger-purge-mgr-pvtdata-oldblocks"
btlPolicy := btltestutil.SampleBTLPolicy(
map[[2]string]uint64{
{"ns1", "coll1"}: 1,
},
)

testHelper := &testHelper{}
testHelper.init(t, ledgerid, btlPolicy, dbEnv)
defer testHelper.cleanup()

// committing block 1
block1Updates := privacyenabledstate.NewUpdateBatch()
// pvt data pvtkey1 is missing but the pvtkey2 is present.
// pvtkey1 and pvtkey2 both would get expired and purged while committing block 3
putHashUpdates(block1Updates, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1-1"), version.NewHeight(1, 1))
putPvtAndHashUpdates(t, block1Updates, "ns1", "coll1", "pvtkey2", []byte("pvtvalue1-2"), version.NewHeight(1, 1))
testHelper.commitUpdatesForTesting(1, block1Updates)

// pvtkey1 should not exist but pvtkey2 should exist
testHelper.checkOnlyPvtKeyDoesNotExist("ns1", "coll1", "pvtkey1")
testHelper.checkPvtdataExists("ns1", "coll1", "pvtkey2", []byte("pvtvalue1-2"))

// committing block 2
block2Updates := privacyenabledstate.NewUpdateBatch()
testHelper.commitUpdatesForTesting(2, block2Updates)

// Commit pvtkey1 via commit of missing data and this should be added to toPurge list as it
// should be removed while committing block 3
block1PvtData := privacyenabledstate.NewUpdateBatch()
putPvtUpdates(block1PvtData, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1-1"), version.NewHeight(1, 1))
testHelper.commitPvtDataOfOldBlocksForTesting(block1PvtData)

// both pvtkey1 and pvtkey1 should exist
testHelper.checkPvtdataExists("ns1", "coll1", "pvtkey1", []byte("pvtvalue1-1"))
testHelper.checkPvtdataExists("ns1", "coll1", "pvtkey2", []byte("pvtvalue1-2"))

// committing block 3
block3Updates := privacyenabledstate.NewUpdateBatch()
testHelper.commitUpdatesForTesting(3, block3Updates)

// both pvtkey1 and pvtkey1 should not exist
testHelper.checkPvtdataDoesNotExist("ns1", "coll1", "pvtkey1")
testHelper.checkPvtdataDoesNotExist("ns1", "coll1", "pvtkey2")
}

func TestKeyUpdateBeforeExpiryBlock(t *testing.T) {
dbEnv := &privacyenabledstate.LevelDBCommonStorageTestEnv{}
ledgerid := "testledger-perge-mgr"
Expand Down Expand Up @@ -222,8 +279,9 @@ type testHelper struct {
bookkeepingEnv *bookkeeping.TestEnv
dbEnv privacyenabledstate.TestEnv

db privacyenabledstate.DB
purgeMgr PurgeMgr
db privacyenabledstate.DB
purgeMgr PurgeMgr
purgerUsedOnce bool
}

func (h *testHelper) init(t *testing.T, ledgerid string, btlPolicy pvtdatapolicy.BTLPolicy, dbEnv privacyenabledstate.TestEnv) {
Expand Down Expand Up @@ -251,6 +309,11 @@ func (h *testHelper) commitUpdatesForTesting(blkNum uint64, updates *privacyenab
h.purgeMgr.BlockCommitDone()
}

func (h *testHelper) commitPvtDataOfOldBlocksForTesting(updates *privacyenabledstate.UpdateBatch) {
assert.NoError(h.t, h.purgeMgr.UpdateBookkeepingForPvtDataOfOldBlocks(updates.PvtUpdates))
assert.NoError(h.t, h.db.ApplyPrivacyAwareUpdates(updates, nil))
}

func (h *testHelper) checkPvtdataExists(ns, coll, key string, value []byte) {
vv, _ := h.fetchPvtdataFronDB(ns, coll, key)
vv, hashVersion := h.fetchPvtdataFronDB(ns, coll, key)
Expand All @@ -272,6 +335,12 @@ func (h *testHelper) checkOnlyPvtKeyExists(ns, coll, key string, value []byte) {
assert.Equal(h.t, value, vv.Value)
}

func (h *testHelper) checkOnlyPvtKeyDoesNotExist(ns, coll, key string) {
kv, err := h.db.GetPrivateData(ns, coll, key)
assert.Nil(h.t, err)
assert.Nil(h.t, kv)
}

func (h *testHelper) checkOnlyKeyHashExists(ns, coll, key string) {
vv, hashVersion := h.fetchPvtdataFronDB(ns, coll, key)
assert.Nil(h.t, vv)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ SPDX-License-Identifier: Apache-2.0

package pvtstatepurgemgmt

func (pvtdataKeys *PvtdataKeys) addAll(toAdd *PvtdataKeys) {
for ns, colls := range toAdd.Map {
for coll, keysAndHashes := range colls.Map {
for _, k := range keysAndHashes.List {
pvtdataKeys.add(ns, coll, k.Key, k.Hash)
}
}
}
}

func (pvtdataKeys *PvtdataKeys) add(ns string, coll string, key string, keyhash []byte) {
colls := pvtdataKeys.getOrCreateCollections(ns)
keysAndHashes := colls.getOrCreateKeysAndHashes(coll)
Expand Down
19 changes: 19 additions & 0 deletions core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,3 +947,22 @@ func TestItrWithoutClose(t *testing.T, itr statedb.ResultsIterator, expectedKeys
assert.NoError(t, err, "An unexpected error was thrown during iterator Next()")
assert.Nil(t, queryResult)
}

func TestApplyUpdatesWithNilHeight(t *testing.T, dbProvider statedb.VersionedDBProvider) {
db, err := dbProvider.GetDBHandle("test-apply-updates-with-nil-height")
assert.NoError(t, err)

batch1 := statedb.NewUpdateBatch()
batch1.Put("ns", "key1", []byte("value1"), version.NewHeight(1, 4))
savePoint := version.NewHeight(1, 5)
assert.NoError(t, db.ApplyUpdates(batch1, savePoint))

batch2 := statedb.NewUpdateBatch()
batch2.Put("ns", "key1", []byte("value2"), version.NewHeight(1, 1))
assert.NoError(t, db.ApplyUpdates(batch2, nil))

ht, err := db.GetLatestSavePoint()
assert.NoError(t, err)
assert.Equal(t, savePoint, ht) // savepoint should still be what was set with batch1
// (because batch2 calls ApplyUpdates with savepoint as nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,14 @@ func (vdb *VersionedDB) ensureFullCommitAndRecordSavepoint(height *version.Heigh
if err := vdb.ensureFullCommit(dbs); err != nil {
return err
}

// If a given height is nil, it denotes that we are committing pvt data of old blocks.
// In this case, we should not store a savepoint for recovery. The lastUpdatedOldBlockList
// in the pvtstore acts as a savepoint for pvt data.
if height == nil {
return nil
}

// construct savepoint document and save
savepointCouchDoc, err := encodeSavepoint(height)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,9 @@ func TestPaginatedQueryValidation(t *testing.T) {
assert.Error(t, err, "An should have been thrown for an invalid options")

}

func TestApplyUpdatesWithNilHeight(t *testing.T) {
env := NewTestVDBEnv(t)
defer env.Cleanup()
commontests.TestApplyUpdatesWithNilHeight(t, env.DBProvider)
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,13 @@ func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
}
}
}
dbBatch.Put(savePointKey, height.ToBytes())
// Record a savepoint at a given height
// If a given height is nil, it denotes that we are committing pvt data of old blocks.
// In this case, we should not store a savepoint for recovery. The lastUpdatedOldBlockList
// in the pvtstore acts as a savepoint for pvt data.
if height != nil {
dbBatch.Put(savePointKey, height.ToBytes())
}
// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,9 @@ func TestPaginatedRangeQuery(t *testing.T) {
defer env.Cleanup()
commontests.TestPaginatedRangeQuery(t, env.DBProvider)
}

func TestApplyUpdatesWithNilHeight(t *testing.T) {
env := NewTestVDBEnv(t)
defer env.Cleanup()
commontests.TestApplyUpdatesWithNilHeight(t, env.DBProvider)
}
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (txmgr *LockBasedTxMgr) Commit() error {
}
defer func() {
txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.current.blockNum() + 1)
logger.Debugf("Cleared version cache and launched the background routine for preparing keys to purge with the next block")
logger.Debugf("launched the background routine for preparing keys to purge with the next block")
txmgr.reset()
}()

Expand All @@ -423,7 +423,11 @@ func (txmgr *LockBasedTxMgr) Commit() error {
}
// only during the exclusive lock duration, we should clear the cache as the cache is being
// used by the old pvtData committer as well
txmgr.clearCache()
txmgr.clearCache() // note that we should clear the cache before calling
// PrepareForExpiringKeys as it uses the cache as well. To be precise,
// we should not clear the cache until PrepareForExpiringKeys completes
// the task.
logger.Debugf("cleared cached")
txmgr.commitRWLock.Unlock()
// EXCLUSIVE LOCK ENDS
logger.Debugf("Updates committed to state database")
Expand Down

0 comments on commit e00dfcb

Please sign in to comment.