Skip to content

Commit

Permalink
[FAB-6265] Add PurgeByTxids() in transient store
Browse files Browse the repository at this point in the history
This CR adds a PurgeByTxids() in transient store API so that coordinator
can call this function to clean up private write set entries in the
store once the corresponding transactions got committed in the ledger.

Transient store pruning using PurgeByTxids() will be done as soon as
the private block is persisted in the private block storage. If any peer
request for private simulation results for certain txids, gossip will
retrieve the data from private block storage if not found in transient
store.

Change-Id: I79156752e339f38542dec6759c24f8ca512c8fb1
Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed Sep 26, 2017
1 parent 3145da5 commit 31863bc
Show file tree
Hide file tree
Showing 3 changed files with 344 additions and 51 deletions.
125 changes: 102 additions & 23 deletions core/transientstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ type Store interface {
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// RWSets persisted from different endorsers (via Gossip)
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error)
// Purge removes private read-writes set generated by endorsers at block height lesser than
// PurgeByTxids removes private read-write set of a given set of transactions from the
// transient store
PurgeByTxids(txids []string) error
// PurgeByHeight removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher.
Purge(maxBlockNumToRetain uint64) error
// that were generated at block height of maxBlockNumToRetain or higher. Though the private
// read-write sets stored in transient store is removed by coordinator using PurgebyTxids()
// after successful block commit, PurgeByHeight() is still required to remove orphan entries (as
// transaction that gets endorsed may not be submitted by the client for commit)
PurgeByHeight(maxBlockNumToRetain uint64) error
// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
GetMinEndorsementBlkHt() (uint64, error)
Shutdown()
Expand Down Expand Up @@ -117,17 +123,35 @@ func (s *store) Persist(txid string, endorsementBlkHt uint64,
// Due to the fact that the txid may have multiple private RWSets persisted from different
// endorsers (via Gossip), we postfix an uuid with the txid to avoid collision.
uuid := util.GenerateUUID()
compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
privateSimulationResultsBytes, err := proto.Marshal(privateSimulationResults)
if err != nil {
return err
}
dbBatch.Put(compositeKey, privateSimulationResultsBytes)

// Create compositeKey with appropriate prefix, endorsementBlkHt, txid, uuid & Store
// the compositeKey (purge index) a null byte as value.
compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, uuid)
dbBatch.Put(compositeKey, emptyValue)
dbBatch.Put(compositeKeyPvtRWSet, privateSimulationResultsBytes)

// Create two index: (i) by txid, and (ii) by height

// Create compositeKey for purge index by height with appropriate prefix, endorsementBlkHt,
// txid, uuid and store the compositeKey (purge index) with a null byte as value. Note that
// the purge index is used to remove orphan entries in the transient store (which are not removed
// by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction
// that gets endorsed but not submitted by the client for commit)
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt, txid, uuid)
dbBatch.Put(compositeKeyPurgeIndexByHeight, emptyValue)

// Create compositeKey for purge index by txid with appropriate prefix, txid, uuid,
// endorsementBlkHt and store the compositeKey (purge index) with a null byte as value.
// Though compositeKeyPvtRWSet itself can be used to purge private write set by txid,
// we create a separate composite key with a null byte as value. The reason is that
// if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write
// set associated with the key from db. Note that this purge index is used to remove non-orphan
// entries in the transient store and is used by PurgeTxids()
// Note: We can create compositeKeyPurgeIndexByTxid by just replacing the prefix of compositeKeyPvtRWSet
// with purgeIndexByTxidPrefix. For code readability and to be expressive, we use a
// createCompositeKeyForPurgeIndexByTxid() instead.
compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, endorsementBlkHt)
dbBatch.Put(compositeKeyPurgeIndexByTxid, emptyValue)

return s.db.WriteBatch(dbBatch, true)
}
Expand All @@ -143,26 +167,80 @@ func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter)
return &RwsetScanner{txid, iter, filter}, nil
}

// Purge removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher.
func (s *store) Purge(maxBlockNumToRetain uint64) error {
// PurgeByTxids removes private read-write set of a given set of transactions from the
// transient store. PurgeByTxids() is expected to be called by coordinator after
// committing a block to ledger.
func (s *store) PurgeByTxids(txids []string) error {
dbBatch := leveldbhelper.NewUpdateBatch()

for _, txid := range txids {
// Construct startKey and endKey to do an range query
startKey := createPurgeIndexByTxidRangeStartKey(txid)
endKey := createPurgeIndexByTxidRangeEndKey(txid)

iter := s.db.GetIterator(startKey, endKey)

// Get all txid and uuid from above result and remove it from transient store (both
// read/write set and the corresponding indexes.
for iter.Next() {
// For each entry, remove the private read-write set and correponding indexes

// Remove private read-write set
compositeKeyPurgeIndexByTxid := iter.Key()
// Note: We can create compositeKeyPvtRWSet by just replacing the prefix of compositeKeyPurgeIndexByTxid
// with prwsetPrefix. For code readability and to be expressive, we split and create again.
uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByTxid(compositeKeyPurgeIndexByTxid)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
dbBatch.Delete(compositeKeyPvtRWSet)

// Remove purge index -- purgeIndexByHeight
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt, txid, uuid)
dbBatch.Delete(compositeKeyPurgeIndexByHeight)

// Remove purge index -- purgeIndexByTxid
dbBatch.Delete(compositeKeyPurgeIndexByTxid)
}
iter.Release()
}
// If peer fails before/while writing the batch to golevelDB, these entries will be
// removed as per BTL policy later by PurgeByHeight()
return s.db.WriteBatch(dbBatch, true)
}

// PurgeByHeight removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, PurgeByHeight only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher. Though the private
// read-write sets stored in transient store is removed by coordinator using PurgebyTxids()
// after successful block commit, PurgeByHeight() is still required to remove orphan entries (as
// transaction that gets endorsed may not be submitted by the client for commit)
func (s *store) PurgeByHeight(maxBlockNumToRetain uint64) error {
// Do a range query with 0 as startKey and maxBlockNumToRetain-1 as endKey
startKey := createEndorsementBlkHtRangeStartKey(0)
endKey := createEndorsementBlkHtRangeEndKey(maxBlockNumToRetain - 1)
startKey := createPurgeIndexByHeightRangeStartKey(0)
endKey := createPurgeIndexByHeightRangeEndKey(maxBlockNumToRetain - 1)
iter := s.db.GetIterator(startKey, endKey)

dbBatch := leveldbhelper.NewUpdateBatch()

// Get all txid and uuid from above result and remove it from transient store (both
// read/write set and the corresponding index.
for iter.Next() {
dbKey := iter.Key()
txid, uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
compositeKey := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
dbBatch.Delete(compositeKey)
dbBatch.Delete(dbKey)
// For each entry, remove the private read-write set and correponding indexes

// Remove private read-write set
compositeKeyPurgeIndexByHeight := iter.Key()
txid, uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByHeight(compositeKeyPurgeIndexByHeight)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
dbBatch.Delete(compositeKeyPvtRWSet)

// Remove purge index -- purgeIndexByTxid
compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, endorsementBlkHt)
dbBatch.Delete(compositeKeyPurgeIndexByTxid)

// Remove purge index -- purgeIndexByHeight
dbBatch.Delete(compositeKeyPurgeIndexByHeight)
}
iter.Release()

return s.db.WriteBatch(dbBatch, true)
}

Expand All @@ -172,14 +250,15 @@ func (s *store) GetMinEndorsementBlkHt() (uint64, error) {
// as 0 (i.e., endorsementBlkHt) and returns the first key which denotes
// the lowest retained endorsement block height. An alternative approach
// is to explicitly store the minEndorsementBlkHt in the transientStore.
startKey := createEndorsementBlkHtRangeStartKey(0)
startKey := createPurgeIndexByHeightRangeStartKey(0)
iter := s.db.GetIterator(startKey, nil)
// Fetch the minimum endorsement block height
if iter.Next() {
dbKey := iter.Key()
_, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
_, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByHeight(dbKey)
return endorsementBlkHt, nil
}
iter.Release()
// Returning an error may not be the right thing to do here. May be
// return a bool. -1 is not possible due to unsigned int as first
// return value
Expand Down
101 changes: 79 additions & 22 deletions core/transientstore/store_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
)

var (
prwsetPrefix = []byte("P")[0] // key prefix for storing private read-write set in transient store.
purgeIndexPrefix = []byte("I")[0] // key prefix for storing index on private read-write set using endorsement block height.
compositeKeySep = byte(0x00)
prwsetPrefix = []byte("P")[0] // key prefix for storing private read-write set in transient store.
purgeIndexByHeightPrefix = []byte("H")[0] // key prefix for storing index on private read-write set using endorsement block height.
purgeIndexByTxidPrefix = []byte("T")[0] // key prefix for storing index on private read-write set using txid
compositeKeySep = byte(0x00)
)

// createCompositeKeyForPvtRWSet creates a key for storing private read-write set
Expand All @@ -26,6 +27,26 @@ func createCompositeKeyForPvtRWSet(txid string, uuid string, endorsementBlkHt ui
var compositeKey []byte
compositeKey = append(compositeKey, prwsetPrefix)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, createCompositeKeyWithoutPrefixForTxid(txid, uuid, endorsementBlkHt)...)

return compositeKey
}

// createCompositeKeyForPurgeIndexByTxid creates a key to index private read-write set based on
// txid such that purge based on txid can be achieved. The structure
// of the key is <purgeIndexByTxidPrefix>~txid~uuid~endorsementBlkHt.
func createCompositeKeyForPurgeIndexByTxid(txid string, uuid string, endorsementBlkHt uint64) []byte {
var compositeKey []byte
compositeKey = append(compositeKey, purgeIndexByTxidPrefix)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, createCompositeKeyWithoutPrefixForTxid(txid, uuid, endorsementBlkHt)...)

return compositeKey
}

// createCompositeKeyWithoutPrefixForTxid creates a composite key of structure txid~uuid~endorsementBlkHt.
func createCompositeKeyWithoutPrefixForTxid(txid string, uuid string, endorsementBlkHt uint64) []byte {
var compositeKey []byte
compositeKey = append(compositeKey, []byte(txid)...)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, []byte(uuid)...)
Expand All @@ -35,12 +56,12 @@ func createCompositeKeyForPvtRWSet(txid string, uuid string, endorsementBlkHt ui
return compositeKey
}

// createCompositeKeyForPurgeIndex creates a key to index private read-write set based on
// createCompositeKeyForPurgeIndexByHeight creates a key to index private read-write set based on
// endorsement block height such that purge based on block height can be achieved. The structure
// of the key is <purgeIndexPrefix>~endorsementBlkHt~txid~uuid.
func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, uuid string) []byte {
// of the key is <purgeIndexByHeightPrefix>~endorsementBlkHt~txid~uuid.
func createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt uint64, txid string, uuid string) []byte {
var compositeKey []byte
compositeKey = append(compositeKey, purgeIndexPrefix)
compositeKey = append(compositeKey, purgeIndexByHeightPrefix)
compositeKey = append(compositeKey, compositeKeySep)
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
compositeKey = append(compositeKey, compositeKeySep)
Expand All @@ -51,18 +72,21 @@ func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, uuid
return compositeKey
}

// splitCompositeKeyOfPvtRWSet splits the compositeKey (<prwsetPrefix>~txid~uuid~endorsementBlkHt) into endorserId and endorsementBlkHt.
// splitCompositeKeyOfPvtRWSet splits the compositeKey (<prwsetPrefix>~txid~uuid~endorsementBlkHt)
// into uuid and endorsementBlkHt.
func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (uuid string, endorsementBlkHt uint64) {
compositeKey = compositeKey[2:]
firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep)
secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1
uuid = string(compositeKey[firstSepIndex+1 : secondSepIndex])
endorsementBlkHt, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:])
return uuid, endorsementBlkHt
return splitCompositeKeyWithoutPrefixForTxid(compositeKey[2:])
}

// splitCompositeKeyOfPurgeIndex splits the compositeKey (<purgeIndexPrefix>~endorsementBlkHt~txid~uuid) into txid, uuid and endorsementBlkHt.
func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, uuid string, endorsementBlkHt uint64) {
// splitCompositeKeyOfPurgeIndexByTxid splits the compositeKey (<purgeIndexByTxidPrefix>~txid~uuid~endorsementBlkHt)
// into uuid and endorsementBlkHt.
func splitCompositeKeyOfPurgeIndexByTxid(compositeKey []byte) (uuid string, endorsementBlkHt uint64) {
return splitCompositeKeyWithoutPrefixForTxid(compositeKey[2:])
}

// splitCompositeKeyOfPurgeIndexByHeight splits the compositeKey (<purgeIndexByHeightPrefix>~endorsementBlkHt~txid~uuid)
// into txid, uuid and endorsementBlkHt.
func splitCompositeKeyOfPurgeIndexByHeight(compositeKey []byte) (txid string, uuid string, endorsementBlkHt uint64) {
var n int
endorsementBlkHt, n = util.DecodeOrderPreservingVarUint64(compositeKey[2:])
splits := bytes.Split(compositeKey[n+3:], []byte{compositeKeySep})
Expand All @@ -71,6 +95,17 @@ func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, uuid strin
return
}

// splitCompositeKeyWithoutPrefixForTxid splits the composite key txid~uuid~endorsementBlkHt into
// uuid and endorsementBlkHt
func splitCompositeKeyWithoutPrefixForTxid(compositeKey []byte) (uuid string, endorsementBlkHt uint64) {
// skip txid as all functions which requires split of composite key already has it
firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep)
secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1
uuid = string(compositeKey[firstSepIndex+1 : secondSepIndex])
endorsementBlkHt, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:])
return
}

// createTxidRangeStartKey returns a startKey to do a range query on transient store using txid
func createTxidRangeStartKey(txid string) []byte {
var startKey []byte
Expand All @@ -91,28 +126,50 @@ func createTxidRangeEndKey(txid string) []byte {
return endKey
}

// createEndorsementBlkHtRangeStartKey returns a startKey to do a range query on index stored in transient store
// createPurgeIndexByHeightRangeStartKey returns a startKey to do a range query on index stored in transient store
// using endorsementBlkHt
func createEndorsementBlkHtRangeStartKey(endorsementBlkHt uint64) []byte {
func createPurgeIndexByHeightRangeStartKey(endorsementBlkHt uint64) []byte {
var startKey []byte
startKey = append(startKey, purgeIndexPrefix)
startKey = append(startKey, purgeIndexByHeightPrefix)
startKey = append(startKey, compositeKeySep)
startKey = append(startKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
startKey = append(startKey, compositeKeySep)
return startKey
}

// createEndorsementBlkHtRangeStartKey returns a endKey to do a range query on index stored in transient store
// createPurgeIndexByHeightRangeStartKey returns a endKey to do a range query on index stored in transient store
// using endorsementBlkHt
func createEndorsementBlkHtRangeEndKey(endorsementBlkHt uint64) []byte {
func createPurgeIndexByHeightRangeEndKey(endorsementBlkHt uint64) []byte {
var endKey []byte
endKey = append(endKey, purgeIndexPrefix)
endKey = append(endKey, purgeIndexByHeightPrefix)
endKey = append(endKey, compositeKeySep)
endKey = append(endKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
endKey = append(endKey, byte(0xff))
return endKey
}

// createPurgeIndexByTxidRangeStartKey returns a startKey to do a range query on index stored in transient store
// using txid
func createPurgeIndexByTxidRangeStartKey(txid string) []byte {
var startKey []byte
startKey = append(startKey, purgeIndexByTxidPrefix)
startKey = append(startKey, compositeKeySep)
startKey = append(startKey, []byte(txid)...)
startKey = append(startKey, compositeKeySep)
return startKey
}

// createPurgeIndexByTxidRangeStartKey returns a endKey to do a range query on index stored in transient store
// using txid
func createPurgeIndexByTxidRangeEndKey(txid string) []byte {
var endKey []byte
endKey = append(endKey, purgeIndexByTxidPrefix)
endKey = append(endKey, compositeKeySep)
endKey = append(endKey, []byte(txid)...)
endKey = append(endKey, byte(0xff))
return endKey
}

// GetTransientStorePath returns the filesystem path for temporarily storing the private rwset
func GetTransientStorePath() string {
sysPath := config.GetPath("peer.fileSystemPath")
Expand Down
Loading

0 comments on commit 31863bc

Please sign in to comment.