From 5bdca86793e75a4b68eb40c26382dd736349f64f Mon Sep 17 00:00:00 2001 From: Srinivasan Muralidharan Date: Wed, 28 Dec 2016 19:23:06 -0500 Subject: [PATCH] fab-1475 make CC fmk allow concurrent invokes https://jira.hyperledger.org/browse/FAB-1475 Summary ======= With pre-consensus simulation, multiple chains and relaxation by the ledger to simulate versions of chaincode state concurrently, we can now allow chaincode framework to execute invokes concurrently. This CR enables this. This CR enables concurrency basically by removing the FSM states that enforced serialization (so basically all the FSM changes in chaincode/hander.go and chaincode/shim/handler.go). The CR also has a "Chaincode Checker" program which has the potential for much bigger things . the tooling test their chaincodes for consistency . the tooling for stressing the fabric The concurrency enablement was tested with the "ccchecker". Details ======= The submit will basically have 4 things . changes to 3 chaincode framework files handler.go files to enable concurrency . concurrency_test.go to run 100 concurrent invokes followed by 100 concurrent queries . a complete "ccchecker" example framework for testing and validating chaincodes . exports some functions under fabric/peer/chaincode CLI for use by the above ccchecker example framework "ccchecker" comes with a sample "newkeyperinvoke" chaincode that should NEVER fail ledger consistency checks. To test simply follow these steps . vagrant window 1 - start orderer ./orderer . vagrant window 2 - start peer peer node start . vagrant window 3 - bring up chaincode for test cd peer //deploy the chaincode used by ccchecker out of the box peer chaincode deploy -n mycc -p github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke -c '{"Args":[""]}' //wait for commit say for about 10 secs and then issue a query to bring the CC up peer chaincode query -n mycc -c '{"Args":["get","a"]}' //verify the chaincode is up docker ps . vagrant window 4 - run test cd examples/ccchecker go build ./ccchecker The above reads from ccchecker.json and executes tests concurrently. Change-Id: I5267b19f03ed10003eb28facf87693525f0dcd1a Signed-off-by: Srinivasan Muralidharan --- core/chaincode/chaincode_support.go | 23 +- core/chaincode/concurrency_test.go | 141 ++++++++ core/chaincode/exectransaction_test.go | 46 +-- core/chaincode/handler.go | 135 +++---- core/chaincode/shim/chaincode.go | 20 +- core/chaincode/shim/handler.go | 137 ++++--- core/committer/txvalidator/validator.go | 10 +- core/util/utils.go | 2 +- examples/ccchecker/ccchecker.go | 179 +++++++++ examples/ccchecker/ccchecker.json | 17 + examples/ccchecker/chaincodes/chaincodes.go | 339 ++++++++++++++++++ .../newkeyperinvoke/newkeyperinvoke.go | 64 ++++ .../newkeyperinvoke/shadow/newkeyperinvoke.go | 110 ++++++ .../ccchecker/chaincodes/registershadow.go | 60 ++++ examples/ccchecker/init.go | 79 ++++ examples/ccchecker/main.go | 62 ++++ peer/chaincode/common.go | 118 +++--- peer/common/common.go | 49 +++ peer/main.go | 45 +-- 19 files changed, 1345 insertions(+), 291 deletions(-) create mode 100644 core/chaincode/concurrency_test.go create mode 100644 examples/ccchecker/ccchecker.go create mode 100644 examples/ccchecker/ccchecker.json create mode 100644 examples/ccchecker/chaincodes/chaincodes.go create mode 100644 examples/ccchecker/chaincodes/newkeyperinvoke/newkeyperinvoke.go create mode 100644 examples/ccchecker/chaincodes/newkeyperinvoke/shadow/newkeyperinvoke.go create mode 100644 examples/ccchecker/chaincodes/registershadow.go create mode 100644 examples/ccchecker/init.go create mode 100644 examples/ccchecker/main.go diff --git a/core/chaincode/chaincode_support.go b/core/chaincode/chaincode_support.go index a34865a1590..21ebfd7ade2 100644 --- a/core/chaincode/chaincode_support.go +++ b/core/chaincode/chaincode_support.go @@ -397,22 +397,21 @@ func (chaincodeSupport *ChaincodeSupport) getArgsAndEnv(cccid *CCContext, cLang } // launchAndWaitForRegister will launch container if not already running. Use the targz to create the image if not found -func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, targz io.Reader) (bool, error) { +func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, targz io.Reader) error { canName := cccid.GetCanonicalName() if canName == "" { - return false, fmt.Errorf("chaincode name not set") + return fmt.Errorf("chaincode name not set") } chaincodeSupport.runningChaincodes.Lock() - var ok bool - //if its in the map, there must be a connected stream...nothing to do - if _, ok = chaincodeSupport.chaincodeHasBeenLaunched(canName); ok { - chaincodeLogger.Debugf("chaincode is running and ready: %s", canName) + //if its in the map, its either up or being launched. Either case break the + //multiple launch by failing + if _, hasBeenLaunched := chaincodeSupport.chaincodeHasBeenLaunched(canName); hasBeenLaunched { chaincodeSupport.runningChaincodes.Unlock() - return true, nil + return fmt.Errorf("Error chaincode is being launched: %s", canName) } - alreadyRunning := false + //chaincodeHasBeenLaunch false... its not in the map, add it and proceed to launch notfy := chaincodeSupport.preLaunchSetup(canName) chaincodeSupport.runningChaincodes.Unlock() @@ -420,7 +419,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context. args, env, err := chaincodeSupport.getArgsAndEnv(cccid, cLang) if err != nil { - return alreadyRunning, err + return err } chaincodeLogger.Debugf("start container: %s(networkid:%s,peerid:%s)", canName, chaincodeSupport.peerNetworkID, chaincodeSupport.peerID) @@ -440,7 +439,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context. chaincodeSupport.runningChaincodes.Lock() delete(chaincodeSupport.runningChaincodes.chaincodeMap, canName) chaincodeSupport.runningChaincodes.Unlock() - return alreadyRunning, err + return err } //wait for REGISTER state @@ -459,7 +458,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context. chaincodeLogger.Debugf("error on stop %s(%s)", errIgnore, err) } } - return alreadyRunning, err + return err } //Stop stops a chaincode if running @@ -577,7 +576,7 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid //launch container if it is a System container or not in dev mode if (!chaincodeSupport.userRunsCC || cds.ExecEnv == pb.ChaincodeDeploymentSpec_SYSTEM) && (chrte == nil || chrte.handler == nil) { var targz io.Reader = bytes.NewBuffer(cds.CodePackage) - _, err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, targz) + err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, targz) if err != nil { chaincodeLogger.Errorf("launchAndWaitForRegister failed %s", err) return cID, cMsg, err diff --git a/core/chaincode/concurrency_test.go b/core/chaincode/concurrency_test.go new file mode 100644 index 00000000000..371641e959c --- /dev/null +++ b/core/chaincode/concurrency_test.go @@ -0,0 +1,141 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package chaincode + +import ( + "fmt" + "sync" + "testing" + + "github.com/hyperledger/fabric/core/util" + pb "github.com/hyperledger/fabric/protos/peer" + + "golang.org/x/net/context" +) + +//TestExecuteConcurrentInvokes deploys newkeyperinvoke and runs 100 concurrent invokes +//followed by concurrent 100 queries to validate +func TestExecuteConcurrentInvokes(t *testing.T) { + chainID := util.GetTestChainID() + + lis, err := initPeer(chainID) + if err != nil { + t.Fail() + t.Logf("Error creating peer: %s", err) + } + + defer finitPeer(lis, chainID) + + var ctxt = context.Background() + + url := "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke" + + chaincodeID := &pb.ChaincodeID{Name: "nkpi", Path: url} + + args := util.ToChaincodeArgs("init", "") + + spec := &pb.ChaincodeSpec{Type: 1, ChaincodeID: chaincodeID, CtorMsg: &pb.ChaincodeInput{Args: args}} + + cccid := NewCCContext(chainID, "nkpi", "0", "", false, nil) + + defer theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + + _, err = deploy(ctxt, cccid, spec) + if err != nil { + t.Fail() + t.Logf("Error initializing chaincode %s(%s)", chaincodeID, err) + return + } + + var wg sync.WaitGroup + + //run 100 invokes in parallel + numTrans := 100 + + results := make([][]byte, numTrans) + errs := make([]error, numTrans) + + e := func(inv bool, qnum int) { + defer wg.Done() + + newkey := fmt.Sprintf("%d", qnum) + + var args [][]byte + if inv { + args = util.ToChaincodeArgs("put", newkey, newkey) + } else { + args = util.ToChaincodeArgs("get", newkey) + } + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: chaincodeID, CtorMsg: &pb.ChaincodeInput{Args: args}} + + //start with a new background + _, _, results[qnum], err = invoke(context.Background(), chainID, spec) + + if err != nil { + errs[qnum] = fmt.Errorf("Error executing <%s>: %s", chaincodeID.Name, err) + return + } + } + + wg.Add(numTrans) + + //execute transactions concurrently. + for i := 0; i < numTrans; i++ { + go e(true, i) + } + + wg.Wait() + + for i := 0; i < numTrans; i++ { + if errs[i] != nil { + t.Fail() + t.Logf("Error invoking chaincode iter %d %s(%s)", i, chaincodeID.Name, errs[i]) + } + if results[i] == nil || string(results[i]) != "OK" { + t.Fail() + t.Logf("Error concurrent invoke %d %s", i, chaincodeID.Name) + return + } + } + + wg.Add(numTrans) + + //execute queries concurrently. + for i := 0; i < numTrans; i++ { + go e(false, i) + } + + wg.Wait() + + for i := 0; i < numTrans; i++ { + if errs[i] != nil { + t.Fail() + t.Logf("Error querying chaincode iter %d %s(%s)", i, chaincodeID.Name, errs[i]) + return + } + if results[i] == nil || string(results[i]) != fmt.Sprintf("%d", i) { + t.Fail() + if results[i] == nil { + t.Logf("Error concurrent query %d(%s)", i, chaincodeID.Name) + } else { + t.Logf("Error concurrent query %d(%s, %s, %v)", i, chaincodeID.Name, string(results[i]), results[i]) + } + return + } + } +} diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index cac219c35c8..5315e321063 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -161,6 +161,16 @@ func endTxSimulationCIS(chainID string, txid string, txsim ledger.TxSimulator, p return endTxSimulation(chainID, txsim, payload, commit, prop) } +//getting a crash from ledger.Commit when doing concurrent invokes +//It is likely intentional that ledger.Commit is serial (ie, the real +//Committer will invoke this serially on each block). Mimic that here +//by forcing serialization of the ledger.Commit call. +// +//NOTE-this should NOT have any effect on the older serial tests. +//This affects only the tests in concurrent_test.go which call these +//concurrently (100 concurrent invokes followed by 100 concurrent queries) +var _commitLock_ sync.Mutex + func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, commit bool, prop *pb.Proposal) error { txsim.Done() if lgr := peer.GetLedger(chainID); lgr != nil { @@ -194,6 +204,10 @@ func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, c block := common.NewBlock(1, []byte{}) block.Data.Data = [][]byte{envBytes} //commit the block + + //see comment on _commitLock_ + _commitLock_.Lock() + defer _commitLock_.Unlock() if err := lgr.Commit(block); err != nil { return err } @@ -601,38 +615,6 @@ func TestExecuteInvokeTransaction(t *testing.T) { theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: &pb.ChaincodeSpec{ChaincodeID: chaincodeID}}) } -// Execute multiple transactions and queries. -func exec(ctxt context.Context, chainID string, chaincodeID string, numTrans int, numQueries int) []error { - var wg sync.WaitGroup - errs := make([]error, numTrans+numQueries) - - e := func(qnum int) { - defer wg.Done() - var spec *pb.ChaincodeSpec - args := util.ToChaincodeArgs("invoke", "a", "b", "10") - - spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: &pb.ChaincodeID{Name: chaincodeID}, CtorMsg: &pb.ChaincodeInput{Args: args}} - - _, _, _, err := invoke(ctxt, chainID, spec) - - if err != nil { - errs[qnum] = fmt.Errorf("Error executing <%s>: %s", chaincodeID, err) - return - } - } - wg.Add(numTrans + numQueries) - - //execute transactions sequentially.. - go func() { - for i := 0; i < numTrans; i++ { - e(i) - } - }() - - wg.Wait() - return errs -} - // Test the execution of an invalid transaction. func TestExecuteInvokeInvalidTransaction(t *testing.T) { chainID := util.GetTestChainID() diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index fb51b898206..21356b1707c 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -38,9 +38,6 @@ const ( establishedstate = "established" //in: CREATED, rcv: REGISTER, send: REGISTERED, INIT initstate = "init" //in:ESTABLISHED, rcv:-, send: INIT readystate = "ready" //in:ESTABLISHED,TRANSACTION, rcv:COMPLETED - transactionstate = "transaction" //in:READY, rcv: xact from consensus, send: TRANSACTION - busyinitstate = "busyinit" //in:INIT, rcv: PUT_STATE, DEL_STATE, INVOKE_CHAINCODE - busyxactstate = "busyxact" //in:TRANSACION, rcv: PUT_STATE, DEL_STATE, INVOKE_CHAINCODE endstate = "end" //in:INIT,ESTABLISHED, rcv: error, terminate container ) @@ -145,14 +142,31 @@ func (handler *Handler) getCCRootName() string { return handler.ccCompParts.name } +//serialSend serializes msgs so gRPC will be happy func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error { handler.serialLock.Lock() defer handler.serialLock.Unlock() - if err := handler.ChatStream.Send(msg); err != nil { - chaincodeLogger.Errorf("Error sending %s: %s", msg.Type.String(), err) - return fmt.Errorf("Error sending %s: %s", msg.Type.String(), err) + + var err error + if err = handler.ChatStream.Send(msg); err != nil { + err = fmt.Errorf("[%s]Error sending %s: %s", shorttxid(msg.Txid), msg.Type.String(), err) + chaincodeLogger.Errorf("%s", err) } - return nil + return err +} + +//serialSendAsync serves the same purpose as serialSend (serializ msgs so gRPC will +//be happy). In addition, it is also asynchronous so send-remoterecv--localrecv loop +//can be nonblocking. Only errors need to be handled and these are handled by +//communication on supplied error channel. A typical use will be a non-blocking or +//nil channel +func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) { + go func() { + err := handler.serialSend(msg) + if errc != nil { + errc <- err + } + }() } func (handler *Handler) createTxContext(ctxt context.Context, chainID string, txid string, prop *pb.Proposal) (*transactionContext, error) { @@ -253,6 +267,9 @@ func (handler *Handler) processStream() error { //recv is used to spin Recv routine after previous received msg //has been processed recv := true + + //catch send errors and bail now that sends aren't synchronous + errc := make(chan error, 1) for { in = nil err = nil @@ -266,6 +283,12 @@ func (handler *Handler) processStream() error { }() } select { + case sendErr := <-errc: + if sendErr != nil { + return sendErr + } + //send was successful, just continue + continue case in = <-msgAvail: // Defer the deregistering of the this handler. if err == io.EOF { @@ -307,14 +330,9 @@ func (handler *Handler) processStream() error { continue } - //TODO we could use this to hook into container lifecycle (kill the chaincode if not in use, etc) - kaerr := handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}) - if kaerr != nil { - chaincodeLogger.Errorf("Error sending keepalive, err=%s", kaerr) - } else { - chaincodeLogger.Debug("Sent KEEPALIVE request") - } - //keepalive message kicked in. just continue + //if no error message from serialSend, KEEPALIVE happy, and don't care about error + //(maybe it'll work later) + handler.serialSendAsync(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}, nil) continue } @@ -326,10 +344,8 @@ func (handler *Handler) processStream() error { if nsInfo != nil && nsInfo.sendToCC { chaincodeLogger.Debugf("[%s]sending state message %s", shorttxid(in.Txid), in.Type.String()) - if err = handler.serialSend(in); err != nil { - chaincodeLogger.Debugf("[%s]serial sending received error %s", shorttxid(in.Txid), err) - return fmt.Errorf("[%s]serial sending received error %s", shorttxid(in.Txid), err) - } + //if error bail in select + handler.serialSendAsync(in, errc) } } } @@ -357,40 +373,26 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre {Name: pb.ChaincodeMessage_REGISTER.String(), Src: []string{createdstate}, Dst: establishedstate}, {Name: pb.ChaincodeMessage_INIT.String(), Src: []string{establishedstate}, Dst: initstate}, {Name: pb.ChaincodeMessage_READY.String(), Src: []string{establishedstate}, Dst: readystate}, - {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{readystate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{transactionstate}, Dst: busyxactstate}, - {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{transactionstate}, Dst: busyxactstate}, - {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{transactionstate}, Dst: busyxactstate}, - {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{initstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{initstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{initstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{initstate, readystate, transactionstate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{initstate, readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{initstate}, Dst: endstate}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{transactionstate}, Dst: readystate}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{busyinitstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{busyxactstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{busyinitstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{busyxactstate}, Dst: transactionstate}, + {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{readystate}, Dst: readystate}, }, fsm.Callbacks{ "before_" + pb.ChaincodeMessage_REGISTER.String(): func(e *fsm.Event) { v.beforeRegisterEvent(e, v.FSM.Current()) }, @@ -400,14 +402,12 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre "after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE.String(): func(e *fsm.Event) { v.afterRangeQueryState(e, v.FSM.Current()) }, "after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(): func(e *fsm.Event) { v.afterRangeQueryStateNext(e, v.FSM.Current()) }, "after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(): func(e *fsm.Event) { v.afterRangeQueryStateClose(e, v.FSM.Current()) }, - "after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.afterPutState(e, v.FSM.Current()) }, - "after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.afterDelState(e, v.FSM.Current()) }, - "after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.afterInvokeChaincode(e, v.FSM.Current()) }, + "after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, + "after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, + "after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, "enter_" + establishedstate: func(e *fsm.Event) { v.enterEstablishedState(e, v.FSM.Current()) }, "enter_" + initstate: func(e *fsm.Event) { v.enterInitState(e, v.FSM.Current()) }, "enter_" + readystate: func(e *fsm.Event) { v.enterReadyState(e, v.FSM.Current()) }, - "enter_" + busyinitstate: func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, - "enter_" + busyxactstate: func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, "enter_" + endstate: func(e *fsm.Event) { v.enterEndState(e, v.FSM.Current()) }, }, ) @@ -563,7 +563,7 @@ func (handler *Handler) handleGetState(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleGetState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() key := string(msg.Payload) @@ -635,7 +635,7 @@ func (handler *Handler) handleRangeQueryState(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() rangeQueryState := &pb.RangeQueryState{} @@ -742,7 +742,7 @@ func (handler *Handler) handleRangeQueryStateNext(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() rangeQueryStateNext := &pb.RangeQueryStateNext{} @@ -840,7 +840,7 @@ func (handler *Handler) handleRangeQueryStateClose(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() rangeQueryStateClose := &pb.RangeQueryStateClose{} @@ -1132,6 +1132,11 @@ func (handler *Handler) initOrReady(ctxt context.Context, chainID string, txid s func (handler *Handler) HandleMessage(msg *pb.ChaincodeMessage) error { chaincodeLogger.Debugf("[%s]Handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, handler.FSM.Current()) + if (msg.Type == pb.ChaincodeMessage_COMPLETED || msg.Type == pb.ChaincodeMessage_ERROR) && handler.FSM.Current() == "ready" { + chaincodeLogger.Debugf("[%s]HandleMessage- COMPLETED. Notify", msg.Txid) + handler.notify(msg) + return nil + } if handler.FSM.Cannot(msg.Type.String()) { // Other errors return fmt.Errorf("[%s]Chaincode handler validator FSM cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type.String(), len(msg.Payload), handler.FSM.Current()) @@ -1207,27 +1212,3 @@ func (handler *Handler) isRunning() bool { return true } } - -/**************** -func (handler *Handler) initEvent() (chan *pb.ChaincodeMessage, error) { - if handler.responseNotifiers == nil { - return nil,fmt.Errorf("SendMessage called before registration for Txid:%s", msg.Txid) - } - var notfy chan *pb.ChaincodeMessage - handler.Lock() - if handler.responseNotifiers[msg.Txid] != nil { - handler.Unlock() - return nil, fmt.Errorf("SendMessage Txid:%s exists", msg.Txid) - } - //note the explicit use of buffer 1. We won't block if the receiver times outi and does not wait - //for our response - handler.responseNotifiers[msg.Txid] = make(chan *pb.ChaincodeMessage, 1) - handler.Unlock() - - if err := c.serialSend(msg); err != nil { - deleteNotifier(msg.Txid) - return nil, fmt.Errorf("SendMessage error sending %s(%s)", msg.Txid, err) - } - return notfy, nil -} -*******************/ diff --git a/core/chaincode/shim/chaincode.go b/core/chaincode/shim/chaincode.go index a125b409b46..6050e50125c 100644 --- a/core/chaincode/shim/chaincode.go +++ b/core/chaincode/shim/chaincode.go @@ -187,8 +187,11 @@ func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode } // Register on the stream chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER) - handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}) + if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil { + return fmt.Errorf("Error sending chaincode REGISTER: %s", err) + } waitc := make(chan struct{}) + errc := make(chan error) go func() { defer close(waitc) msgAvail := make(chan *pb.ChaincodeMessage) @@ -208,6 +211,14 @@ func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode }() } select { + case sendErr := <-errc: + //serialSendAsync successful? + if sendErr == nil { + continue + } + //no, bail + err = fmt.Errorf("Error sending %s: %s", in.Type.String(), sendErr) + return case in = <-msgAvail: if err == io.EOF { chaincodeLogger.Debugf("Received EOF, ending chaincode stream, %s", err) @@ -241,12 +252,11 @@ func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode if (nsInfo != nil && nsInfo.sendToCC) || (in.Type == pb.ChaincodeMessage_KEEPALIVE) { if in.Type == pb.ChaincodeMessage_KEEPALIVE { chaincodeLogger.Debug("Sending KEEPALIVE response") + //ignore any errors, maybe next KEEPALIVE will work + handler.serialSendAsync(in, nil) } else { chaincodeLogger.Debugf("[%s]send state message %s", shorttxid(in.Txid), in.Type.String()) - } - if err = handler.serialSend(in); err != nil { - err = fmt.Errorf("Error sending %s: %s", in.Type.String(), err) - return + handler.serialSendAsync(in, errc) } } } diff --git a/core/chaincode/shim/handler.go b/core/chaincode/shim/handler.go index 09d819bf4e1..59c6d1061d7 100644 --- a/core/chaincode/shim/handler.go +++ b/core/chaincode/shim/handler.go @@ -64,14 +64,28 @@ func shorttxid(txid string) string { return txid[0:8] } +//serialSend serializes msgs so gRPC will be happy func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error { handler.serialLock.Lock() defer handler.serialLock.Unlock() - if err := handler.ChatStream.Send(msg); err != nil { - chaincodeLogger.Errorf("[%s]Error sending %s: %s", shorttxid(msg.Txid), msg.Type.String(), err) - return fmt.Errorf("Error sending %s: %s", msg.Type.String(), err) - } - return nil + + err := handler.ChatStream.Send(msg) + + return err +} + +//serialSendAsync serves the same purpose as serialSend (serializ msgs so gRPC will +//be happy). In addition, it is also asynchronous so send-remoterecv--localrecv loop +//can be nonblocking. Only errors need to be handled and these are handled by +//communication on supplied error channel. A typical use will be a non-blocking or +//nil channel +func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) { + go func() { + err := handler.serialSend(msg) + if errc != nil { + errc <- err + } + }() } func (handler *Handler) createChannel(txid string) (chan pb.ChaincodeMessage, error) { @@ -105,9 +119,32 @@ func (handler *Handler) sendChannel(msg *pb.ChaincodeMessage) error { return nil } -func (handler *Handler) receiveChannel(c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, bool) { - msg, val := <-c - return msg, val +//sends a message and selects +func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) { + errc := make(chan error, 1) + handler.serialSendAsync(msg, errc) + + //the serialsend above will send an err or nil + //the select filters that first error(or nil) + //and continues to wait for the response + //it is possible that the response triggers first + //in which case the errc obviously worked and is + //ignored + for { + select { + case err := <-errc: + if err == nil { + continue + } + //would have been logged, return false + return pb.ChaincodeMessage{}, err + case outmsg, val := <-c: + if !val { + return pb.ChaincodeMessage{}, fmt.Errorf("unexpected failure on receive") + } + return outmsg, nil + } + } } func (handler *Handler) deleteChannel(txid string) { @@ -136,19 +173,18 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode {Name: pb.ChaincodeMessage_READY.String(), Src: []string{"established"}, Dst: "ready"}, {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{"init"}, Dst: "established"}, {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{"init"}, Dst: "init"}, - {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"init"}, Dst: "ready"}, - {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{"ready"}, Dst: "transaction"}, - {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"transaction"}, Dst: "ready"}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{"transaction"}, Dst: "ready"}, - {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{"transaction"}, Dst: "transaction"}, + {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{"ready"}, Dst: "ready"}, {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{"ready"}, Dst: "ready"}, + {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{"ready"}, Dst: "ready"}, + {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"init"}, Dst: "ready"}, + {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"ready"}, Dst: "ready"}, }, fsm.Callbacks{ "before_" + pb.ChaincodeMessage_REGISTERED.String(): func(e *fsm.Event) { v.beforeRegistered(e) }, "after_" + pb.ChaincodeMessage_RESPONSE.String(): func(e *fsm.Event) { v.afterResponse(e) }, "after_" + pb.ChaincodeMessage_ERROR.String(): func(e *fsm.Event) { v.afterError(e) }, "enter_init": func(e *fsm.Event) { v.enterInitState(e) }, - "enter_transaction": func(e *fsm.Event) { v.enterTransactionState(e) }, + "before_" + pb.ChaincodeMessage_TRANSACTION.String(): func(e *fsm.Event) { v.enterTransactionState(e) }, }, ) return v @@ -283,9 +319,6 @@ func (handler *Handler) enterTransactionState(e *fsm.Event) { } } -// enterReadyState will need to handle COMPLETED event by sending message to the peer -//func (handler *Handler) enterReadyState(e *fsm.Event) { - // afterCompleted will need to handle COMPLETED event by sending message to the peer func (handler *Handler) afterCompleted(e *fsm.Event) { msg, ok := e.Args[0].(*pb.ChaincodeMessage) @@ -347,18 +380,12 @@ func (handler *Handler) handleGetState(key string, txid string) ([]byte, error) payload := []byte(key) msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_GET_STATE, Payload: payload, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_GET_STATE) - if err := handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending GET_STATE %s", shorttxid(txid), err) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", shorttxid(responseMsg.Txid)) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]GetState received payload %s", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -397,18 +424,12 @@ func (handler *Handler) handlePutState(key string, value []byte, txid string) er // Send PUT_STATE message to validator chaincode support msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_PUT_STATE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_PUT_STATE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending PUT_STATE %s", msg.Txid, err) return errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", msg.Txid) - return errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully updated state", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -441,18 +462,12 @@ func (handler *Handler) handleDelState(key string, txid string) error { payload := []byte(key) msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_DEL_STATE, Payload: payload, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_DEL_STATE) - if err := handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending DEL_STATE %s", shorttxid(msg.Txid), pb.ChaincodeMessage_DEL_STATE) return errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", shorttxid(msg.Txid)) - return errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully deleted state", msg.Txid, pb.ChaincodeMessage_RESPONSE) @@ -487,18 +502,12 @@ func (handler *Handler) handleRangeQueryState(startKey, endKey string, txid stri } msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", txid) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -541,18 +550,12 @@ func (handler *Handler) handleRangeQueryStateNext(id, txid string) (*pb.RangeQue } msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", txid) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -595,18 +598,12 @@ func (handler *Handler) handleRangeQueryStateClose(id, txid string) (*pb.RangeQu } msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", txid) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -653,18 +650,12 @@ func (handler *Handler) handleInvokeChaincode(chaincodeName string, args [][]byt // Send INVOKE_CHAINCODE message to validator chaincode support msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_INVOKE_CHAINCODE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_INVOKE_CHAINCODE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_INVOKE_CHAINCODE) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", shorttxid(msg.Txid)) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully invoked chaincode", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) diff --git a/core/committer/txvalidator/validator.go b/core/committer/txvalidator/validator.go index 849960284f7..19636551378 100644 --- a/core/committer/txvalidator/validator.go +++ b/core/committer/txvalidator/validator.go @@ -88,7 +88,7 @@ func (v *txValidator) Validate(block *common.Block) { // TODO: this code needs to receive a bit more attention and discussion: // it's not clear what it means if a transaction which causes a failure // in validation is just dropped on the floor - logger.Errorf("Invalid transaction with index %s, error %s", tIdx, err) + logger.Errorf("Invalid transaction with index %d, error %s", tIdx, err) txsfltr.Set(uint(tIdx)) } else { //the payload is used to get headers @@ -153,9 +153,13 @@ func (v *vsccValidatorImpl) VSCCValidateTx(payload *common.Payload, envBytes []b defer txsim.Done() ctxt := context.WithValue(context.Background(), chaincode.TXSimulatorKey, txsim) + //generate an internal txid for executing system chaincode calls below on behalf + //of original txid + vscctxid := coreUtil.GenerateUUID() + // Extracting vscc from lccc /* - data, err := chaincode.GetChaincodeDataFromLCCC(ctxt, txid, nil, chainID, "vscc") + data, err := chaincode.GetChaincodeDataFromLCCC(ctxt, vscctxid, nil, chainID, "vscc") if err != nil { logger.Errorf("Unable to get chaincode data from LCCC for txid %s, due to %s", txid, err) return err @@ -164,7 +168,7 @@ func (v *vsccValidatorImpl) VSCCValidateTx(payload *common.Payload, envBytes []b // Get chaincode version version := coreUtil.GetSysCCVersion() - cccid := chaincode.NewCCContext(chainID, "vscc", version, txid, true, nil) + cccid := chaincode.NewCCContext(chainID, "vscc", version, vscctxid, true, nil) // invoke VSCC _, _, err = chaincode.ExecuteChaincode(ctxt, cccid, args) diff --git a/core/util/utils.go b/core/util/utils.go index 056263ad49b..fa93b69b4b1 100644 --- a/core/util/utils.go +++ b/core/util/utils.go @@ -25,7 +25,7 @@ import ( "strings" "time" - "github.com/hyperledger/fabric/metadata" + "github.com/hyperledger/fabric/common/metadata" "github.com/golang/protobuf/ptypes/timestamp" "golang.org/x/crypto/sha3" diff --git a/examples/ccchecker/ccchecker.go b/examples/ccchecker/ccchecker.go new file mode 100644 index 00000000000..77c5f11c228 --- /dev/null +++ b/examples/ccchecker/ccchecker.go @@ -0,0 +1,179 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "sync" + "time" + + "golang.org/x/net/context" + + "github.com/hyperledger/fabric/examples/ccchecker/chaincodes" + "github.com/hyperledger/fabric/peer/common" +) + +//global ccchecker params +var ccchecker *CCChecker + +//CCChecker encapsulates ccchecker properties and runtime +type CCChecker struct { + //Chaincodes to do ccchecker over (see ccchecker.json for defaults) + Chaincodes []*chaincodes.CC + //TimeoutToAbortSecs abort deadline + TimeoutToAbortSecs int + //ChainName name of the chain + ChainName string +} + +//LoadCCCheckerParams read the ccchecker params from a file +func LoadCCCheckerParams(file string) error { + var b []byte + var err error + if b, err = ioutil.ReadFile(file); err != nil { + return fmt.Errorf("Cannot read config file %s\n", err) + } + sp := &CCChecker{} + err = json.Unmarshal(b, &sp) + if err != nil { + return fmt.Errorf("error unmarshalling ccchecker: %s\n", err) + } + + ccchecker = &CCChecker{} + id := 0 + for _, scc := range sp.Chaincodes { + //concurrency <=0 will be dropped + if scc.Concurrency > 0 { + for i := 0; i < scc.Concurrency; i++ { + tmp := &chaincodes.CC{} + *tmp = *scc + tmp.ID = id + id = id + 1 + ccchecker.Chaincodes = append(ccchecker.Chaincodes, tmp) + } + } + } + + ccchecker.TimeoutToAbortSecs = sp.TimeoutToAbortSecs + ccchecker.ChainName = sp.ChainName + + return nil +} + +//CCCheckerInit assigns shadow chaincode to each of the CC from registered shadow chaincodes +func CCCheckerInit() { + if ccchecker == nil { + fmt.Printf("LoadCCCheckerParams needs to be called before init\n") + os.Exit(1) + } + + if err := chaincodes.RegisterCCs(ccchecker.Chaincodes); err != nil { + panic(fmt.Sprintf("%s", err)) + } +} + +//CCCheckerRun main loops that will run the tests and cleanup +func CCCheckerRun(report bool, verbose bool) error { + //connect with Broadcast client + bc, err := common.GetBroadcastClient() + if err != nil { + return err + } + defer bc.Close() + + ec, err := common.GetEndorserClient() + if err != nil { + return err + } + + signer, err := common.GetDefaultSigner() + if err != nil { + return err + } + + //when the wait's timeout and get out of ccchecker, we + //cancel and release all goroutines + ctxt, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ccsWG sync.WaitGroup + ccsWG.Add(len(ccchecker.Chaincodes)) + + //an anonymous struct to hold failures + var failures struct { + sync.Mutex + failedCCs int + } + + //run the invokes + ccerrs := make([]error, len(ccchecker.Chaincodes)) + for _, cc := range ccchecker.Chaincodes { + go func(cc2 *chaincodes.CC) { + if ccerrs[cc2.ID] = cc2.Run(ctxt, ccchecker.ChainName, bc, ec, signer, &ccsWG); ccerrs[cc2.ID] != nil { + failures.Lock() + failures.failedCCs = failures.failedCCs + 1 + failures.Unlock() + } + }(cc) + } + + //wait or timeout + err = ccchecker.wait(&ccsWG) + + //verify results + if err == nil && failures.failedCCs < len(ccchecker.Chaincodes) { + ccsWG = sync.WaitGroup{} + ccsWG.Add(len(ccchecker.Chaincodes) - failures.failedCCs) + for _, cc := range ccchecker.Chaincodes { + go func(cc2 *chaincodes.CC) { + if ccerrs[cc2.ID] == nil { + ccerrs[cc2.ID] = cc2.Validate(ctxt, ccchecker.ChainName, bc, ec, signer, &ccsWG) + } else { + fmt.Printf("Ignoring [%v] for validation as it returned err %s\n", cc2, ccerrs[cc2.ID]) + } + }(cc) + } + + //wait or timeout + err = ccchecker.wait(&ccsWG) + } + + if report { + for _, cc := range ccchecker.Chaincodes { + cc.Report(verbose, ccchecker.ChainName) + } + } + + return err +} + +func (s *CCChecker) wait(ccsWG *sync.WaitGroup) error { + done := make(chan struct{}) + go func() { + ccsWG.Wait() + done <- struct{}{} + }() + select { + case <-done: + return nil + case <-time.After(time.Duration(s.TimeoutToAbortSecs) * time.Second): + return fmt.Errorf("Aborting due to timeoutout!!") + } +} diff --git a/examples/ccchecker/ccchecker.json b/examples/ccchecker/ccchecker.json new file mode 100644 index 00000000000..9c2aebe2d70 --- /dev/null +++ b/examples/ccchecker/ccchecker.json @@ -0,0 +1,17 @@ +{"Chaincodes": + [ + {"Name": "mycc", + "Path": "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke", + "NumFinalQueryAttempts": 10, + "NumberOfIterations": 10, + "DelayBetweenInvokeMs": 1, + "DelayBetweenQueryMs": 10, + "TimeoutToAbortSecs": 60, + "Lang": "GOLANG", + "WaitAfterInvokeMs": 10000, + "Concurrency": 10 + } + ], + "TimeoutToAbortSecs": 60, + "ChainName": "**TEST_CHAINID**" +} diff --git a/examples/ccchecker/chaincodes/chaincodes.go b/examples/ccchecker/chaincodes/chaincodes.go new file mode 100644 index 00000000000..21725726606 --- /dev/null +++ b/examples/ccchecker/chaincodes/chaincodes.go @@ -0,0 +1,339 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package chaincodes + +import ( + "fmt" + "sync" + "time" + + "github.com/hyperledger/fabric/msp" + "github.com/hyperledger/fabric/peer/chaincode" + "github.com/hyperledger/fabric/peer/common" + pb "github.com/hyperledger/fabric/protos/peer" + + "golang.org/x/net/context" +) + +//ShadowCCIntf interfaces to be implemented by shadow chaincodes +type ShadowCCIntf interface { + //InitShadowCC initializes the shadow chaincode (will be called once for each chaincode) + InitShadowCC() + + //GetInvokeArgs gets invoke arguments from shadow + GetInvokeArgs(ccnum int, iter int) [][]byte + + //PostInvoke passes the retvalue from the invoke to the shadow for post-processing + PostInvoke(args [][]byte, retval []byte) error + + //GetQueryArgs mimics the Invoke and gets the query for an invoke + GetQueryArgs(ccnum int, iter int) [][]byte + + //Validate the results against the query arguments + Validate(args [][]byte, value []byte) error +} + +//CC chaincode properties, config and runtime +type CC struct { + //-------------config properties ------------ + //Name of the chaincode + Name string + + //Path to the chaincode + Path string + + //NumFinalQueryAttempts number of times to try final query before giving up + NumFinalQueryAttempts int + + //NumberOfInvokeIterations number of iterations to do invoke on + NumberOfIterations int + + //DelayBetweenInvokeMs delay between each invoke + DelayBetweenInvokeMs int + + //DelayBetweenQueryMs delay between each query + DelayBetweenQueryMs int + + //TimeoutToAbortSecs timeout for aborting this chaincode processing + TimeoutToAbortSecs int + + //Lang of chaincode + Lang string + + //WaitAfterInvokeMs wait time before validating invokes for this chaincode + WaitAfterInvokeMs int + + //Concurrency number of goroutines to spin + Concurrency int + + //-------------runtime properties ------------ + //Unique number assigned to this CC by CCChecker + ID int + + //shadow CC where the chaincode stats is maintained + shadowCC ShadowCCIntf + + //current iteration of invoke + currentInvokeIter int + + //start of invokes in epoch seconds + invokeStartTime int + + //end of invokes in epoch seconds + invokeEndTime int + + //error that stopped invoke iterations + invokeErr error + + //current iteration of query + currQueryIter []int + + //did the query work ? + queryWorked []bool + + //error on a query in an iteration + queryErrs []error +} + +func (cc *CC) getChaincodeSpec(args [][]byte) *pb.ChaincodeSpec { + return &pb.ChaincodeSpec{ + Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value[cc.Lang]), + ChaincodeID: &pb.ChaincodeID{Path: cc.Path, Name: cc.Name}, + CtorMsg: &pb.ChaincodeInput{Args: args}, + } +} + +//doInvokes calls invoke for each iteration for the chaincode +//Stops at the first invoke with error +//currentInvokeIter contains the number of successful iterations +func (cc *CC) doInvokes(ctxt context.Context, chainID string, + bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, + wg *sync.WaitGroup, quit func() bool) error { + + var err error + for cc.currentInvokeIter = 0; cc.currentInvokeIter < cc.NumberOfIterations; cc.currentInvokeIter++ { + if quit() { + break + } + args := cc.shadowCC.GetInvokeArgs(cc.ID, cc.currentInvokeIter) + + spec := cc.getChaincodeSpec(args) + + if quit() { + break + } + + var pResp *pb.ProposalResponse + if pResp, err = chaincode.ChaincodeInvokeOrQuery(spec, chainID, true, signer, ec, bc); err != nil { + cc.invokeErr = err + break + } + + resp := pResp.Response.Payload + if err = cc.shadowCC.PostInvoke(args, resp); err != nil { + cc.invokeErr = err + break + } + + if quit() { + break + } + + //don't sleep for the last iter + if cc.DelayBetweenInvokeMs > 0 && cc.currentInvokeIter < (cc.NumberOfIterations-1) { + time.Sleep(time.Duration(cc.DelayBetweenInvokeMs) * time.Millisecond) + } + } + + return err +} + +//Run test over given number of iterations +// i will be unique across chaincodes and can be used as a key +// this is useful if chaincode occurs multiple times in the array of chaincodes +func (cc *CC) Run(ctxt context.Context, chainID string, bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, wg *sync.WaitGroup) error { + defer wg.Done() + + var ( + quit bool + err error + ) + + done := make(chan struct{}) + go func() { + defer func() { done <- struct{}{} }() + + //return the quit closure for validation within validateIter + quitF := func() bool { return quit } + + //start of invokes + cc.invokeStartTime = time.Now().Second() + + err = cc.doInvokes(ctxt, chainID, bc, ec, signer, wg, quitF) + + //end of invokes + cc.invokeEndTime = time.Now().Second() + }() + + //we could be done or cancelled or timedout + select { + case <-ctxt.Done(): + quit = true + return nil + case <-done: + return err + case <-time.After(time.Duration(cc.TimeoutToAbortSecs) * time.Second): + quit = true + return fmt.Errorf("Aborting due to timeoutout!!") + } +} + +//validates the invoke iteration for this chaincode +func (cc *CC) validateIter(ctxt context.Context, iter int, chainID string, bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, wg *sync.WaitGroup, quit func() bool) { + defer wg.Done() + args := cc.shadowCC.GetQueryArgs(cc.ID, iter) + + spec := cc.getChaincodeSpec(args) + + //lets try a few times + for cc.currQueryIter[iter] = 0; cc.currQueryIter[iter] < cc.NumFinalQueryAttempts; cc.currQueryIter[iter]++ { + if quit() { + break + } + + var pResp *pb.ProposalResponse + var err error + if pResp, err = chaincode.ChaincodeInvokeOrQuery(spec, chainID, false, signer, ec, bc); err != nil { + cc.queryErrs[iter] = err + break + } + + resp := pResp.Response.Payload + + if quit() { + break + } + + //if it fails, we try again + if err = cc.shadowCC.Validate(args, resp); err == nil { + //appears to have worked + cc.queryWorked[iter] = true + cc.queryErrs[iter] = nil + break + } + + //save query error + cc.queryErrs[iter] = err + + if quit() { + break + } + + //try again + if cc.DelayBetweenQueryMs > 0 { + time.Sleep(time.Duration(cc.DelayBetweenQueryMs) * time.Millisecond) + } + } + + return +} + +//Validate test that was Run. Each successful iteration in the run is validated against +func (cc *CC) Validate(ctxt context.Context, chainID string, bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, wg *sync.WaitGroup) error { + defer wg.Done() + + //this will signal inner validators to get out via + //closure + var quit bool + + //use 1 so sender doesn't block (he doesn't care if is was receivd. + //makes sure goroutine exits) + done := make(chan struct{}, 1) + go func() { + defer func() { done <- struct{}{} }() + + var innerwg sync.WaitGroup + innerwg.Add(cc.currentInvokeIter) + + //initialize for querying + cc.currQueryIter = make([]int, cc.currentInvokeIter) + cc.queryWorked = make([]bool, cc.currentInvokeIter) + cc.queryErrs = make([]error, cc.currentInvokeIter) + + //give some time for the invokes to commit for this cc + time.Sleep(time.Duration(cc.WaitAfterInvokeMs) * time.Millisecond) + + //return the quit closure for validation within validateIter + quitF := func() bool { return quit } + + //try only till successful invoke iterations + for i := 0; i < cc.currentInvokeIter; i++ { + go func(iter int) { + cc.validateIter(ctxt, iter, chainID, bc, ec, signer, &innerwg, quitF) + }(i) + } + + //shouldn't block the sender go routine on cleanup + qDone := make(chan struct{}, 1) + + //wait for the above queries to be done + go func() { innerwg.Wait(); qDone <- struct{}{} }() + + //we could be done or cancelled + select { + case <-qDone: + case <-ctxt.Done(): + } + }() + + //we could be done or cancelled or timedout + select { + case <-ctxt.Done(): + //we don't know why it was cancelled but it was cancelled + quit = true + return nil + case <-done: + //for done does not return an err. The query validation stores chaincode errors + //Only error that's left to handle is timeout error for this chaincode below + return nil + case <-time.After(time.Duration(cc.TimeoutToAbortSecs) * time.Second): + quit = true + return fmt.Errorf("Aborting due to timeoutout!!") + } +} + +//Report reports chaincode test execution, iter by iter +func (cc *CC) Report(verbose bool, chainID string) { + fmt.Printf("%s/%s(%d)\n", cc.Name, chainID, cc.ID) + fmt.Printf("\tNum successful invokes: %d(%d)\n", cc.currentInvokeIter, cc.NumberOfIterations) + if cc.invokeErr != nil { + fmt.Printf("\tError on invoke: %s\n", cc.invokeErr) + } + //test to see if validate was called (validate alloc the arrays, one of which is queryWorked) + if cc.queryWorked != nil { + for i := 0; i < cc.currentInvokeIter; i++ { + fmt.Printf("\tQuery(%d) : succeeded-%t, num trials-%d(%d), error if any(%s)\n", i, cc.queryWorked[i], cc.currQueryIter[i], cc.NumFinalQueryAttempts, cc.queryErrs[i]) + } + } else { + fmt.Printf("\tQuery validation appears not have been performed(#invokes-%d). timed out ?\n", cc.currentInvokeIter) + } + //total actual time for cc.currentInvokeIter + invokeTime := (cc.invokeEndTime-cc.invokeStartTime)*1000 - (cc.DelayBetweenInvokeMs * (cc.currentInvokeIter - 1)) + fmt.Printf("\tTime for invokes(ms): %d\n", invokeTime) + + fmt.Printf("\tFinal query worked ? %t\n", cc.queryWorked) +} diff --git a/examples/ccchecker/chaincodes/newkeyperinvoke/newkeyperinvoke.go b/examples/ccchecker/chaincodes/newkeyperinvoke/newkeyperinvoke.go new file mode 100644 index 00000000000..c36ef974b5e --- /dev/null +++ b/examples/ccchecker/chaincodes/newkeyperinvoke/newkeyperinvoke.go @@ -0,0 +1,64 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + + "github.com/hyperledger/fabric/core/chaincode/shim" +) + +// NewKeyPerInvoke is allows the following transactions +// "put", "key", val - returns "OK" on success +// "get", "key" - returns val stored previously +type NewKeyPerInvoke struct { +} + +//Init implements chaincode's Init interface +func (t *NewKeyPerInvoke) Init(stub shim.ChaincodeStubInterface) ([]byte, error) { + return nil, nil +} + +//Invoke implements chaincode's Invoke interface +func (t *NewKeyPerInvoke) Invoke(stub shim.ChaincodeStubInterface) ([]byte, error) { + args := stub.GetArgs() + if len(args) < 2 { + return nil, fmt.Errorf("invalid number of args %d", len(args)) + } + f := string(args[0]) + if f == "put" { + if len(args) < 3 { + return nil, fmt.Errorf("invalid number of args for put %d", len(args)) + } + err := stub.PutState(string(args[1]), args[2]) + if err != nil { + return nil, err + } + return []byte("OK"), nil + } else if f == "get" { + // Get the state from the ledger + return stub.GetState(string(args[1])) + } + return nil, fmt.Errorf("unknown function %s", f) +} + +func main() { + err := shim.Start(new(NewKeyPerInvoke)) + if err != nil { + fmt.Printf("Error starting New key per invoke: %s", err) + } +} diff --git a/examples/ccchecker/chaincodes/newkeyperinvoke/shadow/newkeyperinvoke.go b/examples/ccchecker/chaincodes/newkeyperinvoke/shadow/newkeyperinvoke.go new file mode 100644 index 00000000000..74ff80155a5 --- /dev/null +++ b/examples/ccchecker/chaincodes/newkeyperinvoke/shadow/newkeyperinvoke.go @@ -0,0 +1,110 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package shadow + +import ( + "bytes" + "fmt" + "sync" +) + +// NewKeyPerInvoke is the shadow implementation for NewKeyPerInvoke in the parent package +// The shadow provides invoke arguments that are guaranteed to be result in unique ledger +// entries as long as the parameters to GetInvokeArgs are unique +type NewKeyPerInvoke struct { + sync.Mutex + state map[string][]byte +} + +//---------- implements ShadowCCIntf functions ------- + +//InitShadowCC initializes CC +func (t *NewKeyPerInvoke) InitShadowCC() { + t.state = make(map[string][]byte) +} + +//setState sets the state +func (t *NewKeyPerInvoke) setState(key []byte, val []byte) { + t.Lock() + t.state[string(key)] = val + t.Unlock() +} + +//getState gets the state +func (t *NewKeyPerInvoke) getState(key []byte) ([]byte, bool) { + t.Lock() + defer t.Unlock() + v, ok := t.state[string(key)] + return v, ok +} + +//GetInvokeArgs get args for invoke based on chaincode ID and iteration num +func (t *NewKeyPerInvoke) GetInvokeArgs(ccnum int, iter int) [][]byte { + args := make([][]byte, 3) + args[0] = []byte("put") + args[1] = []byte(fmt.Sprintf("%d_%d", ccnum, iter)) + args[2] = []byte(fmt.Sprintf("%d", ccnum)) + + return args +} + +//PostInvoke store the the key/val for later verification +func (t *NewKeyPerInvoke) PostInvoke(args [][]byte, resp []byte) error { + if len(args) < 3 { + return fmt.Errorf("invalid number of args posted %d", len(args)) + } + + if string(args[0]) != "put" { + return fmt.Errorf("invalid args posted %s", args[0]) + } + + //the actual CC should have returned OK for success + if string(resp) != "OK" { + return fmt.Errorf("invalid response %s", string(resp)) + } + + t.setState(args[1], args[2]) + + return nil +} + +//Validate the key/val with mem storage +func (t *NewKeyPerInvoke) Validate(args [][]byte, value []byte) error { + if len(args) < 2 { + return fmt.Errorf("invalid number of args for validate %d", len(args)) + } + + if string(args[0]) != "get" { + return fmt.Errorf("invalid validate function %s", args[0]) + } + + if v, ok := t.getState(args[1]); !ok { + return fmt.Errorf("key not found %s", args[1]) + } else if !bytes.Equal(v, value) { + return fmt.Errorf("expected(%s) but found (%s)", string(v), string(value)) + } + + return nil +} + +//GetQueryArgs returns the query for the iter to test against +func (t *NewKeyPerInvoke) GetQueryArgs(ccnum int, iter int) [][]byte { + args := make([][]byte, 2) + args[0] = []byte("get") + args[1] = []byte(fmt.Sprintf("%d_%d", ccnum, iter)) + return args +} diff --git a/examples/ccchecker/chaincodes/registershadow.go b/examples/ccchecker/chaincodes/registershadow.go new file mode 100644 index 00000000000..b57d3d9a264 --- /dev/null +++ b/examples/ccchecker/chaincodes/registershadow.go @@ -0,0 +1,60 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package chaincodes + +import ( + "fmt" + + //shadow chaincodes to be registered + nkpi "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke/shadow" +) + +//all the statically registered shadow chaincodes that can be used +var shadowCCs = map[string]ShadowCCIntf{ + "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke": &nkpi.NewKeyPerInvoke{}, +} + +//RegisterCCs registers all possible chaincodes that can be used in test +func RegisterCCs(ccs []*CC) error { + inUse := make(map[string]ShadowCCIntf) + for _, cc := range ccs { + scc, ok := shadowCCs[cc.Path] + if !ok || scc == nil { + return fmt.Errorf("%s not a registered chaincode", cc.Path) + } + if _, ok := inUse[cc.Path]; !ok { + inUse[cc.Path] = scc + } + //setup the shadow chaincode to plug into the ccchecker framework + cc.shadowCC = scc + } + + //initialize a shadow chaincode just once. A chaincode may be used + //multiple times in test run + for _, cc := range inUse { + cc.InitShadowCC() + } + + return nil +} + +//ListShadowCCs lists all registered shadow ccs in the library +func ListShadowCCs() { + for key := range shadowCCs { + fmt.Printf("\t%s\n", key) + } +} diff --git a/examples/ccchecker/init.go b/examples/ccchecker/init.go new file mode 100644 index 00000000000..85ca1eccc8d --- /dev/null +++ b/examples/ccchecker/init.go @@ -0,0 +1,79 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + "strings" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/hyperledger/fabric/peer/common" +) + +//This is where all initializations take place. These closley follow CLI +//initializations. + +//read CC checker configuration from -s . Defaults to ccchecker.json +func initCCCheckerParams(mainFlags *pflag.FlagSet) { + configFile := "" + mainFlags.StringVarP(&configFile, "config", "s", "ccchecker.json", "CC Checker config file ") + + err := LoadCCCheckerParams(configFile) + if err != nil { + fmt.Printf("error unmarshalling ccchecker: %s\n", err) + os.Exit(1) + } +} + +//read yaml file from -y . Defaults to ../../peer +func initYaml(mainFlags *pflag.FlagSet) { + // For environment variables. + viper.SetEnvPrefix(cmdRoot) + viper.AutomaticEnv() + replacer := strings.NewReplacer(".", "_") + viper.SetEnvKeyReplacer(replacer) + + pathToYaml := "" + mainFlags.StringVarP(&pathToYaml, "yamlfile", "y", "../../peer", "Path to core.yaml defined for peer") + + err := common.InitConfig(cmdRoot) + if err != nil { // Handle errors reading the config file + fmt.Printf("Fatal error when reading %s config file: %s\n", cmdRoot, err) + os.Exit(2) + } +} + +//initialize MSP from -m . Defaults to ../../msp/sampleconfig +func initMSP(mainFlags *pflag.FlagSet) { + mspMgrConfigDir := "" + mainFlags.StringVarP(&mspMgrConfigDir, "mspcfgdir", "m", "../../msp/sampleconfig/", "Path to MSP dir") + + err := common.InitCrypto(mspMgrConfigDir) + if err != nil { + panic(err.Error()) + } +} + +//InitCCCheckerEnv initialize the CCChecker environment +func InitCCCheckerEnv(mainFlags *pflag.FlagSet) { + initCCCheckerParams(mainFlags) + initYaml(mainFlags) + initMSP(mainFlags) +} diff --git a/examples/ccchecker/main.go b/examples/ccchecker/main.go new file mode 100644 index 00000000000..138d5f3b75f --- /dev/null +++ b/examples/ccchecker/main.go @@ -0,0 +1,62 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + + "github.com/op/go-logging" + "github.com/spf13/cobra" + + _ "net/http/pprof" +) + +var logger = logging.MustGetLogger("main") + +// Constants go here. +const cmdRoot = "core" + +// The main command describes the service and +// defaults to printing the help message. +var mainCmd = &cobra.Command{ + Use: "", + Run: func(cmd *cobra.Command, args []string) { + run(args) + }, +} + +func main() { + mainFlags := mainCmd.PersistentFlags() + + //initialize the env + InitCCCheckerEnv(mainFlags) + + // On failure Cobra prints the usage message and error string, so we only + // need to exit with a non-0 status + if mainCmd.Execute() != nil { + os.Exit(1) + } +} + +func run(args []string) { + CCCheckerInit() + //TODO make parameters out of report and verbose + CCCheckerRun(true, true) + fmt.Printf("Test complete\n") + return +} diff --git a/peer/chaincode/common.go b/peer/chaincode/common.go index 7517b814108..aaffa1b235c 100755 --- a/peer/chaincode/common.go +++ b/peer/chaincode/common.go @@ -145,65 +145,18 @@ func getChaincodeSpecification(cmd *cobra.Command) (*pb.ChaincodeSpec, error) { return spec, nil } -// chaincodeInvokeOrQuery invokes or queries the chaincode. If successful, the -// INVOKE form prints the ProposalResponse to STDOUT, and the QUERY form prints -// the query result on STDOUT. A command-line flag (-r, --raw) determines -// whether the query result is output as raw bytes, or as a printable string. -// The printable form is optionally (-x, --hex) a hexadecimal representation -// of the query response. If the query response is NIL, nothing is output. -// -// NOTE - Query will likely go away as all interactions with the endorser are -// Proposal and ProposalResponses func chaincodeInvokeOrQuery(cmd *cobra.Command, args []string, invoke bool, cf *ChaincodeCmdFactory) (err error) { spec, err := getChaincodeSpecification(cmd) if err != nil { return err } - // Build the ChaincodeInvocationSpec message - invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec} - if customIDGenAlg != common.UndefinedParamValue { - invocation.IdGenerationAlg = customIDGenAlg - } - - creator, err := cf.Signer.Serialize() + proposalResp, err := ChaincodeInvokeOrQuery(spec, chainID, invoke, cf.Signer, cf.EndorserClient, cf.BroadcastClient) if err != nil { - return fmt.Errorf("Error serializing identity for %s: %s\n", cf.Signer.GetIdentifier(), err) - } - - uuid := cutil.GenerateUUID() - - var prop *pb.Proposal - prop, err = putils.CreateProposalFromCIS(uuid, chainID, invocation, creator) - if err != nil { - return fmt.Errorf("Error creating proposal %s: %s\n", chainFuncName, err) - } - - var signedProp *pb.SignedProposal - signedProp, err = putils.GetSignedProposal(prop, cf.Signer) - if err != nil { - return fmt.Errorf("Error creating signed proposal %s: %s\n", chainFuncName, err) - } - - var proposalResp *pb.ProposalResponse - proposalResp, err = cf.EndorserClient.ProcessProposal(context.Background(), signedProp) - if err != nil { - return fmt.Errorf("Error endorsing %s: %s\n", chainFuncName, err) + return err } if invoke { - if proposalResp != nil { - // assemble a signed transaction (it's an Envelope message) - env, err := putils.CreateSignedTx(prop, cf.Signer, proposalResp) - if err != nil { - return fmt.Errorf("Could not assemble transaction, err %s", err) - } - - // send the envelope for ordering - if err = cf.BroadcastClient.Send(env); err != nil { - return fmt.Errorf("Error sending transaction %s: %s\n", chainFuncName, err) - } - } logger.Infof("Invoke result: %v", proposalResp) } else { if proposalResp == nil { @@ -225,8 +178,7 @@ func chaincodeInvokeOrQuery(cmd *cobra.Command, args []string, invoke bool, cf * } } } - - return nil + return err } func checkChaincodeCmdParams(cmd *cobra.Command) error { @@ -301,3 +253,67 @@ func InitCmdFactory() (*ChaincodeCmdFactory, error) { BroadcastClient: broadcastClient, }, nil } + +// ChaincodeInvokeOrQuery invokes or queries the chaincode. If successful, the +// INVOKE form prints the ProposalResponse to STDOUT, and the QUERY form prints +// the query result on STDOUT. A command-line flag (-r, --raw) determines +// whether the query result is output as raw bytes, or as a printable string. +// The printable form is optionally (-x, --hex) a hexadecimal representation +// of the query response. If the query response is NIL, nothing is output. +// +// NOTE - Query will likely go away as all interactions with the endorser are +// Proposal and ProposalResponses +func ChaincodeInvokeOrQuery(spec *pb.ChaincodeSpec, cID string, invoke bool, signer msp.SigningIdentity, endorserClient pb.EndorserClient, bc common.BroadcastClient) (*pb.ProposalResponse, error) { + // Build the ChaincodeInvocationSpec message + invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec} + if customIDGenAlg != common.UndefinedParamValue { + invocation.IdGenerationAlg = customIDGenAlg + } + + creator, err := signer.Serialize() + if err != nil { + return nil, fmt.Errorf("Error serializing identity for %s: %s", signer.GetIdentifier(), err) + } + + uuid := cutil.GenerateUUID() + + funcName := "invoke" + if !invoke { + funcName = "query" + } + + var prop *pb.Proposal + prop, err = putils.CreateProposalFromCIS(uuid, cID, invocation, creator) + if err != nil { + return nil, fmt.Errorf("Error creating proposal %s: %s", funcName, err) + } + + var signedProp *pb.SignedProposal + signedProp, err = putils.GetSignedProposal(prop, signer) + if err != nil { + return nil, fmt.Errorf("Error creating signed proposal %s: %s", funcName, err) + } + + var proposalResp *pb.ProposalResponse + proposalResp, err = endorserClient.ProcessProposal(context.Background(), signedProp) + if err != nil { + return nil, fmt.Errorf("Error endorsing %s: %s", funcName, err) + } + + if invoke { + if proposalResp != nil { + // assemble a signed transaction (it's an Envelope message) + env, err := putils.CreateSignedTx(prop, signer, proposalResp) + if err != nil { + return proposalResp, fmt.Errorf("Could not assemble transaction, err %s", err) + } + + // send the envelope for ordering + if err = bc.Send(env); err != nil { + return proposalResp, fmt.Errorf("Error sending transaction %s: %s", funcName, err) + } + } + } + + return proposalResp, nil +} diff --git a/peer/common/common.go b/peer/common/common.go index 1f693bf923b..b18f6747dc8 100755 --- a/peer/common/common.go +++ b/peer/common/common.go @@ -18,7 +18,10 @@ package common import ( "fmt" + "os" + "path/filepath" + "github.com/hyperledger/fabric/core/crypto/primitives" "github.com/hyperledger/fabric/core/errors" "github.com/hyperledger/fabric/core/flogging" "github.com/hyperledger/fabric/core/peer" @@ -31,6 +34,52 @@ import ( // UndefinedParamValue defines what undefined parameters in the command line will initialise to const UndefinedParamValue = "" +//InitConfig initializes viper config +func InitConfig(cmdRoot string) error { + var alternativeCfgPath = os.Getenv("PEER_CFG_PATH") + if alternativeCfgPath != "" { + viper.AddConfigPath(alternativeCfgPath) // Path to look for the config file in + } else { + viper.AddConfigPath("./") // Path to look for the config file in + // Path to look for the config file in based on GOPATH + gopath := os.Getenv("GOPATH") + for _, p := range filepath.SplitList(gopath) { + peerpath := filepath.Join(p, "src/github.com/hyperledger/fabric/peer") + viper.AddConfigPath(peerpath) + } + } + + // Now set the configuration file. + viper.SetConfigName(cmdRoot) // Name of config file (without extension) + + err := viper.ReadInConfig() // Find and read the config file + if err != nil { // Handle errors reading the config file + return fmt.Errorf("Fatal error when reading %s config file: %s\n", cmdRoot, err) + } + + return nil +} + +//InitCrypto initializes crypto for this peer +func InitCrypto(mspMgrConfigDir string) error { + // Init the crypto layer + //TODO: integrate new crypto / idp code + primitives.SetSecurityLevel("SHA2", 256) + + // FIXME: when this peer joins a chain, it should get the + // config for that chain with the list of MSPs that the + // chain uses; however this is not yet implemented. + // Additionally, we might always want to have an MSP for + // the local test chain so that we can run tests with the + // peer CLI. This is why we create this fake setup here for now + err := mspmgmt.LoadFakeSetupWithLocalMspAndTestChainMsp(mspMgrConfigDir) + if err != nil { + return fmt.Errorf("Fatal error when setting up MSP from directory %s: err %s\n", mspMgrConfigDir, err) + } + + return nil +} + // GetEndorserClient returns a new endorser client connection for this peer func GetEndorserClient() (pb.EndorserClient, error) { clientConn, err := peer.NewPeerClientConnection() diff --git a/peer/main.go b/peer/main.go index ae5c283c46f..10aa660d613 100644 --- a/peer/main.go +++ b/peer/main.go @@ -19,7 +19,6 @@ package main import ( "fmt" "os" - "path/filepath" "runtime" "strings" @@ -30,11 +29,10 @@ import ( _ "net/http/pprof" "github.com/hyperledger/fabric/core" - "github.com/hyperledger/fabric/core/crypto/primitives" "github.com/hyperledger/fabric/core/flogging" - "github.com/hyperledger/fabric/core/peer/msp" "github.com/hyperledger/fabric/peer/chaincode" "github.com/hyperledger/fabric/peer/clilogging" + "github.com/hyperledger/fabric/peer/common" "github.com/hyperledger/fabric/peer/node" "github.com/hyperledger/fabric/peer/version" ) @@ -83,26 +81,9 @@ func main() { testCoverProfile := "" mainFlags.StringVarP(&testCoverProfile, "test.coverprofile", "", "coverage.cov", "Done") - var alternativeCfgPath = os.Getenv("PEER_CFG_PATH") - if alternativeCfgPath != "" { - logger.Infof("User defined config file path: %s", alternativeCfgPath) - viper.AddConfigPath(alternativeCfgPath) // Path to look for the config file in - } else { - viper.AddConfigPath("./") // Path to look for the config file in - // Path to look for the config file in based on GOPATH - gopath := os.Getenv("GOPATH") - for _, p := range filepath.SplitList(gopath) { - peerpath := filepath.Join(p, "src/github.com/hyperledger/fabric/peer") - viper.AddConfigPath(peerpath) - } - } - - // Now set the configuration file. - viper.SetConfigName(cmdRoot) // Name of config file (without extension) - - err := viper.ReadInConfig() // Find and read the config file - if err != nil { // Handle errors reading the config file - panic(fmt.Errorf("Fatal error when reading %s config file: %s\n", cmdRoot, err)) + err := common.InitConfig(cmdRoot) + if err != nil { // Handle errors reading the config file + panic(fmt.Errorf("Fatal error when initializing %s config : %s\n", cmdRoot, err)) } mainCmd.AddCommand(version.Cmd()) @@ -112,13 +93,10 @@ func main() { runtime.GOMAXPROCS(viper.GetInt("peer.gomaxprocs")) - // Init the crypto layer - //TODO: integrate new crypto / idp code - primitives.SetSecurityLevel("SHA2", 256) - // Init the MSP // TODO: determine the location of this config file var mspMgrConfigDir string + var alternativeCfgPath = os.Getenv("PEER_CFG_PATH") if alternativeCfgPath != "" { mspMgrConfigDir = alternativeCfgPath + "/msp/sampleconfig/" } else if _, err := os.Stat("./msp/sampleconfig/"); err == nil { @@ -127,17 +105,10 @@ func main() { mspMgrConfigDir = os.Getenv("GOPATH") + "/src/github.com/hyperledger/fabric/msp/sampleconfig/" } - // FIXME: when this peer joins a chain, it should get the - // config for that chain with the list of MSPs that the - // chain uses; however this is not yet implemented. - // Additionally, we might always want to have an MSP for - // the local test chain so that we can run tests with the - // peer CLI. This is why we create this fake setup here for now - err = mspmgmt.LoadFakeSetupWithLocalMspAndTestChainMsp(mspMgrConfigDir) - if err != nil { - panic(fmt.Errorf("Fatal error when setting up MSP from directory %s: err %s\n", mspMgrConfigDir, err)) + err = common.InitCrypto(mspMgrConfigDir) + if err != nil { // Handle errors reading the config file + panic(err.Error()) } - // On failure Cobra prints the usage message and error string, so we only // need to exit with a non-0 status if mainCmd.Execute() != nil {