Skip to content

Commit

Permalink
[FAB-1384]: Change ValidatedLedger APIs
Browse files Browse the repository at this point in the history
In order to provide support for JIRA [FAB-1038], validated
ledger should be able to receive block to commit with list
of invalid transactions to execute MVCC validation as part
of the flow described in FAB-1038.

Change-Id: Ica9c48c036ecd97a17484e4954a85d1b7abccf2a
Signed-off-by: Artem Barger <bartem@il.ibm.com>
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Dec 15, 2016
1 parent 0eadb03 commit 79aa4df
Show file tree
Hide file tree
Showing 16 changed files with 159 additions and 150 deletions.
5 changes: 1 addition & 4 deletions core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,8 @@ func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, c
//create the block with 1 transaction
block := common.NewBlock(1, []byte{})
block.Data.Data = [][]byte{envBytes}
if _, _, err = lgr.RemoveInvalidTransactionsAndPrepare(block); err != nil {
return err
}
//commit the block
if err := lgr.Commit(); err != nil {
if err := lgr.Commit(block); err != nil {
return err
}
}
Expand Down
5 changes: 1 addition & 4 deletions core/committer/committer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ func NewLedgerCommitter(ledger ledger.ValidatedLedger) *LedgerCommitter {

// CommitBlock commits block to into the ledger
func (lc *LedgerCommitter) CommitBlock(block *common.Block) error {
if _, _, err := lc.ledger.RemoveInvalidTransactionsAndPrepare(block); err != nil {
return err
}
if err := lc.ledger.Commit(); err != nil {
if err := lc.ledger.Commit(block); err != nil {
return err
}
return nil
Expand Down
6 changes: 1 addition & 5 deletions core/endorser/endorser.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,7 @@ func (e *Endorser) commitTxSimulation(proposal *pb.Proposal, chainID string, sig
block := common.NewBlock(1, []byte{})
block.Data.Data = [][]byte{txBytes}
block.Header.DataHash = block.Data.Hash()
if _, _, err = lgr.RemoveInvalidTransactionsAndPrepare(block); err != nil {
return err
}

if err = lgr.Commit(); err != nil {
if err = lgr.Commit(block); err != nil {
return err
}

Expand Down
16 changes: 4 additions & 12 deletions core/ledger/kvledger/example/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/hyperledger/fabric/core/ledger"

"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
)

// Committer a toy committer
Expand All @@ -34,17 +33,10 @@ func ConstructCommitter(ledger ledger.ValidatedLedger) *Committer {
}

// CommitBlock commits the block
func (c *Committer) CommitBlock(rawBlock *common.Block) (*common.Block, []*pb.InvalidTransaction, error) {
var validBlock *common.Block
var invalidTxs []*pb.InvalidTransaction
var err error
func (c *Committer) CommitBlock(rawBlock *common.Block) error {
logger.Debugf("Committer validating the block...")
if validBlock, invalidTxs, err = c.ledger.RemoveInvalidTransactionsAndPrepare(rawBlock); err != nil {
return nil, nil, err
if err := c.ledger.Commit(rawBlock); err != nil {
return err
}
logger.Debugf("Committer committing the block...")
if err = c.ledger.Commit(); err != nil {
return nil, nil, err
}
return validBlock, invalidTxs, err
return nil
}
27 changes: 18 additions & 9 deletions core/ledger/kvledger/example/main/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/example"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
)

const (
Expand Down Expand Up @@ -97,9 +97,9 @@ func initApp() {
accounts[3]: 100})
handleError(err, true)
rawBlock := consenter.ConstructBlock(tx)
finalBlock, invalidTx, err := committer.CommitBlock(rawBlock)
err = committer.CommitBlock(rawBlock)
handleError(err, true)
printBlocksInfo(rawBlock, finalBlock, invalidTx)
printBlocksInfo(rawBlock)
}

func transferFunds() {
Expand All @@ -112,9 +112,9 @@ func transferFunds() {
rawBlock := consenter.ConstructBlock(tx1, tx2)

// act as committing peer to commit the Raw Block
finalBlock, invalidTx, err := committer.CommitBlock(rawBlock)
err = committer.CommitBlock(rawBlock)
handleError(err, true)
printBlocksInfo(rawBlock, finalBlock, invalidTx)
printBlocksInfo(rawBlock)
}

func tryInvalidTransfer() {
Expand All @@ -128,14 +128,23 @@ func tryDoubleSpend() {
tx2, err := app.TransferFunds("account1", "account4", 50)
handleError(err, true)
rawBlock := consenter.ConstructBlock(tx1, tx2)
finalBlock, invalidTx, err := committer.CommitBlock(rawBlock)
err = committer.CommitBlock(rawBlock)
handleError(err, true)
printBlocksInfo(rawBlock, finalBlock, invalidTx)
printBlocksInfo(rawBlock)
}

func printBlocksInfo(rawBlock *common.Block, finalBlock *common.Block, invalidTxs []*pb.InvalidTransaction) {
func printBlocksInfo(block *common.Block) {
// Read invalid transactions filter
txsFltr := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
numOfInvalid := 0
// Count how many transaction indeed invalid
for i := 0; i < len(block.Data.Data); i++ {
if txsFltr.IsSet(uint(i)) {
numOfInvalid++
}
}
fmt.Printf("Num txs in rawBlock = [%d], num invalidTxs = [%d]\n",
len(rawBlock.Data.Data), len(invalidTxs))
len(block.Data.Data), numOfInvalid)
}

func printBalances() {
Expand Down
57 changes: 22 additions & 35 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
"github.com/hyperledger/fabric/core/ledger/history"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"

logging "github.com/op/go-logging"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
)
Expand Down Expand Up @@ -60,10 +60,9 @@ func NewConf(filesystemPath string, maxBlockfileSize int) *Conf {
// KVLedger provides an implementation of `ledger.ValidatedLedger`.
// This implementation provides a key-value based data model
type KVLedger struct {
blockStore blkstorage.BlockStore
txtmgmt txmgr.TxMgr
historymgmt history.HistMgr
pendingBlockToCommit *common.Block
blockStore blkstorage.BlockStore
txtmgmt txmgr.TxMgr
historymgmt history.HistMgr
}

// NewKVLedger constructs new `KVLedger`
Expand Down Expand Up @@ -114,10 +113,13 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
couchDBDef.Username, //enter couchDB id here
couchDBDef.Password) //enter couchDB pw here
}
l := &KVLedger{blockStore, txmgmt, historymgmt, nil}

l := &KVLedger{blockStore, txmgmt, historymgmt}

if err := recoverStateDB(l); err != nil {
panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
}

return l, nil
}

Expand All @@ -135,7 +137,7 @@ func recoverStateDB(l *KVLedger) error {
if savepointValue, err = l.txtmgmt.GetBlockNumFromSavepoint(); err != nil {
return err
}
logger.Debugf("savepointValue=%d, info.Height=%d", savepointValue, info.Height)

//Checking whether the savepointValue is in sync with block storage height
if savepointValue == info.Height {
return nil
Expand All @@ -145,19 +147,20 @@ func recoverStateDB(l *KVLedger) error {

//Compute updateSet for each missing savepoint and commit to state DB
for blockNumber := savepointValue + 1; blockNumber <= info.Height; blockNumber++ {
if l.pendingBlockToCommit, err = l.GetBlockByNumber(blockNumber); err != nil {
var block *common.Block
if block, err = l.GetBlockByNumber(blockNumber); err != nil {
return err
}
logger.Debugf("Constructing updateSet for the block %d", blockNumber)
if _, _, err = l.txtmgmt.ValidateAndPrepare(l.pendingBlockToCommit, false); err != nil {
if err = l.txtmgmt.ValidateAndPrepare(block, false); err != nil {
return err
}
logger.Debugf("Committing block %d to state database", blockNumber)
if err = l.txtmgmt.Commit(); err != nil {
return err
}
}
l.pendingBlockToCommit = nil

return nil
}

Expand Down Expand Up @@ -208,54 +211,38 @@ func (l *KVLedger) NewQueryExecutor() (ledger.QueryExecutor, error) {
return l.txtmgmt.NewQueryExecutor()
}

// RemoveInvalidTransactionsAndPrepare validates all the transactions in the given block
// and returns a block that contains only valid transactions and a list of transactions that are invalid
func (l *KVLedger) RemoveInvalidTransactionsAndPrepare(block *common.Block) (*common.Block, []*pb.InvalidTransaction, error) {
var validBlock *common.Block
var invalidTxs []*pb.InvalidTransaction
// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes
func (l *KVLedger) Commit(block *common.Block) error {
var err error
validBlock, invalidTxs, err = l.txtmgmt.ValidateAndPrepare(block, true)
if err == nil {
l.pendingBlockToCommit = validBlock
}
return validBlock, invalidTxs, err
}

// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes
func (l *KVLedger) Commit() error {
if l.pendingBlockToCommit == nil {
panic(fmt.Errorf(`Nothing to commit. RemoveInvalidTransactionsAndPrepare() method should have been called and should not have thrown error`))
logger.Debugf("Validating block")
err = l.txtmgmt.ValidateAndPrepare(block, true)
if err != nil {
return err
}

logger.Debugf("Committing block to storage")
if err := l.blockStore.AddBlock(l.pendingBlockToCommit); err != nil {
if err = l.blockStore.AddBlock(block); err != nil {
return err
}

logger.Debugf("Committing block to state database")
if err := l.txtmgmt.Commit(); err != nil {
if err = l.txtmgmt.Commit(); err != nil {
panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
}

//TODO future will want to run async with state db writes. History needs to wait for chain (FSBlock) to write but not the state db
logger.Debugf("===HISTORYDB=== Commit() will write to hisotry if enabled else will be by-passed if not enabled: vledgerconfig.IsHistoryDBEnabled(): %v\n", ledgerconfig.IsHistoryDBEnabled())
if ledgerconfig.IsHistoryDBEnabled() == true {
logger.Debugf("Committing transactions to history database")
if err := l.historymgmt.Commit(l.pendingBlockToCommit); err != nil {
if err := l.historymgmt.Commit(block); err != nil {
panic(fmt.Errorf(`Error during commit to txthistory:%s`, err))
}
}

l.pendingBlockToCommit = nil
return nil
}

// Rollback rollbacks the changes caused by the last invocation to method `RemoveInvalidTransactionsAndPrepare`
func (l *KVLedger) Rollback() {
l.txtmgmt.Rollback()
l.pendingBlockToCommit = nil
}

// Close closes `KVLedger`
func (l *KVLedger) Close() {
l.blockStore.Shutdown()
Expand Down
22 changes: 9 additions & 13 deletions core/ledger/kvledger/kv_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/testutil"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/stretchr/testify/assert"
)

func TestKVLedgerBlockStorage(t *testing.T) {
Expand All @@ -41,8 +42,7 @@ func TestKVLedgerBlockStorage(t *testing.T) {
simRes, _ := simulator.GetTxSimulationResults()
bg := testutil.NewBlockGenerator(t)
block1 := bg.NextBlock([][]byte{simRes}, false)
ledger.RemoveInvalidTransactionsAndPrepare(block1)
ledger.Commit()
ledger.Commit(block1)

bcInfo, _ = ledger.GetBlockchainInfo()
block1Hash := block1.Header.Hash()
Expand All @@ -56,8 +56,7 @@ func TestKVLedgerBlockStorage(t *testing.T) {
simulator.Done()
simRes, _ = simulator.GetTxSimulationResults()
block2 := bg.NextBlock([][]byte{simRes}, false)
ledger.RemoveInvalidTransactionsAndPrepare(block2)
ledger.Commit()
ledger.Commit(block2)

bcInfo, _ = ledger.GetBlockchainInfo()
block2Hash := block2.Header.Hash()
Expand Down Expand Up @@ -99,9 +98,7 @@ func TestKVLedgerStateDBRecovery(t *testing.T) {
bg := testutil.NewBlockGenerator(t)
block1 := bg.NextBlock([][]byte{simRes}, false)
//performing validation of read and write set to find valid transactions
ledger.RemoveInvalidTransactionsAndPrepare(block1)
//writing the validated block to block storage and committing the transaction to state DB
ledger.Commit()
ledger.Commit(block1)

bcInfo, _ = ledger.GetBlockchainInfo()
block1Hash := block1.Header.Hash()
Expand All @@ -119,10 +116,11 @@ func TestKVLedgerStateDBRecovery(t *testing.T) {
//generating a block based on the simulation result
block2 := bg.NextBlock([][]byte{simRes}, false)
//performing validation of read and write set to find valid transactions
ledger.RemoveInvalidTransactionsAndPrepare(block2)
ledger.txtmgmt.ValidateAndPrepare(block2, true)
//writing the validated block to block storage but not committing the transaction to state DB
ledger.blockStore.AddBlock(ledger.pendingBlockToCommit)
err := ledger.blockStore.AddBlock(block2)
//assume that peer fails here before committing the transaction
assert.NoError(t, err)

bcInfo, _ = ledger.GetBlockchainInfo()
block2Hash := block2.Header.Hash()
Expand Down Expand Up @@ -186,8 +184,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
bg := testutil.NewBlockGenerator(t)
block1 := bg.NextBlock([][]byte{simRes}, false)

ledger.RemoveInvalidTransactionsAndPrepare(block1)
ledger.Commit()
ledger.Commit(block1)

bcInfo, _ = ledger.GetBlockchainInfo()
block1Hash := block1.Header.Hash()
Expand All @@ -214,8 +211,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
simulationResults = append(simulationResults, simRes2)

block2 := bg.NextBlock(simulationResults, false)
ledger.RemoveInvalidTransactionsAndPrepare(block2)
ledger.Commit()
ledger.Commit(block2)

bcInfo, _ = ledger.GetBlockchainInfo()
block2Hash := block2.Header.Hash()
Expand Down
23 changes: 16 additions & 7 deletions core/ledger/kvledger/marble_example/main/marble_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/example"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
logging "github.com/op/go-logging"
)

Expand Down Expand Up @@ -81,24 +81,33 @@ func initApp() {
tx, err := marbleApp.CreateMarble(marble)
handleError(err, true)
rawBlock := consenter.ConstructBlock(tx)
finalBlock, invalidTx, err := committer.CommitBlock(rawBlock)
err = committer.CommitBlock(rawBlock)
handleError(err, true)
printBlocksInfo(rawBlock, finalBlock, invalidTx)
printBlocksInfo(rawBlock)
}

func transferMarble() {
logger.Debugf("===COUCHDB=== Marble Example transferMarble()")
tx1, err := marbleApp.TransferMarble([]string{"marble1", "jerry"})
handleError(err, true)
rawBlock := consenter.ConstructBlock(tx1)
finalBlock, invalidTx, err := committer.CommitBlock(rawBlock)
err = committer.CommitBlock(rawBlock)
handleError(err, true)
printBlocksInfo(rawBlock, finalBlock, invalidTx)
printBlocksInfo(rawBlock)
}

func printBlocksInfo(rawBlock *common.Block, finalBlock *common.Block, invalidTxs []*pb.InvalidTransaction) {
func printBlocksInfo(block *common.Block) {
// Read invalid transactions filter
txsFltr := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
numOfInvalid := 0
// Count how many transaction indeed invalid
for i := 0; i < len(block.Data.Data); i++ {
if txsFltr.IsSet(uint(i)) {
numOfInvalid++
}
}
fmt.Printf("Num txs in rawBlock = [%d], num invalidTxs = [%d]\n",
len(rawBlock.Data.Data), len(invalidTxs))
len(block.Data.Data), numOfInvalid)
}

func handleError(err error, quit bool) {
Expand Down
Loading

0 comments on commit 79aa4df

Please sign in to comment.