Skip to content

Commit

Permalink
Merge "[FAB-6074] rm endorserid from tstore APIs"
Browse files Browse the repository at this point in the history
  • Loading branch information
denyeart authored and Gerrit Code Review committed Sep 14, 2017
2 parents b29c935 + 9c58f13 commit 40e41a5
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 134 deletions.
70 changes: 22 additions & 48 deletions core/transientstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hyperledger/fabric/protos/ledger/rwset"

"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
"github.com/syndtr/goleveldb/leveldb/iterator"
Expand All @@ -38,13 +39,10 @@ type StoreProvider interface {
// the permanent storage or the pruning of some data items is enforced by the policy
type Store interface {
// Persist stores the private read-write set of a transaction in the transient store
Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// RWSets persisted from different endorsers (via Gossip)
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error)
// GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation
// performed by the peer itself
GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error)
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*RwsetScanner, error)
// Purge removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher.
Expand All @@ -56,7 +54,6 @@ type Store interface {

// EndorserPvtSimulationResults captures the deatils of the simulation results specific to an endorser
type EndorserPvtSimulationResults struct {
EndorserID string
EndorsementBlockHeight uint64
PvtSimulationResults *rwset.TxPvtReadWriteSet
}
Expand All @@ -78,7 +75,7 @@ type store struct {
ledgerID string
}

type rwsetScanner struct {
type RwsetScanner struct {
txid string
dbItr iterator.Iterator
filter ledger.PvtNsCollFilter
Expand All @@ -102,56 +99,38 @@ func (provider *storeProvider) Close() {
}

// Persist stores the private read-write set of a transaction in the transient store
func (s *store) Persist(txid string, endorserid string,
endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error {
func (s *store) Persist(txid string, endorsementBlkHt uint64,
privateSimulationResults *rwset.TxPvtReadWriteSet) error {
dbBatch := leveldbhelper.NewUpdateBatch()

// Create compositeKey with appropriate prefix, txid, endorserid and endorsementBlkHt
compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
// Create compositeKey with appropriate prefix, txid, uuid and endorsementBlkHt
// Due to the fact that the txid may have multiple private RWSets persisted from different
// endorsers (via Gossip), we postfix an uuid with the txid to avoid collision.
uuid := util.GenerateUUID()
compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
privateSimulationResultsBytes, err := proto.Marshal(privateSimulationResults)
if err != nil {
return err
}
dbBatch.Put(compositeKey, privateSimulationResultsBytes)

// Create compositeKey with appropriate prefix, endorsementBlkHt, txid, endorserid & Store
// Create compositeKey with appropriate prefix, endorsementBlkHt, txid, uuid & Store
// the compositeKey (purge index) a null byte as value.
compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, endorserid)
compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, uuid)
dbBatch.Put(compositeKey, emptyValue)

return s.db.WriteBatch(dbBatch, true)
}

// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// RWSets persisted from different endorserids. Eventually, we may pass a set of collections name
// (filters) along with txid.
func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error) {
// RWSets persisted from different endorsers.
func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*RwsetScanner, error) {
// Construct startKey and endKey to do an range query
startKey := createTxidRangeStartKey(txid)
endKey := createTxidRangeEndKey(txid)

iter := s.db.GetIterator(startKey, endKey)
return &rwsetScanner{txid, iter, filter}, nil
}

// GetSelfSimulatedTxPvtRWSetByTxid returns the private write set generated from the simulation performed
// by the peer itself. Eventually, we may pass a set of collections name (filters) along with txid.
func (s *store) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error) {
var err error
var itr *rwsetScanner
if itr, err = s.GetTxPvtRWSetByTxid(txid, nil); err != nil {
return nil, err
}
defer itr.Close()
var res *EndorserPvtSimulationResults
for {
if res, err = itr.Next(); res == nil || err != nil {
return nil, err
}
if selfSimulated(res) {
return res, nil
}
}
return &RwsetScanner{txid, iter, filter}, nil
}

// Purge removes private read-writes set generated by endorsers at block height lesser than
Expand All @@ -165,12 +144,12 @@ func (s *store) Purge(maxBlockNumToRetain uint64) error {

dbBatch := leveldbhelper.NewUpdateBatch()

// Get all txid and endorserid from above result and remove it from transient store (both
// Get all txid and uuid from above result and remove it from transient store (both
// read/write set and the corresponding index.
for iter.Next() {
dbKey := iter.Key()
txid, endorserid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
txid, uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
dbBatch.Delete(compositeKey)
dbBatch.Delete(dbKey)
}
Expand Down Expand Up @@ -203,13 +182,13 @@ func (s *store) Shutdown() {

// Next moves the iterator to the next key/value pair.
// It returns whether the iterator is exhausted.
func (scanner *rwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
if !scanner.dbItr.Next() {
return nil, nil
}
dbKey := scanner.dbItr.Key()
dbVal := scanner.dbItr.Value()
endorserid, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey)
_, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey)

txPvtRWSet := &rwset.TxPvtReadWriteSet{}
if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil {
Expand All @@ -218,17 +197,12 @@ func (scanner *rwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
filteredTxPvtRWSet := pvtdatastorage.TrimPvtWSet(txPvtRWSet, scanner.filter)

return &EndorserPvtSimulationResults{
EndorserID: endorserid,
EndorsementBlockHeight: endorsementBlkHt,
PvtSimulationResults: filteredTxPvtRWSet,
}, nil
}

// Close releases resource held by the iterator
func (scanner *rwsetScanner) Close() {
func (scanner *RwsetScanner) Close() {
scanner.dbItr.Release()
}

func selfSimulated(pvtSimRes *EndorserPvtSimulationResults) bool {
return pvtSimRes.EndorserID == ""
}
40 changes: 15 additions & 25 deletions core/transientstore/store_helper.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package transientstore
Expand All @@ -31,14 +21,14 @@ var (
)

// createCompositeKeyForPvtRWSet creates a key for storing private read-write set
// in the transient store. The structure of the key is <prwsetPrefix>~txid~endorserid~endorsementBlkHt.
func createCompositeKeyForPvtRWSet(txid string, endorserid string, endorsementBlkHt uint64) []byte {
// in the transient store. The structure of the key is <prwsetPrefix>~txid~uuid~endorsementBlkHt.
func createCompositeKeyForPvtRWSet(txid string, uuid string, endorsementBlkHt uint64) []byte {
var compositeKey []byte
compositeKey = append(compositeKey, prwsetPrefix)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, []byte(txid)...)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, []byte(endorserid)...)
compositeKey = append(compositeKey, []byte(uuid)...)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)

Expand All @@ -47,37 +37,37 @@ func createCompositeKeyForPvtRWSet(txid string, endorserid string, endorsementBl

// createCompositeKeyForPurgeIndex creates a key to index private read-write set based on
// endorsement block height such that purge based on block height can be achieved. The structure
// of the key is <purgeIndexPrefix>~endorsementBlkHt~txid~endorserid.
func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, endorserid string) []byte {
// of the key is <purgeIndexPrefix>~endorsementBlkHt~txid~uuid.
func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, uuid string) []byte {
var compositeKey []byte
compositeKey = append(compositeKey, purgeIndexPrefix)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, []byte(txid)...)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, []byte(endorserid)...)
compositeKey = append(compositeKey, []byte(uuid)...)

return compositeKey
}

// splitCompositeKeyOfPvtRWSet splits the compositeKey (<prwsetPrefix>~txid~endorserid~endorsementBlkHt) into endorserId and endorsementBlkHt.
func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (endorserid string, endorsementBlkHt uint64) {
// splitCompositeKeyOfPvtRWSet splits the compositeKey (<prwsetPrefix>~txid~uuid~endorsementBlkHt) into endorserId and endorsementBlkHt.
func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (uuid string, endorsementBlkHt uint64) {
compositeKey = compositeKey[2:]
firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep)
secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1
endorserid = string(compositeKey[firstSepIndex+1 : secondSepIndex])
uuid = string(compositeKey[firstSepIndex+1 : secondSepIndex])
endorsementBlkHt, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:])
return endorserid, endorsementBlkHt
return uuid, endorsementBlkHt
}

// splitCompositeKeyOfPurgeIndex splits the compositeKey (<purgeIndexPrefix>~endorsementBlkHt~txid~endorserid) into txid, endorserid and endorsementBlkHt.
func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, endorserid string, endorsementBlkHt uint64) {
// splitCompositeKeyOfPurgeIndex splits the compositeKey (<purgeIndexPrefix>~endorsementBlkHt~txid~uuid) into txid, uuid and endorsementBlkHt.
func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, uuid string, endorsementBlkHt uint64) {
var n int
endorsementBlkHt, n = util.DecodeOrderPreservingVarUint64(compositeKey[2:])
splits := bytes.Split(compositeKey[n+3:], []byte{compositeKeySep})
txid = string(splits[0])
endorserid = string(splits[1])
uuid = string(splits[1])
return
}

Expand Down
Loading

0 comments on commit 40e41a5

Please sign in to comment.