diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index f42a562f5a1..a49c70719ed 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -191,11 +191,8 @@ func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, c //create the block with 1 transaction block := common.NewBlock(1, []byte{}) block.Data.Data = [][]byte{envBytes} - if _, _, err = lgr.RemoveInvalidTransactionsAndPrepare(block); err != nil { - return err - } //commit the block - if err := lgr.Commit(); err != nil { + if err := lgr.Commit(block); err != nil { return err } } diff --git a/core/committer/committer_impl.go b/core/committer/committer_impl.go index bd9fa5d256f..ef186181429 100644 --- a/core/committer/committer_impl.go +++ b/core/committer/committer_impl.go @@ -48,10 +48,7 @@ func NewLedgerCommitter(ledger ledger.ValidatedLedger) *LedgerCommitter { // CommitBlock commits block to into the ledger func (lc *LedgerCommitter) CommitBlock(block *common.Block) error { - if _, _, err := lc.ledger.RemoveInvalidTransactionsAndPrepare(block); err != nil { - return err - } - if err := lc.ledger.Commit(); err != nil { + if err := lc.ledger.Commit(block); err != nil { return err } return nil diff --git a/core/endorser/endorser.go b/core/endorser/endorser.go index 6ede68b9f39..fc3bfe9a89c 100644 --- a/core/endorser/endorser.go +++ b/core/endorser/endorser.go @@ -334,11 +334,7 @@ func (e *Endorser) commitTxSimulation(proposal *pb.Proposal, chainID string, sig block := common.NewBlock(1, []byte{}) block.Data.Data = [][]byte{txBytes} block.Header.DataHash = block.Data.Hash() - if _, _, err = lgr.RemoveInvalidTransactionsAndPrepare(block); err != nil { - return err - } - - if err = lgr.Commit(); err != nil { + if err = lgr.Commit(block); err != nil { return err } diff --git a/core/ledger/kvledger/example/committer.go b/core/ledger/kvledger/example/committer.go index 574bb24c22b..8dd0a87ea84 100644 --- a/core/ledger/kvledger/example/committer.go +++ b/core/ledger/kvledger/example/committer.go @@ -20,7 +20,6 @@ import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" ) // Committer a toy committer @@ -34,17 +33,10 @@ func ConstructCommitter(ledger ledger.ValidatedLedger) *Committer { } // CommitBlock commits the block -func (c *Committer) CommitBlock(rawBlock *common.Block) (*common.Block, []*pb.InvalidTransaction, error) { - var validBlock *common.Block - var invalidTxs []*pb.InvalidTransaction - var err error +func (c *Committer) CommitBlock(rawBlock *common.Block) error { logger.Debugf("Committer validating the block...") - if validBlock, invalidTxs, err = c.ledger.RemoveInvalidTransactionsAndPrepare(rawBlock); err != nil { - return nil, nil, err + if err := c.ledger.Commit(rawBlock); err != nil { + return err } - logger.Debugf("Committer committing the block...") - if err = c.ledger.Commit(); err != nil { - return nil, nil, err - } - return validBlock, invalidTxs, err + return nil } diff --git a/core/ledger/kvledger/example/main/example.go b/core/ledger/kvledger/example/main/example.go index fafce032aef..3be5a1381e1 100644 --- a/core/ledger/kvledger/example/main/example.go +++ b/core/ledger/kvledger/example/main/example.go @@ -24,8 +24,8 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger" "github.com/hyperledger/fabric/core/ledger/kvledger/example" "github.com/hyperledger/fabric/core/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" ) const ( @@ -97,9 +97,9 @@ func initApp() { accounts[3]: 100}) handleError(err, true) rawBlock := consenter.ConstructBlock(tx) - finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + err = committer.CommitBlock(rawBlock) handleError(err, true) - printBlocksInfo(rawBlock, finalBlock, invalidTx) + printBlocksInfo(rawBlock) } func transferFunds() { @@ -112,9 +112,9 @@ func transferFunds() { rawBlock := consenter.ConstructBlock(tx1, tx2) // act as committing peer to commit the Raw Block - finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + err = committer.CommitBlock(rawBlock) handleError(err, true) - printBlocksInfo(rawBlock, finalBlock, invalidTx) + printBlocksInfo(rawBlock) } func tryInvalidTransfer() { @@ -128,14 +128,23 @@ func tryDoubleSpend() { tx2, err := app.TransferFunds("account1", "account4", 50) handleError(err, true) rawBlock := consenter.ConstructBlock(tx1, tx2) - finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + err = committer.CommitBlock(rawBlock) handleError(err, true) - printBlocksInfo(rawBlock, finalBlock, invalidTx) + printBlocksInfo(rawBlock) } -func printBlocksInfo(rawBlock *common.Block, finalBlock *common.Block, invalidTxs []*pb.InvalidTransaction) { +func printBlocksInfo(block *common.Block) { + // Read invalid transactions filter + txsFltr := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + numOfInvalid := 0 + // Count how many transaction indeed invalid + for i := 0; i < len(block.Data.Data); i++ { + if txsFltr.IsSet(uint(i)) { + numOfInvalid++ + } + } fmt.Printf("Num txs in rawBlock = [%d], num invalidTxs = [%d]\n", - len(rawBlock.Data.Data), len(invalidTxs)) + len(block.Data.Data), numOfInvalid) } func printBalances() { diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 4c6a56edb64..a22af39ceb0 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -27,12 +27,12 @@ import ( "github.com/hyperledger/fabric/core/ledger/history" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" logging "github.com/op/go-logging" - "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr" "github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric/protos/peer" ) @@ -60,10 +60,9 @@ func NewConf(filesystemPath string, maxBlockfileSize int) *Conf { // KVLedger provides an implementation of `ledger.ValidatedLedger`. // This implementation provides a key-value based data model type KVLedger struct { - blockStore blkstorage.BlockStore - txtmgmt txmgr.TxMgr - historymgmt history.HistMgr - pendingBlockToCommit *common.Block + blockStore blkstorage.BlockStore + txtmgmt txmgr.TxMgr + historymgmt history.HistMgr } // NewKVLedger constructs new `KVLedger` @@ -114,10 +113,13 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { couchDBDef.Username, //enter couchDB id here couchDBDef.Password) //enter couchDB pw here } - l := &KVLedger{blockStore, txmgmt, historymgmt, nil} + + l := &KVLedger{blockStore, txmgmt, historymgmt} + if err := recoverStateDB(l); err != nil { panic(fmt.Errorf(`Error during state DB recovery:%s`, err)) } + return l, nil } @@ -135,7 +137,7 @@ func recoverStateDB(l *KVLedger) error { if savepointValue, err = l.txtmgmt.GetBlockNumFromSavepoint(); err != nil { return err } - logger.Debugf("savepointValue=%d, info.Height=%d", savepointValue, info.Height) + //Checking whether the savepointValue is in sync with block storage height if savepointValue == info.Height { return nil @@ -145,11 +147,12 @@ func recoverStateDB(l *KVLedger) error { //Compute updateSet for each missing savepoint and commit to state DB for blockNumber := savepointValue + 1; blockNumber <= info.Height; blockNumber++ { - if l.pendingBlockToCommit, err = l.GetBlockByNumber(blockNumber); err != nil { + var block *common.Block + if block, err = l.GetBlockByNumber(blockNumber); err != nil { return err } logger.Debugf("Constructing updateSet for the block %d", blockNumber) - if _, _, err = l.txtmgmt.ValidateAndPrepare(l.pendingBlockToCommit, false); err != nil { + if err = l.txtmgmt.ValidateAndPrepare(block, false); err != nil { return err } logger.Debugf("Committing block %d to state database", blockNumber) @@ -157,7 +160,7 @@ func recoverStateDB(l *KVLedger) error { return err } } - l.pendingBlockToCommit = nil + return nil } @@ -208,32 +211,23 @@ func (l *KVLedger) NewQueryExecutor() (ledger.QueryExecutor, error) { return l.txtmgmt.NewQueryExecutor() } -// RemoveInvalidTransactionsAndPrepare validates all the transactions in the given block -// and returns a block that contains only valid transactions and a list of transactions that are invalid -func (l *KVLedger) RemoveInvalidTransactionsAndPrepare(block *common.Block) (*common.Block, []*pb.InvalidTransaction, error) { - var validBlock *common.Block - var invalidTxs []*pb.InvalidTransaction +// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes +func (l *KVLedger) Commit(block *common.Block) error { var err error - validBlock, invalidTxs, err = l.txtmgmt.ValidateAndPrepare(block, true) - if err == nil { - l.pendingBlockToCommit = validBlock - } - return validBlock, invalidTxs, err -} -// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes -func (l *KVLedger) Commit() error { - if l.pendingBlockToCommit == nil { - panic(fmt.Errorf(`Nothing to commit. RemoveInvalidTransactionsAndPrepare() method should have been called and should not have thrown error`)) + logger.Debugf("Validating block") + err = l.txtmgmt.ValidateAndPrepare(block, true) + if err != nil { + return err } logger.Debugf("Committing block to storage") - if err := l.blockStore.AddBlock(l.pendingBlockToCommit); err != nil { + if err = l.blockStore.AddBlock(block); err != nil { return err } logger.Debugf("Committing block to state database") - if err := l.txtmgmt.Commit(); err != nil { + if err = l.txtmgmt.Commit(); err != nil { panic(fmt.Errorf(`Error during commit to txmgr:%s`, err)) } @@ -241,21 +235,14 @@ func (l *KVLedger) Commit() error { logger.Debugf("===HISTORYDB=== Commit() will write to hisotry if enabled else will be by-passed if not enabled: vledgerconfig.IsHistoryDBEnabled(): %v\n", ledgerconfig.IsHistoryDBEnabled()) if ledgerconfig.IsHistoryDBEnabled() == true { logger.Debugf("Committing transactions to history database") - if err := l.historymgmt.Commit(l.pendingBlockToCommit); err != nil { + if err := l.historymgmt.Commit(block); err != nil { panic(fmt.Errorf(`Error during commit to txthistory:%s`, err)) } } - l.pendingBlockToCommit = nil return nil } -// Rollback rollbacks the changes caused by the last invocation to method `RemoveInvalidTransactionsAndPrepare` -func (l *KVLedger) Rollback() { - l.txtmgmt.Rollback() - l.pendingBlockToCommit = nil -} - // Close closes `KVLedger` func (l *KVLedger) Close() { l.blockStore.Shutdown() diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go index 9114d5363f1..cc717827370 100644 --- a/core/ledger/kvledger/kv_ledger_test.go +++ b/core/ledger/kvledger/kv_ledger_test.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/testutil" pb "github.com/hyperledger/fabric/protos/peer" + "github.com/stretchr/testify/assert" ) func TestKVLedgerBlockStorage(t *testing.T) { @@ -41,8 +42,7 @@ func TestKVLedgerBlockStorage(t *testing.T) { simRes, _ := simulator.GetTxSimulationResults() bg := testutil.NewBlockGenerator(t) block1 := bg.NextBlock([][]byte{simRes}, false) - ledger.RemoveInvalidTransactionsAndPrepare(block1) - ledger.Commit() + ledger.Commit(block1) bcInfo, _ = ledger.GetBlockchainInfo() block1Hash := block1.Header.Hash() @@ -56,8 +56,7 @@ func TestKVLedgerBlockStorage(t *testing.T) { simulator.Done() simRes, _ = simulator.GetTxSimulationResults() block2 := bg.NextBlock([][]byte{simRes}, false) - ledger.RemoveInvalidTransactionsAndPrepare(block2) - ledger.Commit() + ledger.Commit(block2) bcInfo, _ = ledger.GetBlockchainInfo() block2Hash := block2.Header.Hash() @@ -99,9 +98,7 @@ func TestKVLedgerStateDBRecovery(t *testing.T) { bg := testutil.NewBlockGenerator(t) block1 := bg.NextBlock([][]byte{simRes}, false) //performing validation of read and write set to find valid transactions - ledger.RemoveInvalidTransactionsAndPrepare(block1) - //writing the validated block to block storage and committing the transaction to state DB - ledger.Commit() + ledger.Commit(block1) bcInfo, _ = ledger.GetBlockchainInfo() block1Hash := block1.Header.Hash() @@ -119,10 +116,11 @@ func TestKVLedgerStateDBRecovery(t *testing.T) { //generating a block based on the simulation result block2 := bg.NextBlock([][]byte{simRes}, false) //performing validation of read and write set to find valid transactions - ledger.RemoveInvalidTransactionsAndPrepare(block2) + ledger.txtmgmt.ValidateAndPrepare(block2, true) //writing the validated block to block storage but not committing the transaction to state DB - ledger.blockStore.AddBlock(ledger.pendingBlockToCommit) + err := ledger.blockStore.AddBlock(block2) //assume that peer fails here before committing the transaction + assert.NoError(t, err) bcInfo, _ = ledger.GetBlockchainInfo() block2Hash := block2.Header.Hash() @@ -186,8 +184,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { bg := testutil.NewBlockGenerator(t) block1 := bg.NextBlock([][]byte{simRes}, false) - ledger.RemoveInvalidTransactionsAndPrepare(block1) - ledger.Commit() + ledger.Commit(block1) bcInfo, _ = ledger.GetBlockchainInfo() block1Hash := block1.Header.Hash() @@ -214,8 +211,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { simulationResults = append(simulationResults, simRes2) block2 := bg.NextBlock(simulationResults, false) - ledger.RemoveInvalidTransactionsAndPrepare(block2) - ledger.Commit() + ledger.Commit(block2) bcInfo, _ = ledger.GetBlockchainInfo() block2Hash := block2.Header.Hash() diff --git a/core/ledger/kvledger/marble_example/main/marble_example.go b/core/ledger/kvledger/marble_example/main/marble_example.go index 4eeea15c548..9932b1dbb4e 100644 --- a/core/ledger/kvledger/marble_example/main/marble_example.go +++ b/core/ledger/kvledger/marble_example/main/marble_example.go @@ -24,8 +24,8 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger" "github.com/hyperledger/fabric/core/ledger/kvledger/example" "github.com/hyperledger/fabric/core/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" logging "github.com/op/go-logging" ) @@ -81,9 +81,9 @@ func initApp() { tx, err := marbleApp.CreateMarble(marble) handleError(err, true) rawBlock := consenter.ConstructBlock(tx) - finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + err = committer.CommitBlock(rawBlock) handleError(err, true) - printBlocksInfo(rawBlock, finalBlock, invalidTx) + printBlocksInfo(rawBlock) } func transferMarble() { @@ -91,14 +91,23 @@ func transferMarble() { tx1, err := marbleApp.TransferMarble([]string{"marble1", "jerry"}) handleError(err, true) rawBlock := consenter.ConstructBlock(tx1) - finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + err = committer.CommitBlock(rawBlock) handleError(err, true) - printBlocksInfo(rawBlock, finalBlock, invalidTx) + printBlocksInfo(rawBlock) } -func printBlocksInfo(rawBlock *common.Block, finalBlock *common.Block, invalidTxs []*pb.InvalidTransaction) { +func printBlocksInfo(block *common.Block) { + // Read invalid transactions filter + txsFltr := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + numOfInvalid := 0 + // Count how many transaction indeed invalid + for i := 0; i < len(block.Data.Data); i++ { + if txsFltr.IsSet(uint(i)) { + numOfInvalid++ + } + } fmt.Printf("Num txs in rawBlock = [%d], num invalidTxs = [%d]\n", - len(rawBlock.Data.Data), len(invalidTxs)) + len(block.Data.Data), numOfInvalid) } func handleError(err error, quit bool) { diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go index 9a8cd0baff0..4ad5b38e15f 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go @@ -19,6 +19,7 @@ package couchdbtxmgmt import ( "bytes" "encoding/json" + "errors" "sync" "github.com/golang/protobuf/proto" @@ -27,12 +28,10 @@ import ( "github.com/hyperledger/fabric/core/ledger/util/db" "github.com/op/go-logging" - "fmt" - "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" putils "github.com/hyperledger/fabric/protos/utils" ) @@ -130,7 +129,7 @@ func (txmgr *CouchDBTxMgr) NewTxSimulator() (ledger.TxSimulator, error) { } // ValidateAndPrepare implements method in interface `txmgmt.TxMgr` -func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidation bool) (*common.Block, []*pb.InvalidTransaction, error) { +func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidation bool) error { if doMVCCValidation == true { logger.Debugf("===COUCHDB=== Entering CouchDBTxMgr.ValidateAndPrepare()") logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data)) @@ -138,19 +137,22 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidat logger.Debugf("New block arrived for write set computation:%#v", block) logger.Debugf("Computing write set for a block with [%d] transactions", len(block.Data.Data)) } - invalidTxs := []*pb.InvalidTransaction{} var valid bool txmgr.updateSet = newUpdateSet() txmgr.blockNum = block.Header.Number + txsFilter := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) for txIndex, envBytes := range block.Data.Data { - //TODO: Process valid txs bitmap in block.BlockMetadata.Metadata and skip - //this transaction if found invalid. + if txsFilter.IsSet(uint(txIndex)) { + // Skiping invalid transaction + logger.Debug("Skipping transaction marked as invalid, txIndex=", txIndex) + continue + } // extract actions from the envelope message respPayload, err := putils.GetActionFromEnvelope(envBytes) if err != nil { - return nil, nil, err + return err } //preparation for extracting RWSet from transaction @@ -159,7 +161,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidat // Get the Result from the Action // and then Unmarshal it into a TxReadWriteSet using custom unmarshalling if err = txRWSet.Unmarshal(respPayload.Results); err != nil { - return nil, nil, err + return err } // trace the first 2000 characters of RWSet only, in case it is huge @@ -177,7 +179,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidat } if doMVCCValidation == true { if valid, err = txmgr.validateTx(txRWSet); err != nil { - return nil, nil, err + return err } } else { valid = true @@ -185,16 +187,17 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidat if valid { if err := txmgr.addWriteSetToBatch(txRWSet, version.NewHeight(block.Header.Number, uint64(txIndex+1))); err != nil { - return nil, nil, err + return err } } else { - invalidTxs = append(invalidTxs, &pb.InvalidTransaction{ - Transaction: &pb.Transaction{ /* FIXME */ }, Cause: pb.InvalidTransaction_RWConflictDuringCommit}) + // Unset bit in byte array corresponded to the invalid transaction + txsFilter.Set(uint(txIndex)) } } + block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter.ToBytes() logger.Debugf("===COUCHDB=== Exiting CouchDBTxMgr.ValidateAndPrepare()") - return block, invalidTxs, nil + return nil } // Shutdown implements method in interface `txmgmt.TxMgr` @@ -260,7 +263,7 @@ func (txmgr *CouchDBTxMgr) Commit() error { // SaveDoc using couchdb client and use JSON format rev, err := txmgr.couchDB.SaveDoc(k, "", v.value, nil) if err != nil { - logger.Debugf("===COUCHDB=== Error during Commit(): %s\n", err) + logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error()) return err } if rev != "" { @@ -281,7 +284,7 @@ func (txmgr *CouchDBTxMgr) Commit() error { // SaveDoc using couchdb client and use attachment rev, err := txmgr.couchDB.SaveDoc(k, "", nil, attachments) if err != nil { - logger.Debugf("===COUCHDB=== Error during Commit(): %s\n", err) + logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error()) return err } if rev != "" { @@ -295,7 +298,7 @@ func (txmgr *CouchDBTxMgr) Commit() error { // Record a savepoint err := txmgr.recordSavepoint() if err != nil { - logger.Debugf("===COUCHDB=== Error during recordSavepoint: %s\n", err) + logger.Errorf("===COUCHDB=== Error during recordSavepoint: %s\n", err.Error()) return err } @@ -313,15 +316,15 @@ func (txmgr *CouchDBTxMgr) recordSavepoint() error { // ensure full commit to flush all changes until now to disk dbResponse, err := txmgr.couchDB.EnsureFullCommit() if err != nil || dbResponse.Ok != true { - logger.Debugf("====COUCHDB==== Failed to perform full commit\n") - return fmt.Errorf("Failed to perform full commit. Err: %s", err) + logger.Errorf("====COUCHDB==== Failed to perform full commit\n") + return errors.New("Failed to perform full commit") } // construct savepoint document // UpdateSeq would be useful if we want to get all db changes since a logical savepoint dbInfo, _, err := txmgr.couchDB.GetDatabaseInfo() if err != nil { - logger.Debugf("====COUCHDB==== Failed to get DB info %s\n", err) + logger.Errorf("====COUCHDB==== Failed to get DB info %s\n", err.Error()) return err } savepointDoc.BlockNum = txmgr.blockNum @@ -329,21 +332,21 @@ func (txmgr *CouchDBTxMgr) recordSavepoint() error { savepointDocJSON, err := json.Marshal(savepointDoc) if err != nil { - logger.Debugf("====COUCHDB==== Failed to create savepoint data %s\n", err) + logger.Errorf("====COUCHDB==== Failed to create savepoint data %s\n", err.Error()) return err } // SaveDoc using couchdb client and use JSON format _, err = txmgr.couchDB.SaveDoc(savepointDocID, "", savepointDocJSON, nil) if err != nil { - logger.Debugf("====CouchDB==== Failed to save the savepoint to DB %s\n", err) + logger.Errorf("====CouchDB==== Failed to save the savepoint to DB %s\n", err.Error()) } // ensure full commit to flush savepoint to disk dbResponse, err = txmgr.couchDB.EnsureFullCommit() if err != nil || dbResponse.Ok != true { - logger.Debugf("====COUCHDB==== Failed to perform full commit\n") - return fmt.Errorf("Failed to perform full commit. Err:%s", err) + logger.Errorf("====COUCHDB==== Failed to perform full commit\n") + return errors.New("Failed to perform full commit") } return nil } @@ -355,14 +358,14 @@ func (txmgr *CouchDBTxMgr) GetBlockNumFromSavepoint() (uint64, error) { savepointJSON, _, err := txmgr.couchDB.ReadDoc(savepointDocID) if err != nil { // TODO: differentiate between 404 and some other error code - logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err) + logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error()) return 0, err } savepointDoc := &couchSavepointData{} err = json.Unmarshal(savepointJSON, &savepointDoc) if err != nil { - logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err) + logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error()) return 0, err } diff --git a/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go b/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go index 8ef57a88fe4..58baac7755b 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go +++ b/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go @@ -24,6 +24,8 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr" "github.com/hyperledger/fabric/core/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/util" + "github.com/hyperledger/fabric/protos/common" ) type testEnv interface { @@ -78,16 +80,30 @@ func newTxMgrTestHelper(t *testing.T, txMgr txmgr.TxMgr) *txMgrTestHelper { func (h *txMgrTestHelper) validateAndCommitRWSet(txRWSet []byte) { block := h.bg.NextBlock([][]byte{txRWSet}, false) - _, invalidTx, err := h.txMgr.ValidateAndPrepare(block, true) + err := h.txMgr.ValidateAndPrepare(block, true) testutil.AssertNoError(h.t, err, "") - testutil.AssertEquals(h.t, len(invalidTx), 0) + txsFltr := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + invalidTxNum := 0 + for i := 0; i < len(block.Data.Data); i++ { + if txsFltr.IsSet(uint(i)) { + invalidTxNum++ + } + } + testutil.AssertEquals(h.t, invalidTxNum, 0) err = h.txMgr.Commit() testutil.AssertNoError(h.t, err, "") } func (h *txMgrTestHelper) checkRWsetInvalid(txRWSet []byte) { block := h.bg.NextBlock([][]byte{txRWSet}, false) - _, invalidTx, err := h.txMgr.ValidateAndPrepare(block, true) + err := h.txMgr.ValidateAndPrepare(block, true) testutil.AssertNoError(h.t, err, "") - testutil.AssertEquals(h.t, len(invalidTx), 1) + txsFltr := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + invalidTxNum := 0 + for i := 0; i < len(block.Data.Data); i++ { + if txsFltr.IsSet(uint(i)) { + invalidTxNum++ + } + } + testutil.AssertEquals(h.t, invalidTxNum, 1) } diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go index 3494d67935f..0efd8e4f22c 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go @@ -25,7 +25,6 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/statebasedval" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" "github.com/op/go-logging" ) @@ -73,15 +72,15 @@ func (txmgr *LockBasedTxMgr) NewTxSimulator() (ledger.TxSimulator, error) { } // ValidateAndPrepare implements method in interface `txmgmt.TxMgr` -func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidation bool) (*common.Block, []*pb.InvalidTransaction, error) { +func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidation bool) error { logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data)) - block, invalidTxs, batch, err := txmgr.validator.ValidateAndPrepareBatch(block, doMVCCValidation) + batch, err := txmgr.validator.ValidateAndPrepareBatch(block, doMVCCValidation) if err != nil { - return nil, nil, err + return err } txmgr.currentBlock = block txmgr.batch = batch - return block, invalidTxs, err + return err } // Shutdown implements method in interface `txmgmt.TxMgr` diff --git a/core/ledger/kvledger/txmgmt/txmgr/txmgr.go b/core/ledger/kvledger/txmgmt/txmgr/txmgr.go index 1d36cbbb903..7c67c5af4b2 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/txmgr.go +++ b/core/ledger/kvledger/txmgmt/txmgr/txmgr.go @@ -19,14 +19,13 @@ package txmgr import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" ) // TxMgr - an interface that a transaction manager should implement type TxMgr interface { NewQueryExecutor() (ledger.QueryExecutor, error) NewTxSimulator() (ledger.TxSimulator, error) - ValidateAndPrepare(block *common.Block, doMVCCValidation bool) (*common.Block, []*pb.InvalidTransaction, error) + ValidateAndPrepare(block *common.Block, doMVCCValidation bool) error GetBlockNumFromSavepoint() (uint64, error) Commit() error Rollback() diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go index 83ae56ce3fb..53e01f966b4 100644 --- a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go @@ -20,8 +20,8 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" putils "github.com/hyperledger/fabric/protos/utils" logging "github.com/op/go-logging" ) @@ -40,18 +40,23 @@ func NewValidator(db statedb.VersionedDB) *Validator { } // ValidateAndPrepareBatch implements method in Validator interface -func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) ( - *common.Block, []*pb.InvalidTransaction, *statedb.UpdateBatch, error) { +func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) { logger.Debugf("New block arrived for validation:%#v, doMVCCValidation=%t", block, doMVCCValidation) - invalidTxs := []*pb.InvalidTransaction{} var valid bool updates := statedb.NewUpdateBatch() logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data)) + txsFilter := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) for txIndex, envBytes := range block.Data.Data { + if txsFilter.IsSet(uint(txIndex)) { + // Skiping invalid transaction + logger.Debug("Skipping transaction marked as invalid, txIndex=", txIndex) + continue + } + // extract actions from the envelope message respPayload, err := putils.GetActionFromEnvelope(envBytes) if err != nil { - return nil, nil, nil, err + return nil, err } //preparation for extracting RWSet from transaction @@ -60,7 +65,7 @@ func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidatio // Get the Result from the Action // and then Unmarshal it into a TxReadWriteSet using custom unmarshalling if err = txRWSet.Unmarshal(respPayload.Results); err != nil { - return nil, nil, nil, err + return nil, err } // trace the first 2000 characters of RWSet only, in case it is huge @@ -76,18 +81,19 @@ func (v *Validator) ValidateAndPrepareBatch(block *common.Block, doMVCCValidatio if !doMVCCValidation { valid = true } else if valid, err = v.validateTx(txRWSet, updates); err != nil { - return nil, nil, nil, err + return nil, err } - //TODO add the validation info to the bitmap in the metadata of the block + if valid { committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex+1)) addWriteSetToBatch(txRWSet, committingTxHeight, updates) } else { - invalidTxs = append(invalidTxs, &pb.InvalidTransaction{ - Transaction: &pb.Transaction{ /* FIXME */ }, Cause: pb.InvalidTransaction_RWConflictDuringCommit}) + // Unset bit in byte array corresponded to the invalid transaction + txsFilter.Set(uint(txIndex)) } } - return block, invalidTxs, updates, nil + block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter.ToBytes() + return updates, nil } func addWriteSetToBatch(txRWSet *rwset.TxReadWriteSet, txHeight *version.Height, batch *statedb.UpdateBatch) { diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go index 9acb3271f2c..a97cac222db 100644 --- a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go @@ -24,6 +24,8 @@ import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" "github.com/hyperledger/fabric/core/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/util" + "github.com/hyperledger/fabric/protos/common" ) var testDBPath = "/tmp/fabric/core/ledger/kvledger/txmgmt/validator/statebasedval" @@ -77,8 +79,16 @@ func checkValidation(t *testing.T, validator *Validator, rwsets []*rwset.RWSet, simulationResults = append(simulationResults, sr) } block := testutil.ConstructBlock(t, simulationResults, false) - block, invalidTx, _, err := validator.ValidateAndPrepareBatch(block, true) + _, err := validator.ValidateAndPrepareBatch(block, true) + txsFltr := util.NewFilterBitArrayFromBytes(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + invalidTxNum := 0 + for i := 0; i < len(block.Data.Data); i++ { + if txsFltr.IsSet(uint(i)) { + invalidTxNum++ + } + } + testutil.AssertNoError(t, err, "") - testutil.AssertEquals(t, len(invalidTx), len(invalidTxIndexes)) + testutil.AssertEquals(t, invalidTxNum, len(invalidTxIndexes)) //TODO Add the check for exact txnum that is marked invlid when bitarray is in place } diff --git a/core/ledger/kvledger/txmgmt/validator/validator.go b/core/ledger/kvledger/txmgmt/validator/validator.go index d255c63413e..30b32a6267b 100644 --- a/core/ledger/kvledger/txmgmt/validator/validator.go +++ b/core/ledger/kvledger/txmgmt/validator/validator.go @@ -19,10 +19,9 @@ package validator import ( "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" "github.com/hyperledger/fabric/protos/common" - pb "github.com/hyperledger/fabric/protos/peer" ) // Validator validates a rwset type Validator interface { - ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*common.Block, []*pb.InvalidTransaction, *statedb.UpdateBatch, error) + ValidateAndPrepareBatch(block *common.Block, doMVCCValidation bool) (*statedb.UpdateBatch, error) } diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index 5addb391a77..91d1fc393de 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -57,18 +57,12 @@ type ValidatedLedger interface { // A client can obtain more than one 'TxSimulator's for parallel execution. // Any snapshoting/synchronization should be performed at the implementation level if required NewTxSimulator() (TxSimulator, error) - // NewQueryExecuter gives handle to a query executer. + // NewQueryExecuter gives handle to a query executor. // A client can obtain more than one 'QueryExecutor's for parallel execution. // Any synchronization should be performed at the implementation level if required NewQueryExecutor() (QueryExecutor, error) - // RemoveInvalidTransactions validates all the transactions in the given block - // and returns a block that contains only valid transactions and a list of transactions that are invalid - RemoveInvalidTransactionsAndPrepare(block *common.Block) (*common.Block, []*pb.InvalidTransaction, error) - // Commit commits the changes prepared in the method RemoveInvalidTransactionsAndPrepare. - // Commits both the valid block and related state changes - Commit() error - // Rollback rollbacks the changes prepared in the method RemoveInvalidTransactionsAndPrepare - Rollback() + // Commits block into the ledger + Commit(block *common.Block) error } // QueryExecutor executes the queries