diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go index 1ffd856cef5..0387af9c890 100644 --- a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go +++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go @@ -68,6 +68,50 @@ func NewCommonStorageDB(vdb statedb.VersionedDB, ledgerid string) (DB, error) { return &CommonStorageDB{VersionedDB: vdb}, nil } +// IsBulkOptimizable implements corresponding function in interface DB +func (s *CommonStorageDB) IsBulkOptimizable() bool { + if _, ok := s.VersionedDB.(statedb.BulkOptimizable); ok { + return true + } + return false +} + +// LoadCommittedVersionsOfPubAndHashedKeys implements corresponding function in interface DB +func (s *CommonStorageDB) LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*statedb.CompositeKey, + hashedKeys []*HashedCompositeKey) { + + bulkOptimizable, ok := s.VersionedDB.(statedb.BulkOptimizable) + if !ok { + return + } + + // Here, hashedKeys are merged into pubKeys to get a combined set of keys for combined loading + for _, key := range hashedKeys { + ns := derivePvtDataNs(key.Namespace, key.CollectionName) + // No need to check for duplicates as hashedKeys are in separate namespace + var keyHashStr string + if !s.BytesKeySuppoted() { + keyHashStr = base64.StdEncoding.EncodeToString([]byte(key.KeyHash)) + } else { + keyHashStr = key.KeyHash + } + pubKeys = append(pubKeys, &statedb.CompositeKey{ + Namespace: ns, + Key: keyHashStr, + }) + } + + bulkOptimizable.LoadCommittedVersions(pubKeys) +} + +// ClearCommittedVersions implements corresponding function in interface DB +func (s *CommonStorageDB) ClearCommittedVersions() { + bulkOptimizable, ok := s.VersionedDB.(statedb.BulkOptimizable) + if ok { + bulkOptimizable.ClearCachedVersions() + } +} + // GetPrivateData implements corresponding function in interface DB func (s *CommonStorageDB) GetPrivateData(namespace, collection, key string) (*statedb.VersionedValue, error) { return s.GetState(derivePvtDataNs(namespace, collection), key) @@ -83,12 +127,12 @@ func (s *CommonStorageDB) GetValueHash(namespace, collection string, keyHash []b } // GetHashedDataNsAndKeyHashStr implements corresponding function in interface DB -func (s *CommonStorageDB) GetHashedDataNsAndKeyHashStr(namespace, collection string, keyHash []byte) (string, string) { +func (s *CommonStorageDB) GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error) { keyHashStr := string(keyHash) if !s.BytesKeySuppoted() { keyHashStr = base64.StdEncoding.EncodeToString(keyHash) } - return deriveHashedDataNs(namespace, collection), keyHashStr + return s.GetVersion(deriveHashedDataNs(namespace, collection), keyHashStr) } // GetPrivateDataMultipleKeys implements corresponding function in interface DB diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go index 45f03967963..9061c517215 100644 --- a/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go +++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/db.go @@ -22,15 +22,25 @@ type DBProvider interface { // DB extends VersionedDB interface. This interface provides additional functions for managing private data state type DB interface { statedb.VersionedDB + IsBulkOptimizable() bool + LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*statedb.CompositeKey, hashedKeys []*HashedCompositeKey) + ClearCommittedVersions() GetPrivateData(namespace, collection, key string) (*statedb.VersionedValue, error) GetValueHash(namespace, collection string, keyHash []byte) (*statedb.VersionedValue, error) - GetHashedDataNsAndKeyHashStr(namespace, collection string, keyHash []byte) (string, string) + GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error) GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([]*statedb.VersionedValue, error) GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (statedb.ResultsIterator, error) ExecuteQueryOnPrivateData(namespace, collection, query string) (statedb.ResultsIterator, error) ApplyPrivacyAwareUpdates(updates *UpdateBatch, height *version.Height) error } +// HashedCompositeKey encloses Namespace, CollectionName and KeyHash components +type HashedCompositeKey struct { + Namespace string + CollectionName string + KeyHash string +} + // UpdateBatch encapsulates the updates to Public, Private, and Hashed data. // This is expected to contain a consistent set of updates type UpdateBatch struct { diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go index 0ea62236832..1e16ffdf8d1 100644 --- a/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go +++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go @@ -106,11 +106,7 @@ func testDB(t *testing.T, env TestEnv) { assert.NoError(t, err) assert.Equal(t, &statedb.VersionedValue{Value: util.ComputeStringHash("pvt_value1"), Version: version.NewHeight(1, 4)}, vv) - ns, key := db.GetHashedDataNsAndKeyHashStr("ns1", "coll1", util.ComputeStringHash("key1")) - vv, err = db.GetState(ns, key) - assert.NoError(t, err) - assert.Equal(t, &statedb.VersionedValue{Value: util.ComputeStringHash("pvt_value1"), Version: version.NewHeight(1, 4)}, vv) - committedVersion, err := db.GetVersion(ns, key) + committedVersion, err := db.GetKeyHashVersion("ns1", "coll1", util.ComputeStringHash("key1")) assert.NoError(t, err) assert.Equal(t, version.NewHeight(1, 4), committedVersion) diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index d5ad5d3452c..749aaca4cf3 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -294,7 +294,7 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version // STEP 1: GATHER DOCUMENT REVISION NUMBERS REQUIRED FOR THE COUCHDB BULK UPDATE // initialize a missing key list - missingKeys := []*statedb.CompositeKey{} + var missingKeys []*statedb.CompositeKey // Revision numbers are needed for couchdb updates. // vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID @@ -313,6 +313,7 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version if !keyFound { // Add the key to the missing key list + // As there can be no duplicates in UpdateBatch, no need check for duplicates. missingKeys = append(missingKeys, &compositeKey) } } @@ -439,11 +440,11 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version // printCompositeKeys is a convenience method to print readable log entries for arrays of pointers // to composite keys -func printCompositeKeys(keyPointers []*statedb.CompositeKey) string { +func printCompositeKeys(keys []*statedb.CompositeKey) string { compositeKeyString := []string{} - for _, keyPointer := range keyPointers { - compositeKeyString = append(compositeKeyString, "["+keyPointer.Namespace+","+keyPointer.Key+"]") + for _, key := range keys { + compositeKeyString = append(compositeKeyString, "["+key.Namespace+","+key.Key+"]") } return strings.Join(compositeKeyString, ",") } diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go index 49ca28c53b1..f9cafd3d1c1 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go @@ -11,7 +11,6 @@ import ( "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate" - "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valimpl" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" @@ -116,10 +115,8 @@ func (txmgr *LockBasedTxMgr) Rollback() { // clearCache empty the cache maintained by the statedb implementation func (txmgr *LockBasedTxMgr) clearCache() { - commonStorageDB, _ := txmgr.db.(*privacyenabledstate.CommonStorageDB) - bulkOptimizable, ok := commonStorageDB.VersionedDB.(statedb.BulkOptimizable) - if ok { - bulkOptimizable.ClearCachedVersions() + if txmgr.db.IsBulkOptimizable() { + txmgr.db.ClearCommittedVersions() } } diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go index fa8178a8414..3f5518d02e4 100644 --- a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go @@ -33,36 +33,50 @@ func NewValidator(db privacyenabledstate.DB) *Validator { // transaction's read set into a cache. func (v *Validator) preLoadCommittedVersionOfRSet(block *valinternal.Block) { - // Collect read set of all transactions in a given block - var readSetKeys []*statedb.CompositeKey + // Collect both public and hashed keys in read sets of all transactions in a given block + var pubKeys []*statedb.CompositeKey + var hashedKeys []*privacyenabledstate.HashedCompositeKey + + // pubKeysMap and hashedKeysMap are used to avoid duplicate entries in the + // pubKeys and hashedKeys. Though map alone can be used to collect keys in + // read sets and pass as an argument in LoadCommittedVersionOfPubAndHashedKeys(), + // array is used for better code readability. On the negative side, this approach + // might use some extra memory. + pubKeysMap := make(map[statedb.CompositeKey]interface{}) + hashedKeysMap := make(map[privacyenabledstate.HashedCompositeKey]interface{}) + for _, tx := range block.Txs { for _, nsRWSet := range tx.RWSet.NsRwSets { for _, kvRead := range nsRWSet.KvRwSet.Reads { - readSetKeys = append(readSetKeys, &statedb.CompositeKey{ + compositeKey := statedb.CompositeKey{ Namespace: nsRWSet.NameSpace, Key: kvRead.Key, - }) + } + if _, ok := pubKeysMap[compositeKey]; !ok { + pubKeysMap[compositeKey] = nil + pubKeys = append(pubKeys, &compositeKey) + } + } for _, colHashedRwSet := range nsRWSet.CollHashedRwSets { for _, kvHashedRead := range colHashedRwSet.HashedRwSet.HashedReads { - ns, key := v.db.GetHashedDataNsAndKeyHashStr(nsRWSet.NameSpace, colHashedRwSet.CollectionName, - kvHashedRead.KeyHash) - readSetKeys = append(readSetKeys, &statedb.CompositeKey{ - Namespace: ns, - Key: key, - }) + hashedCompositeKey := privacyenabledstate.HashedCompositeKey{ + Namespace: nsRWSet.NameSpace, + CollectionName: colHashedRwSet.CollectionName, + KeyHash: string(kvHashedRead.KeyHash), + } + if _, ok := hashedKeysMap[hashedCompositeKey]; !ok { + hashedKeysMap[hashedCompositeKey] = nil + hashedKeys = append(hashedKeys, &hashedCompositeKey) + } } } } } // Load committed version of all keys into a cache - if len(readSetKeys) > 0 { - commonStorageDB := v.db.(*privacyenabledstate.CommonStorageDB) - bulkOptimizable, ok := commonStorageDB.VersionedDB.(statedb.BulkOptimizable) - if ok { - bulkOptimizable.LoadCommittedVersions(readSetKeys) - } + if len(pubKeys) > 0 || len(hashedKeys) > 0 { + v.db.LoadCommittedVersionsOfPubAndHashedKeys(pubKeys, hashedKeys) } } @@ -71,9 +85,7 @@ func (v *Validator) ValidateAndPrepareBatch(block *valinternal.Block, doMVCCVali // Check whether statedb implements BulkOptimizable interface. For now, // only CouchDB implements BulkOptimizable to reduce the number of REST // API calls from peer to CouchDB instance. - commonStorageDB := v.db.(*privacyenabledstate.CommonStorageDB) - _, ok := commonStorageDB.VersionedDB.(statedb.BulkOptimizable) - if ok { + if v.db.IsBulkOptimizable() { v.preLoadCommittedVersionOfRSet(block) } @@ -250,8 +262,7 @@ func (v *Validator) validateKVReadHash(ns, coll string, kvReadHash *kvrwset.KVRe if updates.Contains(ns, coll, kvReadHash.KeyHash) { return false, nil } - ns, key := v.db.GetHashedDataNsAndKeyHashStr(ns, coll, kvReadHash.KeyHash) - committedVersion, err := v.db.GetVersion(ns, key) + committedVersion, err := v.db.GetKeyHashVersion(ns, coll, kvReadHash.KeyHash) if err != nil { return false, err }