
recon: commit pvtData of oldBlks to StateDB
Commit the valid old pvtData to the stateDB

FAB-11765 #done

Change-Id: I5e4bd529329f39cbad204a85490f80889ba953c2
Signed-off-by: senthil <cendhu@gmail.com>
cendhu committed Nov 20, 2018
1 parent e00dfcb commit bd5df09
Showing 2 changed files with 273 additions and 20 deletions.
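The central design change in this diff is the new oldBlockCommit mutex, which serializes the regular block commit (Commit) with the commit of old blocks' pvtData (RemoveStaleAndCommitPvtDataOfOldBlocks) so that PrepareForExpiringKeys and DeleteExpiredAndUpdateBookkeeping never interleave. A minimal, self-contained sketch of that locking pattern; the type and method names below are illustrative only and are not taken from the tree:

```go
package main

import (
	"fmt"
	"sync"
)

// txMgrSketch stands in for LockBasedTxMgr; only the locking is modeled.
type txMgrSketch struct {
	oldBlockCommit sync.Mutex
}

// Commit models the regular block commit path.
func (t *txMgrSketch) Commit(blockNum uint64) {
	t.oldBlockCommit.Lock()
	defer t.oldBlockCommit.Unlock()
	fmt.Printf("committing block %d\n", blockNum)
}

// CommitOldPvtData models RemoveStaleAndCommitPvtDataOfOldBlocks.
func (t *txMgrSketch) CommitOldPvtData(blockNum uint64) {
	t.oldBlockCommit.Lock()
	defer t.oldBlockCommit.Unlock()
	fmt.Printf("committing old pvtData of block %d\n", blockNum)
}

func main() {
	t := &txMgrSketch{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); t.Commit(4) }()
	go func() { defer wg.Done(); t.CommitOldPvtData(1) }()
	wg.Wait() // the two commits run strictly one after the other
}
```

Whichever goroutine acquires the mutex first finishes its critical section before the other starts, which is the guarantee the changes below rely on.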
73 changes: 53 additions & 20 deletions core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go
@@ -38,6 +38,7 @@ type LockBasedTxMgr struct {
stateListeners []ledger.StateListener
ccInfoProvider ledger.DeployedChaincodeInfoProvider
commitRWLock sync.RWMutex
oldBlockCommit sync.Mutex
current *current
}

@@ -115,10 +116,10 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAnd
// RemoveStaleAndCommitPvtDataOfOldBlocks implements method in interface `txmgmt.TxMgr`
// The following six operations are performed:
// (1) constructs the unique pvt data from the passed blocksPvtData
// (2) acquires the exclusive lock before checking for the stale pvtData
// (2) acquire a lock on oldBlockCommit
// (3) checks for stale pvtData by comparing [version, valueHash] and removes stale data
// (4) creates update batch from the non-stale pvtData
// (5) update the BTL bookkeeping managed by the purge manager
// (5) update the BTL bookkeeping managed by the purge manager and prepare expiring keys.
// (6) commit the non-stale pvt data to the stateDB
// This function assumes that the passed input contains only transactions that had been
// marked "Valid". In the current design, this assumption holds true as we store
@@ -141,10 +142,13 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
return err
}

// (2) acquire an exclusive lock on the stateDB as we cannot allow any regular block
// commit to happen while validating and committing the pvtData of old blocks
txmgr.commitRWLock.Lock()
defer txmgr.commitRWLock.Unlock()
// (2) acquire a lock on oldBlockCommit. If the regular block commit has already
// acquired this lock, commit of old blocks' pvtData cannot proceed until the lock
// is released. This is required as the PrepareForExpiringKeys() used in step (5)
// of this function might affect the result of DeleteExpiredAndUpdateBookkeeping()
// in Commit()
txmgr.oldBlockCommit.Lock()
defer txmgr.oldBlockCommit.Unlock()

// (3) remove the pvt data which does not match the hashed
// value stored in the public state
@@ -153,10 +157,24 @@
}

// (4) create the update batch from the uniquePvtData
_ = uniquePvtData.transformToUpdateBatch()
batch := uniquePvtData.transformToUpdateBatch()

// (5) update bookkeeping in the purge manager and prepare expiring keys.
// Though the expiring keys would have been loaded in memory during the last
// PrepareForExpiringKeys run from Commit, we rerun it here because
// RemoveStaleAndCommitPvtDataOfOldBlocks may have added new data that might be
// eligible for expiry during the next regular block commit.
txmgr.pvtdataPurgeMgr.UpdateBookkeepingForPvtDataOfOldBlocks(batch.PvtUpdates)
nextBlockNumToBeCommitted, err := txmgr.getNextBlockNumberToBeCommitted()
if err != nil {
return err
}
txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(nextBlockNumToBeCommitted)

// (5) TODO: update booking in the purge manager
// (6) TODO: commit the pvt data of old blocks to the stateDB
// (6) commit the pvt data to the stateDB
if err := txmgr.db.ApplyPrivacyAwareUpdates(batch, nil); err != nil {
return err
}
return nil
}

@@ -333,6 +351,16 @@ func checkIfPvtWriteIsStale(hashedKey *privacyenabledstate.HashedCompositeKey,
return true, nil
}

// getNextBlockNumberToBeCommitted returns the last committed block number + 1
func (txmgr *LockBasedTxMgr) getNextBlockNumberToBeCommitted() (uint64, error) {
lastCommittedBlk, err := txmgr.db.GetLatestSavePoint()
if err != nil {
return 0, err
}

return lastCommittedBlk.BlockNum + 1, nil
}

func (uniquePvtData uniquePvtDataMap) transformToUpdateBatch() *privacyenabledstate.UpdateBatch {
batch := privacyenabledstate.NewUpdateBatch()
for hashedCompositeKey, pvtWrite := range uniquePvtData {
@@ -390,6 +418,16 @@ func (txmgr *LockBasedTxMgr) Shutdown() {

// Commit implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Commit() error {
// we need to acquire a lock on oldBlockCommit. This is required because
// the DeleteExpiredAndUpdateBookkeeping() would perform an incorrect operation if
// PrepareForExpiringKeys() in RemoveStaleAndCommitPvtDataOfOldBlocks() is allowed to
// execute in parallel. RemoveStaleAndCommitPvtDataOfOldBlocks computes the update
// batch based on the current state and if we allow regular block commits at the
// same time, the former may overwrite the newer versions of the data and we may
// end up with an incorrect update batch.
txmgr.oldBlockCommit.Lock()
defer txmgr.oldBlockCommit.Unlock()

// When using the purge manager for the first block commit after peer start, the asynchronous function
// 'PrepareForExpiringKeys' is invoked in-line. However, for the subsequent blocks commits, this function is invoked
// in advance for the next block
@@ -413,24 +451,19 @@ func (txmgr *LockBasedTxMgr) Commit() error {
return err
}

// EXCLUSIVE LOCK STARTS
commitHeight := version.NewHeight(txmgr.current.blockNum(), txmgr.current.maxTxNumber())
txmgr.commitRWLock.Lock()
logger.Debugf("Write lock acquired for committing updates to state database")
commitHeight := version.NewHeight(txmgr.current.blockNum(), txmgr.current.maxTxNumber())
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.current.batch, commitHeight); err != nil {
txmgr.commitRWLock.Unlock()
return err
}
// only during the exclusive lock duration, we should clear the cache as the cache is being
// used by the old pvtData committer as well
txmgr.clearCache() // note that we should clear the cache before calling
// PrepareForExpiringKeys as it uses the cache as well. To be precise,
// we should not clear the cache until PrepareForExpiringKeys completes
// the task.
logger.Debugf("cleared cached")
txmgr.commitRWLock.Unlock()
// EXCLUSIVE LOCK ENDS
logger.Debugf("Updates committed to state database")
// only while holding a lock on oldBlockCommit, we should clear the cache as the
// cache is being used by the old pvtData committer to load the version of
// hashedKeys. Also, note that PrepareForExpiringKeys uses the cache.
txmgr.clearCache()
logger.Debugf("Updates committed to state database and the write lock is released")

// purge manager should be called (in this call the purge mgr removes the expiry entries from schedules) after committing to statedb
if err := txmgr.pvtdataPurgeMgr.BlockCommitDone(); err != nil {
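Step (3) above discards reconciled pvtData whose [version, valueHash] no longer matches the hashed write recorded in the public state (see checkIfPvtWriteIsStale). A hedged sketch of that check, under the assumption that value hashes are SHA-256 digests; the helper name and types below are illustrative and not the commit's code:

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// heightSketch models a (blockNum, txNum) version; illustrative only.
type heightSketch struct{ BlockNum, TxNum uint64 }

// isStale reports whether a reconciled private write should be discarded:
// it is kept if the committed version of the hashed key matches, or if the
// value itself is unchanged (e.g. only the metadata was updated later).
func isStale(committedVer, receivedVer heightSketch, committedValueHash, receivedValue []byte) bool {
	if committedVer == receivedVer {
		return false // exact version match: keep
	}
	h := sha256.Sum256(receivedValue)
	return !bytes.Equal(committedValueHash, h[:]) // stale only if the value changed
}

func main() {
	value3 := []byte("value3")
	h := sha256.Sum256(value3)

	// key3: the committed version {3,2} differs from the received {1,2}, but
	// the value hash matches, so the write is kept -- the metadata-only case.
	fmt.Println(isStale(heightSketch{3, 2}, heightSketch{1, 2}, h[:], value3)) // false

	// key1: a later block rewrote the value, so the old write is stale.
	newHash := sha256.Sum256([]byte("new-value1"))
	fmt.Println(isStale(heightSketch{2, 1}, heightSketch{1, 1}, newHash[:], []byte("value1"))) // true
}
```

The two cases in main mirror the key3 and key1 scenarios exercised by the test below.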
220 changes: 220 additions & 0 deletions core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/txmgr_test.go
@@ -1080,6 +1080,113 @@ func producePvtdata(t *testing.T, txNum uint64, nsColls []string, keys []string,
}
}

func TestRemoveStaleAndCommitPvtDataOfOldBlocks(t *testing.T) {
for _, testEnv := range testEnvs {
t.Logf("Running test for TestEnv = %s", testEnv.getName())
testLedgerID := "testvalidationandcommitofoldpvtdata"
testEnv.init(t, testLedgerID, nil)
testValidationAndCommitOfOldPvtData(t, testEnv)
testEnv.cleanup()
}
}

func testValidationAndCommitOfOldPvtData(t *testing.T, env testEnv) {
ledgerid := "testvalidationandcommitofoldpvtdata"
btlPolicy := btltestutil.SampleBTLPolicy(
map[[2]string]uint64{
{"ns1", "coll1"}: 0,
{"ns1", "coll2"}: 0,
},
)
env.init(t, ledgerid, btlPolicy)
txMgr := env.getTxMgr()
populateCollConfigForTest(t, txMgr.(*LockBasedTxMgr),
[]collConfigkey{
{"ns1", "coll1"},
{"ns1", "coll2"},
},
version.NewHeight(1, 1),
)

db := env.getVDB()
updateBatch := privacyenabledstate.NewUpdateBatch()
// all pvt data are missing
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key1"), util.ComputeStringHash("value1"), version.NewHeight(1, 1)) // E1
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key2"), util.ComputeStringHash("value2"), version.NewHeight(1, 2)) // E2
updateBatch.HashUpdates.Put("ns1", "coll2", util.ComputeStringHash("key3"), util.ComputeStringHash("value3"), version.NewHeight(1, 2)) // E3
updateBatch.HashUpdates.Put("ns1", "coll2", util.ComputeStringHash("key4"), util.ComputeStringHash("value4"), version.NewHeight(1, 3)) // E4
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(1, 2))

updateBatch = privacyenabledstate.NewUpdateBatch()
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key1"), util.ComputeStringHash("new-value1"), version.NewHeight(2, 1)) // E1 is updated
updateBatch.HashUpdates.Delete("ns1", "coll1", util.ComputeStringHash("key2"), version.NewHeight(2, 2)) // E2 is being deleted
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(2, 2))

updateBatch = privacyenabledstate.NewUpdateBatch()
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key1"), util.ComputeStringHash("another-new-value1"), version.NewHeight(3, 1)) // E1 is again updated
updateBatch.HashUpdates.Put("ns1", "coll2", util.ComputeStringHash("key3"), util.ComputeStringHash("value3"), version.NewHeight(3, 2)) // E3 gets only metadata update
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(3, 2))

v1 := []byte("value1")
// ns1-coll1-key1 should be rejected as it is updated in the future by Blk2Tx1
pvtDataBlk1Tx1 := producePvtdata(t, 1, []string{"ns1:coll1"}, []string{"key1"}, [][]byte{v1})
// ns1-coll2-key3 should be accepted but ns1-coll1-key2 should be rejected as it is updated in the future by Blk2Tx2
v2 := []byte("value2")
v3 := []byte("value3")
pvtDataBlk1Tx2 := producePvtdata(t, 2, []string{"ns1:coll1", "ns1:coll2"}, []string{"key2", "key3"}, [][]byte{v2, v3})
// ns1-coll2-key4 should be accepted
v4 := []byte("value4")
pvtDataBlk1Tx3 := producePvtdata(t, 3, []string{"ns1:coll2"}, []string{"key4"}, [][]byte{v4})

nv1 := []byte("new-value1")
// ns1-coll1-key1 should be rejected as it is updated in the future by Blk3Tx1
pvtDataBlk2Tx1 := producePvtdata(t, 1, []string{"ns1:coll1"}, []string{"key1"}, [][]byte{nv1})
// ns1-coll1-key2 should be accepted -- a delete operation
pvtDataBlk2Tx2 := producePvtdata(t, 2, []string{"ns1:coll1"}, []string{"key2"}, [][]byte{nil})

anv1 := []byte("another-new-value1")
// ns1-coll1-key1 should be accepted
pvtDataBlk3Tx1 := producePvtdata(t, 1, []string{"ns1:coll1"}, []string{"key1"}, [][]byte{anv1})
// ns1-coll2-key3 should be accepted -- assume that only metadata is being updated
pvtDataBlk3Tx2 := producePvtdata(t, 2, []string{"ns1:coll2"}, []string{"key3"}, [][]byte{v3})

blocksPvtData := map[uint64][]*ledger.TxPvtData{
1: {
pvtDataBlk1Tx1,
pvtDataBlk1Tx2,
pvtDataBlk1Tx3,
},
2: {
pvtDataBlk2Tx1,
pvtDataBlk2Tx2,
},
3: {
pvtDataBlk3Tx1,
pvtDataBlk3Tx2,
},
}

err := txMgr.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData)
assert.NoError(t, err)

vv, err := db.GetPrivateData("ns1", "coll1", "key1")
assert.NoError(t, err)
assert.Equal(t, anv1, vv.Value) // last updated value

vv, err = db.GetPrivateData("ns1", "coll1", "key2")
assert.NoError(t, err)
assert.Nil(t, vv) // deleted

vv, err = db.GetPrivateData("ns1", "coll2", "key3")
assert.NoError(t, err)
assert.Equal(t, v3, vv.Value)
assert.Equal(t, version.NewHeight(3, 2), vv.Version) // though we passed with version {1,2}, we should get {3,2} due to metadata update

vv, err = db.GetPrivateData("ns1", "coll2", "key4")
assert.NoError(t, err)
assert.Equal(t, v4, vv.Value)
}

func TestTxSimulatorMissingPvtdata(t *testing.T) {
testEnv := testEnvs[0]
testEnv.init(t, "TestTxSimulatorUnsupportedTxQueries", nil)
@@ -1132,6 +1239,119 @@ func TestTxSimulatorMissingPvtdata(t *testing.T) {
assert.Nil(t, val)
}

func TestRemoveStaleAndCommitPvtDataOfOldBlocksWithExpiry(t *testing.T) {
ledgerid := "TestTxSimulatorMissingPvtdataExpiry"
btlPolicy := btltestutil.SampleBTLPolicy(
map[[2]string]uint64{
{"ns", "coll"}: 1,
},
)
testEnv := testEnvs[0]
testEnv.init(t, ledgerid, btlPolicy)
defer testEnv.cleanup()

txMgr := testEnv.getTxMgr()
populateCollConfigForTest(t, txMgr.(*LockBasedTxMgr),
[]collConfigkey{
{"ns", "coll"},
},
version.NewHeight(1, 1),
)

viper.Set(fmt.Sprintf("ledger.pvtdata.btlpolicy.%s.ns.coll", ledgerid), 1)
bg, _ := testutil.NewBlockGenerator(t, ledgerid, false)

// storing hashed data but the pvt key is missing
// stored pvt key would get expired and purged while committing block 3
blkAndPvtdata := prepareNextBlockForTest(t, txMgr, bg, "txid-1",
map[string]string{"pubkey1": "pub-value1"}, map[string]string{"pvtkey1": "pvt-value1"}, true)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
// committing block 1
assert.NoError(t, txMgr.Commit())

// pvt data should not exist
simulator, _ := txMgr.NewTxSimulator("tx-tmp")
pvtval, err := simulator.GetPrivateData("ns", "coll", "pvtkey1")
_, ok := err.(*txmgr.ErrPvtdataNotAvailable)
assert.Equal(t, ok, true)
assert.Nil(t, pvtval)
simulator.Done()

// committing pvt data of block 1
v1 := []byte("pvt-value1")
pvtDataBlk1Tx1 := producePvtdata(t, 1, []string{"ns:coll"}, []string{"pvtkey1"}, [][]byte{v1})
blocksPvtData := map[uint64][]*ledger.TxPvtData{
1: {
pvtDataBlk1Tx1,
},
}
err = txMgr.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData)
assert.NoError(t, err)

// pvt data should exist
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey1")
assert.Nil(t, err)
assert.Equal(t, pvtval, v1)
simulator.Done()

// storing hashed data but the pvt key is missing
// stored pvt key would get expired and purged while committing block 4
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-2",
map[string]string{"pubkey2": "pub-value2"}, map[string]string{"pvtkey2": "pvt-value2"}, true)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
// committing block 2
assert.NoError(t, txMgr.Commit())

// pvt data should not exist
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey2")
_, ok = err.(*txmgr.ErrPvtdataNotAvailable)
assert.Equal(t, ok, true)
assert.Nil(t, pvtval)
simulator.Done()

blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-3",
map[string]string{"pubkey3": "pub-value3"}, nil, false)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
// committing block 3
assert.NoError(t, txMgr.Commit())

// PrepareForExpiringKeys must have selected pvtkey2 as it would
// get expired during the next block commit

// committing pvt data of block 2
v2 := []byte("pvt-value2")
pvtDataBlk2Tx1 := producePvtdata(t, 1, []string{"ns:coll"}, []string{"pvtkey2"}, [][]byte{v2})
blocksPvtData = map[uint64][]*ledger.TxPvtData{
2: {
pvtDataBlk2Tx1,
},
}

err = txMgr.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData)
assert.NoError(t, err)

// pvt data should exist
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey2")
assert.Nil(t, err)
assert.Equal(t, pvtval, v2)
simulator.Done()

blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-4",
map[string]string{"pubkey4": "pub-value4"}, nil, false)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
// committing block 4 and should purge pvtkey2
assert.NoError(t, txMgr.Commit())

simulator, _ = txMgr.NewTxSimulator("tx-tmp")
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey2")
assert.NoError(t, err)
assert.Nil(t, pvtval)
simulator.Done()
}

func TestDeleteOnCursor(t *testing.T) {
cID := "cid"
env := testEnvs[0]
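The expiry test above relies on pvtData committed at block 1 with blockToLive = 1 being purged while committing block 3, and block 2's pvtData being purged at block 4. A small worked sketch of that arithmetic, inferred from the test comments rather than from the purge manager itself:

```go
package main

import "fmt"

// expiringBlock returns the block whose commit purges pvtData that was
// committed at committingBlk under the given blockToLive (BTL) policy.
// The formula is an assumption derived from the test comments above.
func expiringBlock(committingBlk, blockToLive uint64) uint64 {
	return committingBlk + blockToLive + 1
}

func main() {
	fmt.Println(expiringBlock(1, 1)) // 3: pvtkey1 is purged while committing block 3
	fmt.Println(expiringBlock(2, 1)) // 4: pvtkey2 is purged while committing block 4
}
```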
