From 9b6b8fe6057360cc13dc9a676a79c87d815beabc Mon Sep 17 00:00:00 2001 From: Alessandro Sorniotti Date: Thu, 31 Aug 2017 15:43:04 +0200 Subject: [PATCH] [FAB-5932] - Parallel tx validation This change-set introduces parallelism on the committer. In particular, generic tx validation (e.g. well-formedness, signature checks) and VSCC validation are conducted in parallel for all transactions inside a block. This change-set scales the *validation* throughput with the number of cores on the machine that runs the peer. Change-Id: I80c03bd713a30f3db56627c7ae8fcf6fdd0d7891 Signed-off-by: Alessandro Sorniotti --- core/chaincode/ccproviderimpl.go | 16 +- .../committer/txvalidator/txvalidator_test.go | 79 +++- core/committer/txvalidator/validator.go | 384 +++++++++++++----- core/committer/txvalidator/validator_test.go | 19 +- core/common/ccprovider/ccprovider.go | 8 +- core/mocks/ccprovider/ccprovider.go | 79 +++- core/peer/peer.go | 18 +- core/scc/sysccapi.go | 4 +- sampleconfig/core.yaml | 7 + vendor/golang.org/x/sync/LICENSE | 27 ++ vendor/golang.org/x/sync/PATENTS | 22 + .../golang.org/x/sync/semaphore/semaphore.go | 131 ++++++ vendor/vendor.json | 6 + 13 files changed, 660 insertions(+), 140 deletions(-) create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore.go diff --git a/core/chaincode/ccproviderimpl.go b/core/chaincode/ccproviderimpl.go index c56af84b573..7af91e1612b 100644 --- a/core/chaincode/ccproviderimpl.go +++ b/core/chaincode/ccproviderimpl.go @@ -43,7 +43,6 @@ func init() { // ccProviderImpl is an implementation of the ccprovider.ChaincodeProvider interface type ccProviderImpl struct { - txsim ledger.TxSimulator } // ccProviderContextImpl contains the state that is passed around to calls to methods of ccProviderImpl @@ -52,15 +51,15 @@ type ccProviderContextImpl struct { } // GetContext returns a context for the supplied ledger, with the appropriate tx simulator -func (c *ccProviderImpl) GetContext(ledger ledger.PeerLedger, txid string) (context.Context, error) { +func (c *ccProviderImpl) GetContext(ledger ledger.PeerLedger, txid string) (context.Context, ledger.TxSimulator, error) { var err error // get context for the chaincode execution - c.txsim, err = ledger.NewTxSimulator(txid) + txsim, err := ledger.NewTxSimulator(txid) if err != nil { - return nil, err + return nil, nil, err } - ctxt := context.WithValue(context.Background(), TXSimulatorKey, c.txsim) - return ctxt, nil + ctxt := context.WithValue(context.Background(), TXSimulatorKey, txsim) + return ctxt, txsim, nil } // GetCCContext returns an interface that encapsulates a @@ -114,8 +113,3 @@ func (c *ccProviderImpl) Stop(ctxt context.Context, cccid interface{}, spec *pb.
} panic("ChaincodeSupport not initialized") } - -// ReleaseContext frees up resources held by the context -func (c *ccProviderImpl) ReleaseContext() { - c.txsim.Done() -} diff --git a/core/committer/txvalidator/txvalidator_test.go b/core/committer/txvalidator/txvalidator_test.go index 9aff89f2cdd..7a3ed1ebdb6 100644 --- a/core/committer/txvalidator/txvalidator_test.go +++ b/core/committer/txvalidator/txvalidator_test.go @@ -38,18 +38,10 @@ import ( "github.com/hyperledger/fabric/protos/utils" "github.com/spf13/viper" "github.com/stretchr/testify/assert" + "golang.org/x/sync/semaphore" ) -func TestBlockValidation(t *testing.T) { - viper.Set("peer.fileSystemPath", "/tmp/fabric/txvalidatortest") - ledgermgmt.InitializeTestEnv() - defer ledgermgmt.CleanupTestEnv() - - gb, _ := test.MakeGenesisBlock("TestLedger") - gbHash := gb.Header.Hash() - ledger, _ := ledgermgmt.CreateLedger(gb) - defer ledger.Close() - +func testValidationWithNTXes(t *testing.T, ledger ledger2.PeerLedger, gbHash []byte, nBlocks int) { txid := util2.GenerateUUID() simulator, _ := ledger.NewTxSimulator(txid) simulator.SetState("ns1", "key1", []byte("value1")) @@ -65,18 +57,29 @@ func TestBlockValidation(t *testing.T) { } mockVsccValidator := &validator.MockVsccValidator{} - tValidator := &txValidator{&mocktxvalidator.Support{LedgerVal: ledger}, mockVsccValidator} + vcs := struct { + *mocktxvalidator.Support + *semaphore.Weighted + }{&mocktxvalidator.Support{LedgerVal: ledger}, semaphore.NewWeighted(10)} + tValidator := &txValidator{vcs, mockVsccValidator} bcInfo, _ := ledger.GetBlockchainInfo() testutil.AssertEquals(t, bcInfo, &common.BlockchainInfo{ Height: 1, CurrentBlockHash: gbHash, PreviousBlockHash: nil}) - block := testutil.ConstructBlock(t, 1, gbHash, [][]byte{pubSimulationResBytes}, true) + sr := [][]byte{} + for i := 0; i < nBlocks; i++ { + sr = append(sr, pubSimulationResBytes) + } + block := testutil.ConstructBlock(t, 1, gbHash, sr, true) tValidator.Validate(block) txsfltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) - assert.True(t, txsfltr.IsSetTo(0, peer.TxValidationCode_VALID)) + + for i := 0; i < nBlocks; i++ { + assert.True(t, txsfltr.IsSetTo(i, peer.TxValidationCode_VALID)) + } /* @@ -100,6 +103,50 @@ func TestBlockValidation(t *testing.T) { */ } +func TestBlockValidation(t *testing.T) { + viper.Set("peer.fileSystemPath", "/tmp/fabric/txvalidatortest") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + + gb, _ := test.MakeGenesisBlock("TestLedger") + gbHash := gb.Header.Hash() + ledger, _ := ledgermgmt.CreateLedger(gb) + defer ledger.Close() + + // here we test validation of a block with a single tx + testValidationWithNTXes(t, ledger, gbHash, 1) +} + +func TestParallelBlockValidation(t *testing.T) { + viper.Set("peer.fileSystemPath", "/tmp/fabric/txvalidatortest") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + + gb, _ := test.MakeGenesisBlock("TestLedger") + gbHash := gb.Header.Hash() + ledger, _ := ledgermgmt.CreateLedger(gb) + defer ledger.Close() + + // here we test validation of a block with 128 txes + testValidationWithNTXes(t, ledger, gbHash, 128) +} + +func TestVeryLargeParallelBlockValidation(t *testing.T) { + viper.Set("peer.fileSystemPath", "/tmp/fabric/txvalidatortest") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + + gb, _ := test.MakeGenesisBlock("TestLedger") + gbHash := gb.Header.Hash() + ledger, _ := ledgermgmt.CreateLedger(gb) + defer ledger.Close() + + // here we test 
validation of a block with 4096 txes, + // which is larger than both the number of workers in + // the pool and the buffer in the channels + testValidationWithNTXes(t, ledger, gbHash, 4096) +} + func TestNewTxValidator_DuplicateTransactions(t *testing.T) { viper.Set("peer.fileSystemPath", "/tmp/fabric/txvalidatortest") ledgermgmt.InitializeTestEnv() @@ -110,7 +157,11 @@ func TestNewTxValidator_DuplicateTransactions(t *testing.T) { defer ledger.Close() - tValidator := &txValidator{&mocktxvalidator.Support{LedgerVal: ledger}, &validator.MockVsccValidator{}} + vcs := struct { + *mocktxvalidator.Support + *semaphore.Weighted + }{&mocktxvalidator.Support{LedgerVal: ledger}, semaphore.NewWeighted(10)} + tValidator := &txValidator{vcs, &validator.MockVsccValidator{}} // Create simple endorsement transaction payload := &common.Payload{ diff --git a/core/committer/txvalidator/validator.go b/core/committer/txvalidator/validator.go index d6ab30c4210..0c1068f901e 100644 --- a/core/committer/txvalidator/validator.go +++ b/core/committer/txvalidator/validator.go @@ -9,6 +9,8 @@ package txvalidator import ( "fmt" + "golang.org/x/net/context" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/common/configtx" "github.com/hyperledger/fabric/common/flogging" @@ -32,6 +34,12 @@ import ( // Support provides all of the needed to evaluate the VSCC type Support interface { + // Acquire implements semaphore-like acquire semantics + Acquire(ctx context.Context, n int64) error + + // Release implements semaphore-like release semantics + Release(n int64) + // Ledger returns the ledger associated with this validator Ledger() ledger.PeerLedger @@ -117,6 +125,21 @@ func init() { logger = flogging.MustGetLogger("txvalidator") } +type blockValidationRequest struct { + block *common.Block + d []byte + tIdx int + v *txValidator +} + +type blockValidationResult struct { + tIdx int + validationCode peer.TxValidationCode + txsChaincodeName *sysccprovider.ChaincodeInstance + txsUpgradedChaincode *sysccprovider.ChaincodeInstance + err error +} + // NewTxValidator creates new transactions validator func NewTxValidator(support Support) Validator { // Encapsulates interface implementation @@ -132,7 +155,30 @@ func (v *txValidator) chainExists(chain string) bool { return true } +// Validate performs the validation of a block. The validation +// of each transaction in the block is performed in parallel. +// The approach is as follows: the committer thread starts the +// tx validation function in a goroutine (using a semaphore to cap +// the number of concurrent validating goroutines). The committer +// thread then reads results of validation (in order of completion +// of the goroutines) from the results channel. The goroutines +// perform the validation of the txs in the block and enqueue the +// validation result in the results channel. A few noteworthy facts: +// 1) to keep the approach simple, the committer thread enqueues +// all transactions in the block and then moves on to reading the +// results. +// 2) for parallel validation to work, it is important that the +// validation function does not change the state of the system. +// Otherwise the order in which validation is performed matters +// and we have to resort to sequential validation (or some locking). +// This is currently true, because the only case in which state is +// affected is when a config transaction is received, and config +// transactions are guaranteed to be alone in the block. If/when this +// assumption is violated, this code must be changed.
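For readers who want to see the fan-out/fan-in shape described above in isolation, here is a minimal, self-contained Go sketch of the same pattern that Validate uses below: a shared weighted semaphore caps the number of in-flight workers, a dispatcher goroutine enqueues every transaction, and the main thread reads exactly one result per transaction in completion order. All names (validateOne, validateAll, result) are illustrative stand-ins, not identifiers from the patch.

package main

import (
	"context"
	"fmt"
	"runtime"

	"golang.org/x/sync/semaphore"
)

// result mirrors the role of blockValidationResult: an index plus an outcome.
type result struct {
	idx int
	err error
}

// validateOne stands in for the per-transaction validation work.
func validateOne(idx int, data []byte) error {
	return nil
}

// validateAll shows the semaphore-capped fan-out/fan-in used by Validate.
func validateAll(items [][]byte) error {
	sem := semaphore.NewWeighted(int64(runtime.NumCPU()))
	results := make(chan result)

	// fan-out: the dispatcher starts one goroutine per item, blocking on the
	// semaphore so that at most NumCPU validations run concurrently
	go func() {
		for i, d := range items {
			i, d := i, d                         // capture loop variables for the goroutine
			sem.Acquire(context.Background(), 1) // cannot fail with a background context
			go func() {
				defer sem.Release(1)
				results <- result{idx: i, err: validateOne(i, d)}
			}()
		}
	}()

	// fan-in: read exactly one result per item, in completion order,
	// remembering the error of the lowest-indexed failing item
	var firstErr error
	errPos := len(items)
	for range items {
		if r := <-results; r.err != nil && r.idx < errPos {
			firstErr, errPos = r.err, r.idx
		}
	}
	return firstErr
}

func main() {
	fmt.Println(validateAll([][]byte{{0x01}, {0x02}, {0x03}}))
}

The real Validate below additionally carries per-transaction validation codes and chaincode-upgrade metadata through blockValidationResult, but the concurrency structure is the same.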
func (v *txValidator) Validate(block *common.Block) error { + var err error + var errPos int + logger.Debug("START Block Validation") defer logger.Debug("END Block Validation") // Initialize trans as valid here, then set invalidation reason code upon invalidation below @@ -141,116 +187,74 @@ func (v *txValidator) Validate(block *common.Block) error { txsChaincodeNames := make(map[int]*sysccprovider.ChaincodeInstance) // upgradedChaincodes records all the chaincodes that are upgraded in a block txsUpgradedChaincodes := make(map[int]*sysccprovider.ChaincodeInstance) - for tIdx, d := range block.Data.Data { - if d != nil { - if env, err := utils.GetEnvelopeFromBlock(d); err != nil { - logger.Warningf("Error getting tx from block(%s)", err) - txsfltr.SetFlag(tIdx, peer.TxValidationCode_INVALID_OTHER_REASON) - } else if env != nil { - // validate the transaction: here we check that the transaction - // is properly formed, properly signed and that the security - // chain binding proposal to endorsements to tx holds. We do - // NOT check the validity of endorsements, though. That's a - // job for VSCC below - logger.Debug("Validating transaction peer.ValidateTransaction()") - var payload *common.Payload - var err error - var txResult peer.TxValidationCode - - if payload, txResult = validation.ValidateTransaction(env); txResult != peer.TxValidationCode_VALID { - logger.Errorf("Invalid transaction with index %d", tIdx) - txsfltr.SetFlag(tIdx, txResult) - continue - } - chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) - if err != nil { - logger.Warningf("Could not unmarshal channel header, err %s, skipping", err) - txsfltr.SetFlag(tIdx, peer.TxValidationCode_INVALID_OTHER_REASON) - continue - } + results := make(chan *blockValidationResult) + go func() { + for tIdx, d := range block.Data.Data { + tIdxLcl := tIdx + dLcl := d + + // ensure that we don't have too many concurrent validation workers + v.support.Acquire(context.Background(), 1) + + go func() { + defer v.support.Release(1) + + validateTx(&blockValidationRequest{ + d: dLcl, + block: block, + tIdx: tIdxLcl, + v: v, + }, results) + }() + } + }() - channel := chdr.ChannelId - logger.Debugf("Transaction is for chain %s", channel) + logger.Debugf("expecting %d block validation responses", len(block.Data.Data)) - if !v.chainExists(channel) { - logger.Errorf("Dropping transaction for non-existent chain %s", channel) - txsfltr.SetFlag(tIdx, peer.TxValidationCode_TARGET_CHAIN_NOT_FOUND) - continue - } + // now we read responses in the order in which they come back + for i := 0; i < len(block.Data.Data); i++ { + res := <-results - if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION { - // Check duplicate transactions - txID := chdr.TxId - if _, err := v.support.Ledger().GetTransactionByID(txID); err == nil { - logger.Error("Duplicate transaction found, ", txID, ", skipping") - txsfltr.SetFlag(tIdx, peer.TxValidationCode_DUPLICATE_TXID) - continue - } + if res.err != nil { + // if there is an error, we buffer its value, wait for + // all workers to complete validation and then return + // the error from the first tx in this block that returned an error + logger.Debugf("got terminal error %s for idx %d", res.err, res.tIdx) - // Validate tx with vscc and policy - logger.Debug("Validating transaction vscc tx validate") - err, cde := v.vscc.VSCCValidateTx(payload, d, env) - if err != nil { - txID := txID - logger.Errorf("VSCCValidateTx for transaction txId = %s returned error %s", txID, err) - switch err.(type) 
{ - case *VSCCExecutionFailureError: - return err - case *VSCCInfoLookupFailureError: - return err - default: - txsfltr.SetFlag(tIdx, cde) - continue - } - } + if err == nil || res.tIdx < errPos { + err = res.err + errPos = res.tIdx + } + } else { + // if there was no error, we set the txsfltr and we set the + // txsChaincodeNames and txsUpgradedChaincodes maps + logger.Debugf("got result for idx %d, code %d", res.tIdx, res.validationCode) - invokeCC, upgradeCC, err := v.getTxCCInstance(payload) - if err != nil { - logger.Errorf("Get chaincode instance from transaction txId = %s returned error %s", txID, err) - txsfltr.SetFlag(tIdx, peer.TxValidationCode_INVALID_OTHER_REASON) - continue - } - txsChaincodeNames[tIdx] = invokeCC - if upgradeCC != nil { - logger.Infof("Find chaincode upgrade transaction for chaincode %s on chain %s with new version %s", upgradeCC.ChaincodeName, upgradeCC.ChainID, upgradeCC.ChaincodeVersion) - txsUpgradedChaincodes[tIdx] = upgradeCC - } - } else if common.HeaderType(chdr.Type) == common.HeaderType_CONFIG { - configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) - if err != nil { - err := fmt.Errorf("Error unmarshaling config which passed initial validity checks: %s", err) - logger.Critical(err) - return err - } + txsfltr.SetFlag(res.tIdx, res.validationCode) - if err := v.support.Apply(configEnvelope); err != nil { - err := fmt.Errorf("Error validating config which passed initial validity checks: %s", err) - logger.Critical(err) - return err - } - logger.Debugf("config transaction received for chain %s", channel) - } else { - logger.Warningf("Unknown transaction type [%s] in block number [%d] transaction index [%d]", - common.HeaderType(chdr.Type), block.Header.Number, tIdx) - txsfltr.SetFlag(tIdx, peer.TxValidationCode_UNKNOWN_TX_TYPE) - continue + if res.validationCode == peer.TxValidationCode_VALID { + if res.txsChaincodeName != nil { + txsChaincodeNames[res.tIdx] = res.txsChaincodeName } - - if _, err := proto.Marshal(env); err != nil { - logger.Warningf("Cannot marshal transaction due to %s", err) - txsfltr.SetFlag(tIdx, peer.TxValidationCode_MARSHAL_TX_ERROR) - continue + if res.txsUpgradedChaincode != nil { + txsUpgradedChaincodes[res.tIdx] = res.txsUpgradedChaincode } - // Succeeded to pass down here, transaction is valid - txsfltr.SetFlag(tIdx, peer.TxValidationCode_VALID) - } else { - logger.Warning("Nil tx from block") - txsfltr.SetFlag(tIdx, peer.TxValidationCode_NIL_ENVELOPE) } } } + // if we're here, all workers have completed the validation. 
+ // If there was an error we return the error from the first + // tx in this block that returned an error + if err != nil { + return err + } + + // if we're here, all workers have completed validation and + // no error was reported; we set the tx filter and return + // success + txsfltr = v.invalidTXsForUpgradeCC(txsChaincodeNames, txsUpgradedChaincodes, txsfltr) // Initialize metadata structure @@ -261,6 +265,183 @@ func (v *txValidator) Validate(block *common.Block) error { return nil } +func validateTx(req *blockValidationRequest, results chan<- *blockValidationResult) { + block := req.block + d := req.d + tIdx := req.tIdx + v := req.v + + if d == nil { + results <- &blockValidationResult{ + tIdx: tIdx, + } + return + } + + if env, err := utils.GetEnvelopeFromBlock(d); err != nil { + logger.Warningf("Error getting tx from block(%s)", err) + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_INVALID_OTHER_REASON, + } + return + } else if env != nil { + // validate the transaction: here we check that the transaction + // is properly formed, properly signed and that the security + // chain binding proposal to endorsements to tx holds. We do + // NOT check the validity of endorsements, though. That's a + // job for VSCC below + logger.Debugf("validateTx starts for block %p env %p txn %d", block, env, tIdx) + defer logger.Debugf("validateTx completes for block %p env %p txn %d", block, env, tIdx) + var payload *common.Payload + var err error + var txResult peer.TxValidationCode + var txsChaincodeName *sysccprovider.ChaincodeInstance + var txsUpgradedChaincode *sysccprovider.ChaincodeInstance + + if payload, txResult = validation.ValidateTransaction(env); txResult != peer.TxValidationCode_VALID { + logger.Errorf("Invalid transaction with index %d", tIdx) + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: txResult, + } + return + } + + chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) + if err != nil { + logger.Warningf("Could not unmarshal channel header, err %s, skipping", err) + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_INVALID_OTHER_REASON, + } + return + } + + channel := chdr.ChannelId + logger.Debugf("Transaction is for chain %s", channel) + + if !v.chainExists(channel) { + logger.Errorf("Dropping transaction for non-existent chain %s", channel) + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_TARGET_CHAIN_NOT_FOUND, + } + return + } + + if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION { + // Check duplicate transactions + txID := chdr.TxId + if _, err := v.support.Ledger().GetTransactionByID(txID); err == nil { + logger.Error("Duplicate transaction found, ", txID, ", skipping") + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_DUPLICATE_TXID, + } + return + } + + // Validate tx with vscc and policy + logger.Debug("Validating transaction vscc tx validate") + err, cde := v.vscc.VSCCValidateTx(payload, d, env) + if err != nil { + txID := txID + logger.Errorf("VSCCValidateTx for transaction txId = %s returned error %s", txID, err) + switch err.(type) { + case *VSCCExecutionFailureError: + results <- &blockValidationResult{ + tIdx: tIdx, + err: err, + } + return + case *VSCCInfoLookupFailureError: + results <- &blockValidationResult{ + tIdx: tIdx, + err: err, + } + return + default: + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: cde, + } + 
return + } + } + + invokeCC, upgradeCC, err := v.getTxCCInstance(payload) + if err != nil { + logger.Errorf("Get chaincode instance from transaction txId = %s returned error %s", txID, err) + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_INVALID_OTHER_REASON, + } + return + } + txsChaincodeName = invokeCC + if upgradeCC != nil { + logger.Infof("Find chaincode upgrade transaction for chaincode %s on chain %s with new version %s", upgradeCC.ChaincodeName, upgradeCC.ChainID, upgradeCC.ChaincodeVersion) + txsUpgradedChaincode = upgradeCC + } + } else if common.HeaderType(chdr.Type) == common.HeaderType_CONFIG { + configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) + if err != nil { + err := fmt.Errorf("Error unmarshaling config which passed initial validity checks: %s", err) + logger.Critical(err) + results <- &blockValidationResult{ + tIdx: tIdx, + err: err, + } + return + } + + if err := v.support.Apply(configEnvelope); err != nil { + err := fmt.Errorf("Error validating config which passed initial validity checks: %s", err) + logger.Critical(err) + results <- &blockValidationResult{ + tIdx: tIdx, + err: err, + } + return + } + logger.Debugf("config transaction received for chain %s", channel) + } else { + logger.Warningf("Unknown transaction type [%s] in block number [%d] transaction index [%d]", + common.HeaderType(chdr.Type), block.Header.Number, tIdx) + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_UNKNOWN_TX_TYPE, + } + return + } + + if _, err := proto.Marshal(env); err != nil { + logger.Warningf("Cannot marshal transaction due to %s", err) + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_MARSHAL_TX_ERROR, + } + return + } + // Succeeded to pass down here, transaction is valid + results <- &blockValidationResult{ + tIdx: tIdx, + txsChaincodeName: txsChaincodeName, + txsUpgradedChaincode: txsUpgradedChaincode, + validationCode: peer.TxValidationCode_VALID, + } + return + } else { + logger.Warning("Nil tx from block") + results <- &blockValidationResult{ + tIdx: tIdx, + validationCode: peer.TxValidationCode_NIL_ENVELOPE, + } + return + } +} + // generateCCKey generates a unique identifier for chaincode in specific chain func (v *txValidator) generateCCKey(ccName, chainID string) string { return fmt.Sprintf("%s/%s", ccName, chainID) @@ -430,6 +611,9 @@ func (v *vsccValidatorImpl) GetInfoForValidate(txid, chID, ccID string) (*sysccp } func (v *vsccValidatorImpl) VSCCValidateTx(payload *common.Payload, envBytes []byte, env *common.Envelope) (error, peer.TxValidationCode) { + logger.Debugf("VSCCValidateTx starts for env %p envbytes %p", env, envBytes) + defer logger.Debugf("VSCCValidateTx completes for env %p envbytes %p", env, envBytes) + // get header extensions so we have the chaincode ID hdrExt, err := utils.GetChaincodeHeaderExtension(payload.Header) if err != nil { @@ -590,13 +774,15 @@ func (v *vsccValidatorImpl) VSCCValidateTx(payload *common.Payload, envBytes []b } func (v *vsccValidatorImpl) VSCCValidateTxForCC(envBytes []byte, txid, chid, vsccName, vsccVer string, policy []byte) error { - ctxt, err := v.ccprovider.GetContext(v.support.Ledger(), txid) + logger.Debugf("VSCCValidateTxForCC starts for envbytes %p", envBytes) + defer logger.Debugf("VSCCValidateTxForCC completes for envbytes %p", envBytes) + ctxt, txsim, err := v.ccprovider.GetContext(v.support.Ledger(), txid) if err != nil { msg := fmt.Sprintf("Cannot obtain context for 
txid=%s, err %s", txid, err) logger.Errorf(msg) return &VSCCExecutionFailureError{msg} } - defer v.ccprovider.ReleaseContext() + defer txsim.Done() // build arguments for VSCC invocation // args[0] - function name (not used now) diff --git a/core/committer/txvalidator/validator_test.go b/core/committer/txvalidator/validator_test.go index 82cb3470060..79476d4beab 100644 --- a/core/committer/txvalidator/validator_test.go +++ b/core/committer/txvalidator/validator_test.go @@ -45,6 +45,7 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "golang.org/x/sync/semaphore" ) func signedByAnyMember(ids []string) []byte { @@ -59,7 +60,11 @@ func setupLedgerAndValidator(t *testing.T) (ledger.PeerLedger, Validator) { assert.NoError(t, err) theLedger, err := ledgermgmt.CreateLedger(gb) assert.NoError(t, err) - theValidator := NewTxValidator(&mockSupport{l: theLedger}) + vcs := struct { + *mockSupport + *semaphore.Weighted + }{&mockSupport{l: theLedger}, semaphore.NewWeighted(10)} + theValidator := NewTxValidator(vcs) return theLedger, theValidator } @@ -562,7 +567,11 @@ func (exec *mockQueryExecutor) Done() { // returned from the function call. func TestLedgerIsNoAvailable(t *testing.T) { theLedger := new(mockLedger) - validator := NewTxValidator(&mockSupport{l: theLedger}) + vcs := struct { + *mockSupport + *semaphore.Weighted + }{&mockSupport{l: theLedger}, semaphore.NewWeighted(10)} + validator := NewTxValidator(vcs) ccID := "mycc" tx := getEnv(ccID, createRWset(t, ccID), t) @@ -586,7 +595,11 @@ func TestLedgerIsNoAvailable(t *testing.T) { func TestValidationInvalidEndorsing(t *testing.T) { theLedger := new(mockLedger) - validator := NewTxValidator(&mockSupport{l: theLedger}) + vcs := struct { + *mockSupport + *semaphore.Weighted + }{&mockSupport{l: theLedger}, semaphore.NewWeighted(10)} + validator := NewTxValidator(vcs) ccID := "mycc" tx := getEnv(ccID, createRWset(t, ccID), t) diff --git a/core/common/ccprovider/ccprovider.go b/core/common/ccprovider/ccprovider.go index d8fccd3a77d..b57610a8e22 100644 --- a/core/common/ccprovider/ccprovider.go +++ b/core/common/ccprovider/ccprovider.go @@ -424,8 +424,10 @@ func (*ChaincodeData) ProtoMessage() {} // chaincode package without importing it; more methods // should be added below if necessary type ChaincodeProvider interface { - // GetContext returns a ledger context - GetContext(ledger ledger.PeerLedger, txid string) (context.Context, error) + // GetContext returns a ledger context and a tx simulator; it's the + // caller's responsibility to release the simulator by calling its + // Done method once it is no longer needed + GetContext(ledger ledger.PeerLedger, txid string) (context.Context, ledger.TxSimulator, error) // GetCCContext returns an opaque chaincode context GetCCContext(cid, name, version, txid string, syscc bool, signedProp *pb.SignedProposal, prop *pb.Proposal) interface{} // GetCCValidationInfoFromLSCC returns the VSCC and the policy listed by LSCC for the supplied chaincode @@ -438,8 +440,6 @@ type ChaincodeProvider interface { ExecuteWithErrorFilter(ctxt context.Context, cccid interface{}, spec interface{}) ([]byte, *pb.ChaincodeEvent, error) // Stop stops the chaincode given context and deployment spec Stop(ctxt context.Context, cccid interface{}, spec *pb.ChaincodeDeploymentSpec) error - // ReleaseContext releases the context returned previously by GetContext - ReleaseContext() } var ccFactory ChaincodeProviderFactory diff --git a/core/mocks/ccprovider/ccprovider.go
b/core/mocks/ccprovider/ccprovider.go index f3e61c17b78..313e2c5a9f1 100644 --- a/core/mocks/ccprovider/ccprovider.go +++ b/core/mocks/ccprovider/ccprovider.go @@ -19,6 +19,7 @@ package ccprovider import ( "context" + commonledger "github.com/hyperledger/fabric/common/ledger" "github.com/hyperledger/fabric/core/chaincode/shim" "github.com/hyperledger/fabric/core/common/ccprovider" "github.com/hyperledger/fabric/core/ledger" @@ -48,11 +49,81 @@ type mockCcProviderImpl struct { type mockCcProviderContextImpl struct { } -// GetContext does nothing -func (c *mockCcProviderImpl) GetContext(ledger ledger.PeerLedger, txid string) (context.Context, error) { +type mockTxSim struct { +} + +func (m *mockTxSim) GetState(namespace string, key string) ([]byte, error) { + return nil, nil +} + +func (m *mockTxSim) GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) { + return nil, nil +} + +func (m *mockTxSim) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error) { return nil, nil } +func (m *mockTxSim) ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error) { + return nil, nil +} + +func (m *mockTxSim) Done() { +} + +func (m *mockTxSim) SetState(namespace string, key string, value []byte) error { + return nil +} + +func (m *mockTxSim) DeleteState(namespace string, key string) error { + return nil +} + +func (m *mockTxSim) SetStateMultipleKeys(namespace string, kvs map[string][]byte) error { + return nil +} + +func (m *mockTxSim) ExecuteUpdate(query string) error { + return nil +} + +func (m *mockTxSim) GetTxSimulationResults() (*ledger.TxSimulationResults, error) { + return nil, nil +} + +func (m *mockTxSim) DeletePrivateData(namespace, collection, key string) error { + return nil +} + +func (m *mockTxSim) ExecuteQueryOnPrivateData(namespace, collection, query string) (commonledger.ResultsIterator, error) { + return nil, nil +} + +func (m *mockTxSim) GetPrivateData(namespace, collection, key string) ([]byte, error) { + return nil, nil +} + +func (m *mockTxSim) GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([][]byte, error) { + return nil, nil +} + +func (m *mockTxSim) GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (commonledger.ResultsIterator, error) { + return nil, nil +} + +func (m *mockTxSim) SetPrivateData(namespace, collection, key string, value []byte) error { + return nil +} + +func (m *mockTxSim) SetPrivateDataMultipleKeys(namespace, collection string, kvs map[string][]byte) error { + return nil +} + +// GetContext does nothing +func (c *mockCcProviderImpl) GetContext(ledger ledger.PeerLedger, txid string) (context.Context, ledger.TxSimulator, error) { + return nil, &mockTxSim{}, nil +} + // GetCCContext does nothing func (c *mockCcProviderImpl) GetCCContext(cid, name, version, txid string, syscc bool, signedProp *peer.SignedProposal, prop *peer.Proposal) interface{} { return &mockCcProviderContextImpl{} @@ -85,7 +156,3 @@ func (c *mockCcProviderImpl) ExecuteWithErrorFilter(ctxt context.Context, cccid func (c *mockCcProviderImpl) Stop(ctxt context.Context, cccid interface{}, spec *peer.ChaincodeDeploymentSpec) error { return nil } - -// ReleaseContext does nothing -func (c *mockCcProviderImpl) ReleaseContext() { -} diff --git a/core/peer/peer.go b/core/peer/peer.go index 7b0bdb8dfe3..521ec9f313c 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -9,6 +9,7 @@ package peer import ( "fmt" "net" + "runtime" "sync" 
"github.com/hyperledger/fabric/common/channelconfig" @@ -35,6 +36,7 @@ import ( "github.com/hyperledger/fabric/protos/utils" "github.com/pkg/errors" "github.com/spf13/viper" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" ) @@ -152,10 +154,20 @@ func MockSetMSPIDGetter(mspIDGetter func(string) []string) { mockMSPIDGetter = mspIDGetter } +// validationWorkersSemaphore is the semaphore used to ensure that +// there are not too many concurrent tx validation goroutines +var validationWorkersSemaphore *semaphore.Weighted + // Initialize sets up any chains that the peer has from the persistence. This // function should be called at the start up when the ledger and gossip // ready func Initialize(init func(string)) { + nWorkers := viper.GetInt("peer.validatorPoolSize") + if nWorkers <= 0 { + nWorkers = runtime.NumCPU() + } + validationWorkersSemaphore = semaphore.NewWeighted(int64(nWorkers)) + chainInitializer = init var cb *common.Block @@ -300,7 +312,11 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error { cs.Resources = bundleSource cs.bundleSource = bundleSource - validator := txvalidator.NewTxValidator(cs) + vcs := struct { + *chainSupport + *semaphore.Weighted + }{cs, validationWorkersSemaphore} + validator := txvalidator.NewTxValidator(vcs) c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error { chainID, err := utils.GetChainIDFromBlock(block) if err != nil { diff --git a/core/scc/sysccapi.go b/core/scc/sysccapi.go index fa8db725fe8..12dc892641f 100644 --- a/core/scc/sysccapi.go +++ b/core/scc/sysccapi.go @@ -110,14 +110,14 @@ func deploySysCC(chainID string, syscc *SystemChaincode) error { //init can do GetState (and other Get's) even if Puts cannot be //be handled. Need ledger for this - ctxt2, err := ccprov.GetContext(lgr, txid) + ctxt2, txsim, err := ccprov.GetContext(lgr, txid) if err != nil { return err } ctxt = ctxt2 - defer ccprov.ReleaseContext() + defer txsim.Done() } chaincodeID := &pb.ChaincodeID{Path: syscc.Path, Name: syscc.Name} diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 431f53525f3..b49aaf670ba 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -276,6 +276,13 @@ peer: authFilter: "DefaultAuth" decorator: "DefaultDecorator" + # Number of goroutines that will execute transaction validation in parallel. + # By default, the peer chooses the number of CPUs on the machine. Set this + # variable to override that choice. + # NOTE: overriding this value might negatively influence the performance of + # the peer so please change this value only if you know what you're doing + validatorPoolSize: + ############################################################################### # # VM section diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 00000000000..6a66aea5eaf --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. 
nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 00000000000..733099041f8 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 00000000000..e9d2d79a97f --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,131 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "sync" + + // Use the old context because packages that depend on this one + // (e.g. cloud.google.com/go/...) must run on Go 1.6. + // TODO(jba): update to "context" when possible. + "golang.org/x/net/context" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. 
+func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking only until ctx +// is done. On success, returns nil. On failure, returns ctx.Err() and leaves +// the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + s.waiters.Remove(elem) + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: bad release") + } + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } + s.mu.Unlock() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index b57a98b69f3..b0788187d6c 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -638,6 +638,12 @@ "revision": "b7f5d985f9013f771282befb2c58ef0fc45fe332", "revisionTime": "2015-10-26T13:59:20-05:00" }, + { + "checksumSHA1": "vcN67ZjTbGpLLwSghHCbAEvmzMo=", + "path": "golang.org/x/sync/semaphore", + "revision": "8e0aa688b654ef28caa72506fa5ec8dba9fc7690", + "revisionTime": "2017-07-19T03:38:01Z" + }, { "checksumSHA1": "aVgPDgwY3/t4J/JOw9H3FVMHqh0=", "path": "golang.org/x/sys/unix",
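A brief note on how callers satisfy the new Support interface: rather than adding Acquire/Release methods to the chain support object, peer.go and the tests build an anonymous struct that embeds both the existing support value and a *semaphore.Weighted, relying on Go method promotion. The sketch below shows why that composition compiles; supportLike, chainSupport and ChainID are illustrative stand-ins, not the real Fabric types.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// supportLike stands in for the txvalidator Support interface: it requires
// semaphore semantics plus whatever the underlying chain support provides.
type supportLike interface {
	Acquire(ctx context.Context, n int64) error
	Release(n int64)
	ChainID() string
}

// chainSupport stands in for the peer's chainSupport / the test mocks.
type chainSupport struct{ id string }

func (c *chainSupport) ChainID() string { return c.id }

func main() {
	// The anonymous struct embeds both pieces; Acquire/Release are promoted
	// from *semaphore.Weighted and ChainID from *chainSupport, so the value
	// satisfies supportLike without any hand-written glue methods.
	vcs := struct {
		*chainSupport
		*semaphore.Weighted
	}{&chainSupport{id: "testchain"}, semaphore.NewWeighted(2)}

	var s supportLike = vcs

	s.Acquire(context.Background(), 1) // cannot fail with a background context
	defer s.Release(1)
	fmt.Println(s.ChainID())
}

This is the same shape used in createChain (peer.go) and in the struct literals added to the tests, with the shared validationWorkersSemaphore sized by peer.validatorPoolSize or, when that setting is unset, by runtime.NumCPU().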