Skip to content

Commit

Permalink
Merge "[FAB-6011] Implement BulkOptimizable in DB intf"
Browse files Browse the repository at this point in the history
  • Loading branch information
denyeart authored and Gerrit Code Review committed Sep 11, 2017
2 parents 833b24a + 3740fbc commit bd23071
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,50 @@ func NewCommonStorageDB(vdb statedb.VersionedDB, ledgerid string) (DB, error) {
return &CommonStorageDB{VersionedDB: vdb}, nil
}

// IsBulkOptimizable implements corresponding function in interface DB
func (s *CommonStorageDB) IsBulkOptimizable() bool {
if _, ok := s.VersionedDB.(statedb.BulkOptimizable); ok {
return true
}
return false
}

// LoadCommittedVersionsOfPubAndHashedKeys implements corresponding function in interface DB
func (s *CommonStorageDB) LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*statedb.CompositeKey,
hashedKeys []*HashedCompositeKey) {

bulkOptimizable, ok := s.VersionedDB.(statedb.BulkOptimizable)
if !ok {
return
}

// Here, hashedKeys are merged into pubKeys to get a combined set of keys for combined loading
for _, key := range hashedKeys {
ns := derivePvtDataNs(key.Namespace, key.CollectionName)
// No need to check for duplicates as hashedKeys are in separate namespace
var keyHashStr string
if !s.BytesKeySuppoted() {
keyHashStr = base64.StdEncoding.EncodeToString([]byte(key.KeyHash))
} else {
keyHashStr = key.KeyHash
}
pubKeys = append(pubKeys, &statedb.CompositeKey{
Namespace: ns,
Key: keyHashStr,
})
}

bulkOptimizable.LoadCommittedVersions(pubKeys)
}

// ClearCommittedVersions implements corresponding function in interface DB
func (s *CommonStorageDB) ClearCommittedVersions() {
bulkOptimizable, ok := s.VersionedDB.(statedb.BulkOptimizable)
if ok {
bulkOptimizable.ClearCachedVersions()
}
}

// GetPrivateData implements corresponding function in interface DB
func (s *CommonStorageDB) GetPrivateData(namespace, collection, key string) (*statedb.VersionedValue, error) {
return s.GetState(derivePvtDataNs(namespace, collection), key)
Expand All @@ -83,12 +127,12 @@ func (s *CommonStorageDB) GetValueHash(namespace, collection string, keyHash []b
}

// GetHashedDataNsAndKeyHashStr implements corresponding function in interface DB
func (s *CommonStorageDB) GetHashedDataNsAndKeyHashStr(namespace, collection string, keyHash []byte) (string, string) {
func (s *CommonStorageDB) GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error) {
keyHashStr := string(keyHash)
if !s.BytesKeySuppoted() {
keyHashStr = base64.StdEncoding.EncodeToString(keyHash)
}
return deriveHashedDataNs(namespace, collection), keyHashStr
return s.GetVersion(deriveHashedDataNs(namespace, collection), keyHashStr)
}

// GetPrivateDataMultipleKeys implements corresponding function in interface DB
Expand Down
12 changes: 11 additions & 1 deletion core/ledger/kvledger/txmgmt/privacyenabledstate/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,25 @@ type DBProvider interface {
// DB extends VersionedDB interface. This interface provides additional functions for managing private data state
type DB interface {
statedb.VersionedDB
IsBulkOptimizable() bool
LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*statedb.CompositeKey, hashedKeys []*HashedCompositeKey)
ClearCommittedVersions()
GetPrivateData(namespace, collection, key string) (*statedb.VersionedValue, error)
GetValueHash(namespace, collection string, keyHash []byte) (*statedb.VersionedValue, error)
GetHashedDataNsAndKeyHashStr(namespace, collection string, keyHash []byte) (string, string)
GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error)
GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([]*statedb.VersionedValue, error)
GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (statedb.ResultsIterator, error)
ExecuteQueryOnPrivateData(namespace, collection, query string) (statedb.ResultsIterator, error)
ApplyPrivacyAwareUpdates(updates *UpdateBatch, height *version.Height) error
}

// HashedCompositeKey encloses Namespace, CollectionName and KeyHash components
type HashedCompositeKey struct {
Namespace string
CollectionName string
KeyHash string
}

// UpdateBatch encapsulates the updates to Public, Private, and Hashed data.
// This is expected to contain a consistent set of updates
type UpdateBatch struct {
Expand Down
6 changes: 1 addition & 5 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,7 @@ func testDB(t *testing.T, env TestEnv) {
assert.NoError(t, err)
assert.Equal(t, &statedb.VersionedValue{Value: util.ComputeStringHash("pvt_value1"), Version: version.NewHeight(1, 4)}, vv)

ns, key := db.GetHashedDataNsAndKeyHashStr("ns1", "coll1", util.ComputeStringHash("key1"))
vv, err = db.GetState(ns, key)
assert.NoError(t, err)
assert.Equal(t, &statedb.VersionedValue{Value: util.ComputeStringHash("pvt_value1"), Version: version.NewHeight(1, 4)}, vv)
committedVersion, err := db.GetVersion(ns, key)
committedVersion, err := db.GetKeyHashVersion("ns1", "coll1", util.ComputeStringHash("key1"))
assert.NoError(t, err)
assert.Equal(t, version.NewHeight(1, 4), committedVersion)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
// STEP 1: GATHER DOCUMENT REVISION NUMBERS REQUIRED FOR THE COUCHDB BULK UPDATE

// initialize a missing key list
missingKeys := []*statedb.CompositeKey{}
var missingKeys []*statedb.CompositeKey

// Revision numbers are needed for couchdb updates.
// vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
Expand All @@ -313,6 +313,7 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version

if !keyFound {
// Add the key to the missing key list
// As there can be no duplicates in UpdateBatch, no need check for duplicates.
missingKeys = append(missingKeys, &compositeKey)
}
}
Expand Down Expand Up @@ -439,11 +440,11 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version

// printCompositeKeys is a convenience method to print readable log entries for arrays of pointers
// to composite keys
func printCompositeKeys(keyPointers []*statedb.CompositeKey) string {
func printCompositeKeys(keys []*statedb.CompositeKey) string {

compositeKeyString := []string{}
for _, keyPointer := range keyPointers {
compositeKeyString = append(compositeKeyString, "["+keyPointer.Namespace+","+keyPointer.Key+"]")
for _, key := range keys {
compositeKeyString = append(compositeKeyString, "["+key.Namespace+","+key.Key+"]")
}
return strings.Join(compositeKeyString, ",")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valimpl"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
Expand Down Expand Up @@ -116,10 +115,8 @@ func (txmgr *LockBasedTxMgr) Rollback() {

// clearCache empty the cache maintained by the statedb implementation
func (txmgr *LockBasedTxMgr) clearCache() {
commonStorageDB, _ := txmgr.db.(*privacyenabledstate.CommonStorageDB)
bulkOptimizable, ok := commonStorageDB.VersionedDB.(statedb.BulkOptimizable)
if ok {
bulkOptimizable.ClearCachedVersions()
if txmgr.db.IsBulkOptimizable() {
txmgr.db.ClearCommittedVersions()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,50 @@ func NewValidator(db privacyenabledstate.DB) *Validator {
// transaction's read set into a cache.
func (v *Validator) preLoadCommittedVersionOfRSet(block *valinternal.Block) {

// Collect read set of all transactions in a given block
var readSetKeys []*statedb.CompositeKey
// Collect both public and hashed keys in read sets of all transactions in a given block
var pubKeys []*statedb.CompositeKey
var hashedKeys []*privacyenabledstate.HashedCompositeKey

// pubKeysMap and hashedKeysMap are used to avoid duplicate entries in the
// pubKeys and hashedKeys. Though map alone can be used to collect keys in
// read sets and pass as an argument in LoadCommittedVersionOfPubAndHashedKeys(),
// array is used for better code readability. On the negative side, this approach
// might use some extra memory.
pubKeysMap := make(map[statedb.CompositeKey]interface{})
hashedKeysMap := make(map[privacyenabledstate.HashedCompositeKey]interface{})

for _, tx := range block.Txs {
for _, nsRWSet := range tx.RWSet.NsRwSets {
for _, kvRead := range nsRWSet.KvRwSet.Reads {
readSetKeys = append(readSetKeys, &statedb.CompositeKey{
compositeKey := statedb.CompositeKey{
Namespace: nsRWSet.NameSpace,
Key: kvRead.Key,
})
}
if _, ok := pubKeysMap[compositeKey]; !ok {
pubKeysMap[compositeKey] = nil
pubKeys = append(pubKeys, &compositeKey)
}

}
for _, colHashedRwSet := range nsRWSet.CollHashedRwSets {
for _, kvHashedRead := range colHashedRwSet.HashedRwSet.HashedReads {
ns, key := v.db.GetHashedDataNsAndKeyHashStr(nsRWSet.NameSpace, colHashedRwSet.CollectionName,
kvHashedRead.KeyHash)
readSetKeys = append(readSetKeys, &statedb.CompositeKey{
Namespace: ns,
Key: key,
})
hashedCompositeKey := privacyenabledstate.HashedCompositeKey{
Namespace: nsRWSet.NameSpace,
CollectionName: colHashedRwSet.CollectionName,
KeyHash: string(kvHashedRead.KeyHash),
}
if _, ok := hashedKeysMap[hashedCompositeKey]; !ok {
hashedKeysMap[hashedCompositeKey] = nil
hashedKeys = append(hashedKeys, &hashedCompositeKey)
}
}
}
}
}

// Load committed version of all keys into a cache
if len(readSetKeys) > 0 {
commonStorageDB := v.db.(*privacyenabledstate.CommonStorageDB)
bulkOptimizable, ok := commonStorageDB.VersionedDB.(statedb.BulkOptimizable)
if ok {
bulkOptimizable.LoadCommittedVersions(readSetKeys)
}
if len(pubKeys) > 0 || len(hashedKeys) > 0 {
v.db.LoadCommittedVersionsOfPubAndHashedKeys(pubKeys, hashedKeys)
}
}

Expand All @@ -71,9 +85,7 @@ func (v *Validator) ValidateAndPrepareBatch(block *valinternal.Block, doMVCCVali
// Check whether statedb implements BulkOptimizable interface. For now,
// only CouchDB implements BulkOptimizable to reduce the number of REST
// API calls from peer to CouchDB instance.
commonStorageDB := v.db.(*privacyenabledstate.CommonStorageDB)
_, ok := commonStorageDB.VersionedDB.(statedb.BulkOptimizable)
if ok {
if v.db.IsBulkOptimizable() {
v.preLoadCommittedVersionOfRSet(block)
}

Expand Down Expand Up @@ -250,8 +262,7 @@ func (v *Validator) validateKVReadHash(ns, coll string, kvReadHash *kvrwset.KVRe
if updates.Contains(ns, coll, kvReadHash.KeyHash) {
return false, nil
}
ns, key := v.db.GetHashedDataNsAndKeyHashStr(ns, coll, kvReadHash.KeyHash)
committedVersion, err := v.db.GetVersion(ns, key)
committedVersion, err := v.db.GetKeyHashVersion(ns, coll, kvReadHash.KeyHash)
if err != nil {
return false, err
}
Expand Down

0 comments on commit bd23071

Please sign in to comment.