diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index 7309d43780c..04c4ed1fe87 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -624,9 +624,11 @@ func invokeExample02Transaction(ctxt context.Context, cccid *ccprovider.CCContex } const ( - chaincodeExample02GolangPath = "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example02" - chaincodeExample02JavaPath = "../../examples/chaincode/java/chaincode_example02" - chaincodeExample06JavaPath = "../../examples/chaincode/java/chaincode_example06" + chaincodeExample02GolangPath = "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example02" + chaincodeEventSenderGolangPath = "github.com/hyperledger/fabric/examples/chaincode/go/eventsender" + chaincodeExample02JavaPath = "../../examples/chaincode/java/chaincode_example02" + chaincodeExample06JavaPath = "../../examples/chaincode/java/chaincode_example06" + chaincodeEventSenderJavaPath = "../../examples/chaincode/java/eventsender" ) func runChaincodeInvokeChaincode(t *testing.T, chainID1 string, chainID2 string, _ string) (err error) { @@ -1307,7 +1309,17 @@ func TestQueries(t *testing.T) { } func TestGetEvent(t *testing.T) { + + testCases := []struct { + chaincodeType pb.ChaincodeSpec_Type + chaincodePath string + }{ + {pb.ChaincodeSpec_GOLANG, chaincodeEventSenderGolangPath}, + {pb.ChaincodeSpec_JAVA, chaincodeEventSenderJavaPath}, + } + chainID := util.GetTestChainID() + var nextBlockNumber uint64 lis, err := initPeer(chainID) if err != nil { @@ -1315,58 +1327,68 @@ func TestGetEvent(t *testing.T) { t.Logf("Error creating peer: %s", err) } + nextBlockNumber++ + defer finitPeer(lis, chainID) - var ctxt = context.Background() + for _, tc := range testCases { + t.Run(tc.chaincodeType.String(), func(t *testing.T) { - url := "github.com/hyperledger/fabric/examples/chaincode/go/eventsender" + if tc.chaincodeType == pb.ChaincodeSpec_JAVA && runtime.GOARCH != "amd64" { + t.Skip("No Java chaincode support yet on non-x86_64.") + } - cID := &pb.ChaincodeID{Name: "esender", Path: url, Version: "0"} - f := "init" - spec := &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: util.ToChaincodeArgs(f)}} + var ctxt = context.Background() - cccid := ccprovider.NewCCContext(chainID, "esender", "0", "", false, nil, nil) - var nextBlockNumber uint64 = 1 - _, err = deploy(ctxt, cccid, spec, nextBlockNumber) - nextBlockNumber++ - ccID := spec.ChaincodeId.Name - if err != nil { - t.Fail() - t.Logf("Error initializing chaincode %s(%s)", ccID, err) - theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) - return - } + cID := &pb.ChaincodeID{Name: generateChaincodeName(tc.chaincodeType), Path: tc.chaincodePath, Version: "0"} + f := "init" + spec := &pb.ChaincodeSpec{Type: tc.chaincodeType, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: util.ToChaincodeArgs(f)}} - time.Sleep(time.Second) + cccid := ccprovider.NewCCContext(chainID, cID.Name, cID.Version, "", false, nil, nil) + _, err = deploy(ctxt, cccid, spec, nextBlockNumber) + nextBlockNumber++ + ccID := spec.ChaincodeId.Name + if err != nil { + t.Fail() + t.Logf("Error initializing chaincode %s(%s)", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } - args := util.ToChaincodeArgs("invoke", "i", "am", "satoshi") + time.Sleep(time.Second) - spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + args := util.ToChaincodeArgs("invoke", "i", "am", "satoshi") - var ccevt *pb.ChaincodeEvent - ccevt, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber, nil) + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} - if err != nil { - t.Logf("Error invoking chaincode %s(%s)", ccID, err) - t.Fail() - } + var ccevt *pb.ChaincodeEvent + ccevt, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber, nil) + nextBlockNumber++ - if ccevt == nil { - t.Logf("Error ccevt is nil %s(%s)", ccID, err) - t.Fail() - } + if err != nil { + t.Logf("Error invoking chaincode %s(%s)", ccID, err) + t.Fail() + } - if ccevt.ChaincodeId != ccID { - t.Logf("Error ccevt id(%s) != cid(%s)", ccevt.ChaincodeId, ccID) - t.Fail() - } + if ccevt == nil { + t.Logf("Error ccevt is nil %s(%s)", ccID, err) + t.Fail() + } - if strings.Index(string(ccevt.Payload), "i,am,satoshi") < 0 { - t.Logf("Error expected event not found (%s)", string(ccevt.Payload)) - t.Fail() + if ccevt.ChaincodeId != ccID { + t.Logf("Error ccevt id(%s) != cid(%s)", ccevt.ChaincodeId, ccID) + t.Fail() + } + + if strings.Index(string(ccevt.Payload), "i,am,satoshi") < 0 { + t.Logf("Error expected event not found (%s)", string(ccevt.Payload)) + t.Fail() + } + + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + }) } - theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) } // Test the execution of a chaincode that queries another chaincode diff --git a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeBase.java b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeBase.java index a1715588e5e..0cf243a930a 100644 --- a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeBase.java +++ b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeBase.java @@ -174,9 +174,9 @@ public void chatWithPeer(ManagedChannel connection) { @Override public void onNext(ChaincodeMessage message) { - logger.info("Got message from peer: " + toJsonString(message)); + logger.debug("Got message from peer: " + toJsonString(message)); try { - logger.info(String.format("[%s]Received message %s from org.hyperledger.fabric.shim", + logger.debug(String.format("[%s]Received message %s from org.hyperledger.fabric.shim", Handler.shortID(message.getTxid()), message.getType())); handler.handleMessage(message); } catch (Exception e) { diff --git a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeStub.java b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeStub.java index d5cc9bb3141..8c540e40475 100644 --- a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeStub.java +++ b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/ChaincodeStub.java @@ -1,217 +1,251 @@ -/* -Copyright DTCC 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package org.hyperledger.fabric.shim; - -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.google.protobuf.ByteString; - -public class ChaincodeStub { - - private static Log logger = LogFactory.getLog(ChaincodeStub.class); - - private final String uuid; - private final Handler handler; - private final List args; - - public ChaincodeStub(String uuid, Handler handler, List args) { - this.uuid = uuid; - this.handler = handler; - this.args = Collections.unmodifiableList(args); - } - - public List getArgs() { - return args - .stream() - .map(x -> x.toByteArray()) - .collect(Collectors.toList()); - } - - public List getArgsAsStrings() { - return args - .stream() - .map(x -> x.toStringUtf8()) - .collect(Collectors.toList()); - } - - /** - * Gets the UUID of this stub - * - * @return the id used to identify this communication channel - */ - public String getUuid() { - return uuid; - } - - /** - * Get the state of the provided key from the ledger, and returns is as a string - * - * @param key the key of the desired state - * @return the String value of the requested state - */ - public String getState(String key) { - return handler.handleGetState(key, uuid).toStringUtf8(); - } - - /** - * Puts the given state into a ledger, automatically wrapping it in a ByteString - * - * @param key reference key - * @param value value to be put - */ - public void putState(String key, String value) { - handler.handlePutState(key, ByteString.copyFromUtf8(value), uuid); - } - - /** - * Deletes the state of the given key from the ledger - * - * @param key key of the state to be deleted - */ - public void delState(String key) { - handler.handleDeleteState(key, uuid); - } - /** - * Given a start key and end key, this method returns a map of items with value converted to UTF-8 string. - * - * @param startKey - * @param endKey - * @return - */ - //TODO: Uncomment and fix range query with new proto type - /* - public Map getStateByRange(String startKey, String endKey) { - Map retMap = new HashMap<>(); - for (Map.Entry item : getStateByRangeRaw(startKey, endKey).entrySet()) { - retMap.put(item.getKey(), item.getValue().toStringUtf8()); - } - return retMap; - } - */ - /** - * This method is same as getStateByRange, except it returns value in ByteString, useful in cases where - * serialized object can be retrieved. - * - * @param startKey - * @param endKey - * @return - */ - //TODO: Uncomment and fix range query with new proto type - /* - public Map getStateByRangeRaw(String startKey, String endKey) { - Map map = new HashMap<>(); - for (ChaincodeShim.QueryStateKeyValue mapping : handler.handleGetStateByRange( - startKey, endKey, uuid).getKeysAndValuesList()) { - map.put(mapping.getKey(), mapping.getValue()); - } - return map; - } - */ - - /** - * Given a partial composite key, this method returns a map of items (whose key's prefix - * matches the given partial composite key) with value converted to UTF-8 string and - * this methid should be used only for a partial composite key; For a full composite key, - * an iter with empty response would be returned. - * - * @param startKey - * @param endKey - * @return - */ - - //TODO: Uncomment and fix range query with new proto type - /* - public Map getStateByPartialCompositeKey(String objectType, String[] attributes) { - String partialCompositeKey = new String(); - partialCompositeKey = createCompositeKey(objectType, attributes); - return getStateByRange(partialCompositeKey+"1", partialCompositeKey+":"); - } - */ - - /** - * Given a set of attributes, this method combines these attributes to return a composite key. - * - * @param objectType - * @param attributes - * @return - */ - public String createCompositeKey(String objectType, String[] attributes) { - String compositeKey = new String(); - compositeKey = compositeKey + objectType; - for (String attribute : attributes) { - compositeKey = compositeKey + attribute.length() + attribute; - } - return compositeKey; - } - - /** - * @param chaincodeName - * @param function - * @param args - * @return - */ - public String invokeChaincode(String chaincodeName, String function, List args) { - return handler.handleInvokeChaincode(chaincodeName, function, args, uuid).toStringUtf8(); - } - - //------RAW CALLS------ - - /** - * @param key - * @return - */ - public ByteString getRawState(String key) { - return handler.handleGetState(key, uuid); - } - - /** - * @param key - * @param value - */ - public void putRawState(String key, ByteString value) { - handler.handlePutState(key, value, uuid); - } - - /** - * - * @param startKey - * @param endKey - * @param limit - * @return - */ -// public GetStateByRangeResponse getStateByRangeRaw(String startKey, String endKey, int limit) { -// return handler.handleGetStateByRange(startKey, endKey, limit, uuid); -// } - - /** - * Invokes the provided chaincode with the given function and arguments, and returns the - * raw ByteString value that invocation generated. - * - * @param chaincodeName The name of the chaincode to invoke - * @param function the function parameter to pass to the chaincode - * @param args the arguments to be provided in the chaincode call - * @return the value returned by the chaincode call - */ - public ByteString invokeRawChaincode(String chaincodeName, String function, List args) { - return handler.handleInvokeChaincode(chaincodeName, function, args, uuid); - } -} +/* +Copyright DTCC, IBM 2016, 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.hyperledger.fabric.shim; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.hyperledger.fabric.protos.peer.ChaincodeEventPackage.ChaincodeEvent; + +import com.google.protobuf.ByteString; + +public class ChaincodeStub { + + private static Log logger = LogFactory.getLog(ChaincodeStub.class); + + private final String uuid; + private final Handler handler; + private final List args; + private ChaincodeEvent event; + + public ChaincodeStub(String uuid, Handler handler, List args) { + this.uuid = uuid; + this.handler = handler; + this.args = Collections.unmodifiableList(args); + } + + public List getArgs() { + return args.stream().map(x -> x.toByteArray()).collect(Collectors.toList()); + } + + public List getArgsAsStrings() { + return args.stream().map(x -> x.toStringUtf8()).collect(Collectors.toList()); + } + + /** + * Defines the CHAINCODE type event that will be posted to interested + * clients when the chaincode's result is committed to the ledger. + * + * @param name + * Name of event. Cannot be null or empty string. + * @param payload + * Optional event payload. + */ + public void setEvent(String name, byte[] payload) { + if(name == null || name.trim().length() == 0) throw new IllegalArgumentException("Event name cannot be null or empty string."); + if(payload != null) { + this.event = ChaincodeEvent.newBuilder() + .setEventName(name) + .setPayload(ByteString.copyFrom(payload)) + .build(); + } else { + this.event = ChaincodeEvent.newBuilder() + .setEventName(name) + .build(); + } + } + + ChaincodeEvent getEvent() { + return event; + } + + /** + * Gets the UUID of this stub + * + * @return the id used to identify this communication channel + */ + public String getUuid() { + return uuid; + } + + /** + * Get the state of the provided key from the ledger, and returns is as a + * string + * + * @param key + * the key of the desired state + * @return the String value of the requested state + */ + public String getState(String key) { + return handler.handleGetState(key, uuid).toStringUtf8(); + } + + /** + * Puts the given state into a ledger, automatically wrapping it in a + * ByteString + * + * @param key + * reference key + * @param value + * value to be put + */ + public void putState(String key, String value) { + handler.handlePutState(key, ByteString.copyFromUtf8(value), uuid); + } + + /** + * Deletes the state of the given key from the ledger + * + * @param key + * key of the state to be deleted + */ + public void delState(String key) { + handler.handleDeleteState(key, uuid); + } + /** + * Given a start key and end key, this method returns a map of items with + * value converted to UTF-8 string. + * + * @param startKey + * @param endKey + * @return + */ + // TODO: Uncomment and fix range query with new proto type + /* + public Map getStateByRange(String startKey, String endKey) { + Map retMap = new HashMap<>(); + for (Map.Entry item : getStateByRangeRaw(startKey, endKey).entrySet()) { + retMap.put(item.getKey(), item.getValue().toStringUtf8()); + } + return retMap; + } + */ + /** + * This method is same as getStateByRange, except it returns value in + * ByteString, useful in cases where serialized object can be retrieved. + * + * @param startKey + * @param endKey + * @return + */ + // TODO: Uncomment and fix range query with new proto type + /* + public Map getStateByRangeRaw(String startKey, String endKey) { + Map map = new HashMap<>(); + for (ChaincodeShim.QueryStateKeyValue mapping : handler.handleGetStateByRange(startKey, endKey, uuid).getKeysAndValuesList()) { + map.put(mapping.getKey(), mapping.getValue()); + } + return map; + } + */ + + /** + * Given a partial composite key, this method returns a map of items (whose + * key's prefix matches the given partial composite key) with value + * converted to UTF-8 string and this methid should be used only for a + * partial composite key; For a full composite key, an iter with empty + * response would be returned. + * + * @param startKey + * @param endKey + * @return + */ + + // TODO: Uncomment and fix range query with new proto type + /* + public Map getStateByPartialCompositeKey(String objectType, String[] attributes) { + String partialCompositeKey = new String(); + partialCompositeKey = createCompositeKey(objectType, attributes); + return getStateByRange(partialCompositeKey + "1", partialCompositeKey + ":"); + } + */ + + /** + * Given a set of attributes, this method combines these attributes to + * return a composite key. + * + * @param objectType + * @param attributes + * @return + */ + public String createCompositeKey(String objectType, String[] attributes) { + String compositeKey = new String(); + compositeKey = compositeKey + objectType; + for (String attribute : attributes) { + compositeKey = compositeKey + attribute.length() + attribute; + } + return compositeKey; + } + + /** + * @param chaincodeName + * @param function + * @param args + * @return + */ + public String invokeChaincode(String chaincodeName, String function, List args) { + return handler.handleInvokeChaincode(chaincodeName, function, args, uuid).toStringUtf8(); + } + + // ------RAW CALLS------ + + /** + * @param key + * @return + */ + public ByteString getRawState(String key) { + return handler.handleGetState(key, uuid); + } + + /** + * @param key + * @param value + */ + public void putRawState(String key, ByteString value) { + handler.handlePutState(key, value, uuid); + } + + /** + * + * @param startKey + * @param endKey + * @param limit + * @return + */ +// public GetStateByRangeResponse getStateByRangeRaw(String startKey, String endKey, int limit) { +// return handler.handleGetStateByRange(startKey, endKey, limit, uuid); +// } + + /** + * Invokes the provided chaincode with the given function and arguments, and + * returns the raw ByteString value that invocation generated. + * + * @param chaincodeName + * The name of the chaincode to invoke + * @param function + * the function parameter to pass to the chaincode + * @param args + * the arguments to be provided in the chaincode call + * @return the value returned by the chaincode call + */ + public ByteString invokeRawChaincode(String chaincodeName, String function, List args) { + return handler.handleInvokeChaincode(chaincodeName, function, args, uuid); + } +} diff --git a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/Handler.java b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/Handler.java index 862ee214696..b75f06592ff 100644 --- a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/Handler.java +++ b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/Handler.java @@ -1,769 +1,750 @@ -/* -Copyright DTCC, IBM 2016, 2017 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package org.hyperledger.fabric.shim; - -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.COMPLETED; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.DEL_STATE; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.ERROR; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.GET_STATE; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.INIT; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.INVOKE_CHAINCODE; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.PUT_STATE; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.READY; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.REGISTERED; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.RESPONSE; -import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.TRANSACTION; -import static org.hyperledger.fabric.shim.HandlerHelper.newCompletedEventMessage; -import static org.hyperledger.fabric.shim.HandlerHelper.newErrorEventMessage; -import static org.hyperledger.fabric.shim.fsm.CallbackType.AFTER_EVENT; -import static org.hyperledger.fabric.shim.fsm.CallbackType.BEFORE_EVENT; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.hyperledger.fabric.protos.common.Common.Status; -import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeID; -import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeInput; -import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeSpec; -import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage; -import org.hyperledger.fabric.protos.peer.ChaincodeShim.PutStateInfo; -import org.hyperledger.fabric.protos.peer.ProposalResponsePackage.Response; -import org.hyperledger.fabric.shim.fsm.CBDesc; -import org.hyperledger.fabric.shim.fsm.Event; -import org.hyperledger.fabric.shim.fsm.EventDesc; -import org.hyperledger.fabric.shim.fsm.FSM; -import org.hyperledger.fabric.shim.fsm.exceptions.CancelledException; -import org.hyperledger.fabric.shim.fsm.exceptions.NoTransitionException; -import org.hyperledger.fabric.shim.helper.Channel; - -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - -import io.grpc.stub.StreamObserver; - -public class Handler { - - private static Log logger = LogFactory.getLog(Handler.class); - - private StreamObserver chatStream; - private ChaincodeBase chaincode; - - private Map isTransaction; - private Map> responseChannel; - public Channel nextState; - - private FSM fsm; - - public Handler(StreamObserver chatStream, ChaincodeBase chaincode) { - this.chatStream = chatStream; - this.chaincode = chaincode; - - responseChannel = new HashMap>(); - isTransaction = new HashMap(); - nextState = new Channel(); - - fsm = new FSM("created"); - - fsm.addEvents( - // Event Name From To - new EventDesc(REGISTERED.toString(), "created", "established"), - new EventDesc(READY.toString(), "established", "ready"), - new EventDesc(ERROR.toString(), "init", "established"), - new EventDesc(RESPONSE.toString(), "init", "init"), - new EventDesc(INIT.toString(), "ready", "ready"), - new EventDesc(TRANSACTION.toString(), "ready", "ready"), - new EventDesc(RESPONSE.toString(), "ready", "ready"), - new EventDesc(ERROR.toString(), "ready", "ready"), - new EventDesc(COMPLETED.toString(), "init", "ready"), - new EventDesc(COMPLETED.toString(), "ready", "ready") - ); - - fsm.addCallbacks( - // Type Trigger Callback - new CBDesc(BEFORE_EVENT, REGISTERED.toString(), (event) -> beforeRegistered(event)), - new CBDesc(AFTER_EVENT, RESPONSE.toString(), (event) -> afterResponse(event)), - new CBDesc(AFTER_EVENT, ERROR.toString(), (event) -> afterError(event)), - new CBDesc(BEFORE_EVENT, INIT.toString(), (event) -> beforeInit(event)), - new CBDesc(BEFORE_EVENT, TRANSACTION.toString(),(event) -> beforeTransaction(event)) - ); - } - - public static String shortID(String uuid) { - if (uuid.length() < 8) { - return uuid; - } else { - return uuid.substring(0, 8); - } - } - - public void triggerNextState(ChaincodeMessage message, boolean send) { - if(logger.isTraceEnabled())logger.trace("triggerNextState for message "+message); - nextState.add(new NextStateInfo(message, send)); - } - - public synchronized void serialSend(ChaincodeMessage message) { - logger.debug("Sending message to peer: " + ChaincodeBase.toJsonString(message)); - try { - chatStream.onNext(message); - } catch (Exception e) { - logger.error(String.format("[%s]Error sending %s: %s", - shortID(message), message.getType(), e)); - throw new RuntimeException(String.format("Error sending %s: %s", message.getType(), e)); - } - if(logger.isTraceEnabled())logger.trace("serialSend complete for message "+message); - } - - public synchronized Channel createChannel(String uuid) { - if (responseChannel.containsKey(uuid)) { - throw new IllegalStateException("[" + shortID(uuid) + "] Channel exists"); - } - - Channel channel = new Channel(); - responseChannel.put(uuid, channel); - if(logger.isTraceEnabled())logger.trace("channel created with uuid "+uuid); - - return channel; - } - - public synchronized void sendChannel(ChaincodeMessage message) { - if (!responseChannel.containsKey(message.getTxid())) { - throw new IllegalStateException("[" + shortID(message) + "]sendChannel does not exist"); - } - - logger.debug(String.format("[%s]Before send", shortID(message))); - responseChannel.get(message.getTxid()).add(message); - logger.debug(String.format("[%s]After send", shortID(message))); - } - - public ChaincodeMessage receiveChannel(Channel channel) { - try { - return channel.take(); - } catch (InterruptedException e) { - logger.debug("channel.take() failed with InterruptedException"); - - //Channel has been closed? - //TODO - return null; - } - } - - public synchronized void deleteChannel(String uuid) { - Channel channel = responseChannel.remove(uuid); - if (channel != null) { - channel.close(); - } - - if(logger.isTraceEnabled())logger.trace("deleteChannel done with uuid "+uuid); - } - - /** - * Marks a UUID as either a transaction or a query - * @param uuid ID to be marked - * @param isTransaction true for transaction, false for query - * @return whether or not the UUID was successfully marked - */ - public synchronized boolean markIsTransaction(String uuid, boolean isTransaction) { - if (this.isTransaction == null) { - return false; - } - - this.isTransaction.put(uuid, isTransaction); - return true; - } - - public synchronized void deleteIsTransaction(String uuid) { - isTransaction.remove(uuid); - } - - public void beforeRegistered(Event event) { - extractMessageFromEvent(event); - logger.debug(String.format("Received %s, ready for invocations", REGISTERED)); - } - - /** - * Handles requests to initialize chaincode - * @param message chaincode to be initialized - */ - public void handleInit(ChaincodeMessage message) { - new Thread(() -> { - try { - - // Get the function and args from Payload - final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload()); - - // Mark as a transaction (allow put/del state) - markIsTransaction(message.getTxid(), true); - - // Create the ChaincodeStub which the chaincode can use to callback - final ChaincodeStub stub = new ChaincodeStub(message.getTxid(), this, input.getArgsList()); - - // Call chaincode's Run - final Response result = chaincode.init(stub); - logger.debug(String.format(String.format("[%s]Init succeeded. Sending %s", shortID(message), COMPLETED))); - - if(result.getStatus() == Status.SUCCESS_VALUE) { - // Send COMPLETED with entire result as payload - triggerNextState(ChaincodeMessage.newBuilder() - .setType(COMPLETED) - .setPayload(result.toByteString()) - .setTxid(message.getTxid()) - .build(), true); - } else { - // Send ERROR with entire result.Message as payload - triggerNextState(ChaincodeMessage.newBuilder() - .setType(ERROR) - .setPayload(result.getMessageBytes()) - .setTxid(message.getTxid()) - .build(), true); - } - - } catch (InvalidProtocolBufferException | RuntimeException e) { - logger.error(String.format("[%s]Init failed. Sending %s", shortID(message), ERROR), e); - triggerNextState(ChaincodeMessage.newBuilder() - .setType(ERROR) - .setPayload(Response.newBuilder() - .setStatus(Status.INTERNAL_SERVER_ERROR_VALUE) - .setPayload(ByteString.copyFromUtf8(e.toString())) - .build().toByteString()) - .setTxid(message.getTxid()) - .build(), true); - } finally { - // delete isTransaction entry - deleteIsTransaction(message.getTxid()); - } - }).start(); - } - - // enterInitState will initialize the chaincode if entering init from established. - public void beforeInit(Event event) { - logger.debug(String.format("Before %s event.", event.name)); - logger.debug(String.format("Current state %s", fsm.current())); - final ChaincodeMessage message = extractMessageFromEvent(event); - logger.debug(String.format("[%s]Received %s, initializing chaincode", shortID(message), message.getType())); - if (message.getType() == INIT) { - // Call the chaincode's Run function to initialize - handleInit(message); - } - } - - // handleTransaction Handles request to execute a transaction. - public void handleTransaction(ChaincodeMessage message) { - // The defer followed by triggering a go routine dance is needed to ensure that the previous state transition - // is completed before the next one is triggered. The previous state transition is deemed complete only when - // the beforeInit function is exited. Interesting bug fix!! - Runnable task = () -> { - //better not be nil - ChaincodeMessage nextStatemessage = null; - boolean send = true; - - //Defer - try { - // Get the function and args from Payload - ChaincodeInput input; - try { - input = ChaincodeInput.parseFrom(message.getPayload()); - } catch (InvalidProtocolBufferException e) { - logger.error(String.format("[%s]Incorrect payload format", shortID(message)), e); - logger.debug(String.format("[%s]Incorrect payload format. Sending %s", shortID(message), ERROR)); - // Send ERROR message to chaincode support and change state - nextStatemessage = newErrorEventMessage(message.getTxid(), e); - return; - } - - // Mark as a transaction (allow put/del state) - markIsTransaction(message.getTxid(), true); - - // Create the ChaincodeStub which the chaincode can use to callback - final ChaincodeStub stub = new ChaincodeStub(message.getTxid(), this, input.getArgsList()); - - // Call chaincode's Run - Response response; - try { - response = chaincode.invoke(stub); - } catch (Throwable throwable) { - // Send ERROR message to chaincode support and change state - logger.error(String.format("[%s]Error running chaincode. Transaction execution failed. Sending %s", shortID(message), ERROR)); - nextStatemessage = newErrorEventMessage(message.getTxid(), throwable); - return; - } finally { - deleteIsTransaction(message.getTxid()); - } - - logger.info(String.format("[%s]Transaction completed. Sending %s", shortID(message), COMPLETED)); - logger.info(String.format("[%s] Response ='%s'", shortID(message), response)); - - // Send COMPLETED message to chaincode support and change state - nextStatemessage = newCompletedEventMessage(message.getTxid(), response); - } finally { - triggerNextState(nextStatemessage, send); - } - }; - - new Thread(task).start(); - } - - // enterTransactionState will execute chaincode's Run if coming from a TRANSACTION event. - public void beforeTransaction(Event event) { - ChaincodeMessage message = extractMessageFromEvent(event); - logger.debug(String.format("[%s]Received %s, invoking transaction on chaincode(src:%s, dst:%s)", - shortID(message), message.getType().toString(), event.src, event.dst)); - if (message.getType() == TRANSACTION) { - // Call the chaincode's Run function to invoke transaction - handleTransaction(message); - } - } - - // afterCompleted will need to handle COMPLETED event by sending message to the peer - public void afterCompleted(Event event) { - ChaincodeMessage message = extractMessageFromEvent(event); - logger.debug(String.format("[%s]sending COMPLETED to validator for tid", shortID(message))); - try { - serialSend(message); - } catch (Exception e) { - event.cancel(new Exception("send COMPLETED failed %s", e)); - } - } - - // afterResponse is called to deliver a response or error to the chaincode stub. - public void afterResponse(Event event) { - ChaincodeMessage message = extractMessageFromEvent(event); - try { - sendChannel(message); - logger.debug(String.format("[%s]Received %s, communicated (state:%s)", - shortID(message), message.getType(), fsm.current())); - } catch (Exception e) { - logger.error(String.format("[%s]error sending %s (state:%s): %s", shortID(message), - message.getType(), fsm.current(), e)); - } - } - - private ChaincodeMessage extractMessageFromEvent(Event event) { - try { - return (ChaincodeMessage) event.args[0]; - } catch (ClassCastException | ArrayIndexOutOfBoundsException e) { - final RuntimeException error = new RuntimeException("No chaincode message found in event.", e); - event.cancel(error); - throw error; - } - } - - public void afterError(Event event) { - ChaincodeMessage message = extractMessageFromEvent(event); - /* TODO- revisit. This may no longer be needed with the serialized/streamlined messaging model - * There are two situations in which the ERROR event can be triggered: - * 1. When an error is encountered within handleInit or handleTransaction - some issue at the chaincode side; In this case there will be no responseChannel and the message has been sent to the validator. - * 2. The chaincode has initiated a request (get/put/del state) to the validator and is expecting a response on the responseChannel; If ERROR is received from validator, this needs to be notified on the responseChannel. - */ - try { - sendChannel(message); - } catch (Exception e) { - logger.debug(String.format("[%s]Error received from validator %s, communicated(state:%s)", - shortID(message), message.getType(), fsm.current())); - } - } - - // handleGetState communicates with the validator to fetch the requested state information from the ledger. - public ByteString handleGetState(String key, String uuid) { - try { - //TODO Implement method to get and put entire state map and not one key at a time? - // Create the channel on which to communicate the response from validating peer - // Create the channel on which to communicate the response from validating peer - Channel responseChannel; - try { - responseChannel = createChannel(uuid); - } catch (Exception e) { - logger.debug("Another state request pending for this Uuid. Cannot process."); - throw e; - } - - // Send GET_STATE message to validator chaincode support - ChaincodeMessage message = ChaincodeMessage.newBuilder() - .setType(GET_STATE) - .setPayload(ByteString.copyFromUtf8(key)) - .setTxid(uuid) - .build(); - - logger.debug(String.format("[%s]Sending %s", shortID(message), GET_STATE)); - try { - serialSend(message); - } catch (Exception e) { - logger.error(String.format("[%s]error sending GET_STATE %s", shortID(uuid), e)); - throw new RuntimeException("could not send message"); - } - - // Wait on responseChannel for response - ChaincodeMessage response; - try { - response = receiveChannel(responseChannel); - } catch (Exception e) { - logger.error(String.format("[%s]Received unexpected message type", shortID(uuid))); - throw new RuntimeException("Received unexpected message type"); - } - - // Success response - if (response.getType() == RESPONSE) { - logger.debug(String.format("[%s]GetState received payload %s", shortID(response.getTxid()), RESPONSE)); - return response.getPayload(); - } - - // Error response - if (response.getType() == ERROR) { - logger.error(String.format("[%s]GetState received error %s", shortID(response.getTxid()), ERROR)); - throw new RuntimeException(response.getPayload().toString()); - } - - // Incorrect chaincode message received - logger.error(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", - shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); - throw new RuntimeException("Incorrect chaincode message received"); - } finally { - deleteChannel(uuid); - } - } - - private boolean isTransaction(String uuid) { - return isTransaction.containsKey(uuid) && isTransaction.get(uuid); - } - - public void handlePutState(String key, ByteString value, String uuid) { - // Check if this is a transaction - logger.debug("["+ shortID(uuid)+"]Inside putstate (\""+key+"\":\""+value+"\"), isTransaction = "+isTransaction(uuid)); - - if (!isTransaction(uuid)) { - throw new IllegalStateException("Cannot put state in query context"); - } - - PutStateInfo payload = PutStateInfo.newBuilder() - .setKey(key) - .setValue(value) - .build(); - - // Create the channel on which to communicate the response from validating peer - Channel responseChannel; - try { - responseChannel = createChannel(uuid); - } catch (Exception e) { - logger.error(String.format("[%s]Another state request pending for this Uuid. Cannot process.", shortID(uuid))); - throw e; - } - - //Defer - try { - // Send PUT_STATE message to validator chaincode support - ChaincodeMessage message = ChaincodeMessage.newBuilder() - .setType(PUT_STATE) - .setPayload(payload.toByteString()) - .setTxid(uuid) - .build(); - - logger.debug(String.format("[%s]Sending %s", shortID(message), PUT_STATE)); - - try { - serialSend(message); - } catch (Exception e) { - logger.error(String.format("[%s]error sending PUT_STATE %s", message.getTxid(), e)); - throw new RuntimeException("could not send message"); - } - - // Wait on responseChannel for response - ChaincodeMessage response; - try { - response = receiveChannel(responseChannel); - } catch (Exception e) { - //TODO figure out how to get uuid of receive channel - logger.error(String.format("[%s]Received unexpected message type", e)); - throw e; - } - - // Success response - if (response.getType() == RESPONSE) { - logger.debug(String.format("[%s]Received %s. Successfully updated state", shortID(response.getTxid()), RESPONSE)); - return; - } - - // Error response - if (response.getType() == ERROR) { - logger.error(String.format("[%s]Received %s. Payload: %s", shortID(response.getTxid()), ERROR, response.getPayload())); - throw new RuntimeException(response.getPayload().toStringUtf8()); - } - - // Incorrect chaincode message received - logger.error(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", - shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); - - throw new RuntimeException("Incorrect chaincode message received"); - } catch (Exception e) { - throw e; - } finally { - deleteChannel(uuid); - } - } - - public void handleDeleteState(String key, String uuid) { - // Check if this is a transaction - if (!isTransaction(uuid)) { - throw new RuntimeException("Cannot del state in query context"); - } - - // Create the channel on which to communicate the response from validating peer - Channel responseChannel; - try { - responseChannel = createChannel(uuid); - } catch (Exception e) { - logger.error(String.format("[%s]Another state request pending for this Uuid." - + " Cannot process create createChannel.", shortID(uuid))); - throw e; - } - - //Defer - try { - // Send DEL_STATE message to validator chaincode support - ChaincodeMessage message = ChaincodeMessage.newBuilder() - .setType(DEL_STATE) - .setPayload(ByteString.copyFromUtf8(key)) - .setTxid(uuid) - .build(); - logger.debug(String.format("[%s]Sending %s", shortID(uuid), DEL_STATE)); - try { - serialSend(message); - } catch (Exception e) { - logger.error(String.format("[%s]error sending DEL_STATE %s", shortID(message), DEL_STATE)); - throw new RuntimeException("could not send message"); - } - - // Wait on responseChannel for response - ChaincodeMessage response; - try { - response = receiveChannel(responseChannel); - } catch (Exception e) { - logger.error(String.format("[%s]Received unexpected message type", shortID(message))); - throw new RuntimeException("Received unexpected message type"); - } - - if (response.getType() == RESPONSE) { - // Success response - logger.debug(String.format("[%s]Received %s. Successfully deleted state", message.getTxid(), RESPONSE)); - return; - } - - if (response.getType() == ERROR) { - // Error response - logger.error(String.format("[%s]Received %s. Payload: %s", message.getTxid(), ERROR, response.getPayload())); - throw new RuntimeException(response.getPayload().toStringUtf8()); - } - - // Incorrect chaincode message received - logger.error(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", - shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); - throw new RuntimeException("Incorrect chaincode message received"); - } finally { - deleteChannel(uuid); - } - } - - //TODO: Uncomment and fix range query with new proto type -/* - public QueryStateResponse handleGetStateByRange(String startKey, String endKey, String uuid) { - // Create the channel on which to communicate the response from validating peer - Channel responseChannel; - try { - responseChannel = createChannel(uuid); - } catch (Exception e) { - logger.debug(String.format("[%s]Another state request pending for this Uuid." - + " Cannot process.", shortID(uuid))); - throw e; - } - - //Defer - try { - // Send GET_STATE_BY_RANGE message to validator chaincode support - GetStateByRange payload = GetStateByRange.newBuilder() - .setStartKey(startKey) - .setEndKey(endKey) - .build(); - - ChaincodeMessage message = ChaincodeMessage.newBuilder() - .setType(GET_STATE_BY_RANGE) - .setPayload(payload.toByteString()) - .setTxid(uuid) - .build(); - - logger.debug(String.format("[%s]Sending %s", shortID(message), GET_STATE_BY_RANGE)); - try { - serialSend(message); - } catch (Exception e){ - logger.error(String.format("[%s]error sending %s", shortID(message), GET_STATE_BY_RANGE)); - throw new RuntimeException("could not send message"); - } - - // Wait on responseChannel for response - ChaincodeMessage response; - try { - response = receiveChannel(responseChannel); - } catch (Exception e) { - logger.error(String.format("[%s]Received unexpected message type", uuid)); - throw new RuntimeException("Received unexpected message type"); - } - - if (response.getType() == RESPONSE) { - // Success response - logger.debug(String.format("[%s]Received %s. Successfully got range", - shortID(response.getTxid()), RESPONSE)); - - QueryStateResponse rangeQueryResponse; - try { - rangeQueryResponse = QueryStateResponse.parseFrom(response.getPayload()); - } catch (Exception e) { - logger.error(String.format("[%s]unmarshall error", shortID(response.getTxid()))); - throw new RuntimeException("Error unmarshalling GetStateByRangeResponse."); - } - - return rangeQueryResponse; - } - - if (response.getType() == ERROR) { - // Error response - logger.error(String.format("[%s]Received %s", - shortID(response.getTxid()), ERROR)); - throw new RuntimeException(response.getPayload().toStringUtf8()); - } - - // Incorrect chaincode message received - logger.error(String.format("Incorrect chaincode message %s recieved. Expecting %s or %s", - response.getType(), RESPONSE, ERROR)); - throw new RuntimeException("Incorrect chaincode message received"); - } finally { - deleteChannel(uuid); - } - } -*/ - public ByteString handleInvokeChaincode(String chaincodeName, String function, List args, String uuid) { - // Check if this is a transaction - if (!isTransaction.containsKey(uuid)) { - throw new RuntimeException("Cannot invoke chaincode in query context"); - } - - ChaincodeID id = ChaincodeID.newBuilder() - .setName(chaincodeName).build(); - ChaincodeInput input = ChaincodeInput.newBuilder() - .addArgs(ByteString.copyFromUtf8(function)) - .addAllArgs(args) - .build(); - ChaincodeSpec payload = ChaincodeSpec.newBuilder() - .setChaincodeId(id) - .setInput(input) - .build(); - - // Create the channel on which to communicate the response from validating peer - Channel responseChannel; - try { - responseChannel = createChannel(uuid); - } catch (Exception e) { - logger.error(String.format("[%s]Another state request pending for this Uuid. Cannot process.", shortID(uuid))); - throw e; - } - - //Defer - try { - // Send INVOKE_CHAINCODE message to validator chaincode support - ChaincodeMessage message = ChaincodeMessage.newBuilder() - .setType(INVOKE_CHAINCODE) - .setPayload(payload.toByteString()) - .setTxid(uuid) - .build(); - - logger.debug(String.format("[%s]Sending %s", - shortID(message), INVOKE_CHAINCODE)); - - try { - serialSend(message); - } catch (Exception e) { - logger.error("["+ shortID(message)+"]Error sending "+INVOKE_CHAINCODE+": "+e.getMessage()); - throw e; - } - - // Wait on responseChannel for response - ChaincodeMessage response; - try { - response = receiveChannel(responseChannel); - } catch (Exception e) { - logger.error(String.format("[%s]Received unexpected message type", shortID(message))); - throw new RuntimeException("Received unexpected message type"); - } - - if (response.getType() == RESPONSE) { - // Success response - logger.debug(String.format("[%s]Received %s. Successfully invoked chaincode", shortID(response.getTxid()), RESPONSE)); - return response.getPayload(); - } - - if (response.getType() == ERROR) { - // Error response - logger.error(String.format("[%s]Received %s.", shortID(response.getTxid()), ERROR)); - throw new RuntimeException(response.getPayload().toStringUtf8()); - } - - // Incorrect chaincode message received - logger.debug(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", - shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); - throw new RuntimeException("Incorrect chaincode message received"); - } finally { - deleteChannel(uuid); - } - } - - // handleMessage message handles loop for org.hyperledger.fabric.shim side of chaincode/validator stream. - public synchronized void handleMessage(ChaincodeMessage message) throws Exception { - - if (message.getType() == ChaincodeMessage.Type.KEEPALIVE){ - logger.debug(String.format("[%s] Recieved KEEPALIVE message, do nothing", - shortID(message))); - // Received a keep alive message, we don't do anything with it for now - // and it does not touch the state machine - return; - } - - logger.debug(String.format("[%s]Handling ChaincodeMessage of type: %s(state:%s)", shortID(message), message.getType(), fsm.current())); - - if (fsm.eventCannotOccur(message.getType().toString())) { - String errStr = String.format("[%s]Chaincode handler org.hyperledger.fabric.shim.fsm cannot handle message (%s) with payload size (%d) while in state: %s", - message.getTxid(), message.getType(), message.getPayload().size(), fsm.current()); - serialSend(newErrorEventMessage(message.getTxid(), errStr)); - throw new RuntimeException(errStr); - } - - // Filter errors to allow NoTransitionError and CanceledError - // to not propagate for cases where embedded Err == nil. - try { - fsm.raiseEvent(message.getType().toString(), message); - } catch (NoTransitionException e) { - if (e.error != null) throw e; - logger.debug("["+ shortID(message)+"]Ignoring NoTransitionError"); - } catch (CancelledException e) { - if (e.error != null) throw e; - logger.debug("["+ shortID(message)+"]Ignoring CanceledError"); - } - } - - private String shortID(ChaincodeMessage message) { - return shortID(message.getTxid()); - } - -} +/* +Copyright DTCC, IBM 2016, 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.hyperledger.fabric.shim; + +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.COMPLETED; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.DEL_STATE; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.ERROR; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.GET_STATE; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.INIT; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.INVOKE_CHAINCODE; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.PUT_STATE; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.READY; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.REGISTERED; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.RESPONSE; +import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.TRANSACTION; +import static org.hyperledger.fabric.shim.HandlerHelper.newCompletedEventMessage; +import static org.hyperledger.fabric.shim.HandlerHelper.newErrorEventMessage; +import static org.hyperledger.fabric.shim.fsm.CallbackType.AFTER_EVENT; +import static org.hyperledger.fabric.shim.fsm.CallbackType.BEFORE_EVENT; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.hyperledger.fabric.protos.common.Common.Status; +import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeID; +import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeInput; +import org.hyperledger.fabric.protos.peer.Chaincode.ChaincodeSpec; +import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage; +import org.hyperledger.fabric.protos.peer.ChaincodeShim.PutStateInfo; +import org.hyperledger.fabric.protos.peer.ProposalResponsePackage.Response; +import org.hyperledger.fabric.shim.fsm.CBDesc; +import org.hyperledger.fabric.shim.fsm.Event; +import org.hyperledger.fabric.shim.fsm.EventDesc; +import org.hyperledger.fabric.shim.fsm.FSM; +import org.hyperledger.fabric.shim.fsm.exceptions.CancelledException; +import org.hyperledger.fabric.shim.fsm.exceptions.NoTransitionException; +import org.hyperledger.fabric.shim.helper.Channel; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +import io.grpc.stub.StreamObserver; + +public class Handler { + + private static Log logger = LogFactory.getLog(Handler.class); + + private StreamObserver chatStream; + private ChaincodeBase chaincode; + + private Map isTransaction; + private Map> responseChannel; + public Channel nextState; + + private FSM fsm; + + public Handler(StreamObserver chatStream, ChaincodeBase chaincode) { + this.chatStream = chatStream; + this.chaincode = chaincode; + + responseChannel = new HashMap>(); + isTransaction = new HashMap(); + nextState = new Channel(); + + fsm = new FSM("created"); + + fsm.addEvents( + // Event Name From To + new EventDesc(REGISTERED.toString(), "created", "established"), + new EventDesc(READY.toString(), "established", "ready"), + new EventDesc(ERROR.toString(), "init", "established"), + new EventDesc(RESPONSE.toString(), "init", "init"), + new EventDesc(INIT.toString(), "ready", "ready"), + new EventDesc(TRANSACTION.toString(), "ready", "ready"), + new EventDesc(RESPONSE.toString(), "ready", "ready"), + new EventDesc(ERROR.toString(), "ready", "ready"), + new EventDesc(COMPLETED.toString(), "init", "ready"), + new EventDesc(COMPLETED.toString(), "ready", "ready") + ); + + fsm.addCallbacks( + // Type Trigger Callback + new CBDesc(BEFORE_EVENT, REGISTERED.toString(), (event) -> beforeRegistered(event)), + new CBDesc(AFTER_EVENT, RESPONSE.toString(), (event) -> afterResponse(event)), + new CBDesc(AFTER_EVENT, ERROR.toString(), (event) -> afterError(event)), + new CBDesc(BEFORE_EVENT, INIT.toString(), (event) -> beforeInit(event)), + new CBDesc(BEFORE_EVENT, TRANSACTION.toString(),(event) -> beforeTransaction(event)) + ); + } + + public static String shortID(String uuid) { + if (uuid.length() < 8) { + return uuid; + } else { + return uuid.substring(0, 8); + } + } + + public void triggerNextState(ChaincodeMessage message, boolean send) { + if(logger.isTraceEnabled())logger.trace("triggerNextState for message "+message); + nextState.add(new NextStateInfo(message, send)); + } + + public synchronized void serialSend(ChaincodeMessage message) { + logger.debug("Sending message to peer: " + ChaincodeBase.toJsonString(message)); + try { + chatStream.onNext(message); + } catch (Exception e) { + logger.error(String.format("[%s]Error sending %s: %s", + shortID(message), message.getType(), e)); + throw new RuntimeException(String.format("Error sending %s: %s", message.getType(), e)); + } + if(logger.isTraceEnabled())logger.trace("serialSend complete for message "+message); + } + + public synchronized Channel createChannel(String uuid) { + if (responseChannel.containsKey(uuid)) { + throw new IllegalStateException("[" + shortID(uuid) + "] Channel exists"); + } + + Channel channel = new Channel(); + responseChannel.put(uuid, channel); + if(logger.isTraceEnabled())logger.trace("channel created with uuid "+uuid); + + return channel; + } + + public synchronized void sendChannel(ChaincodeMessage message) { + if (!responseChannel.containsKey(message.getTxid())) { + throw new IllegalStateException("[" + shortID(message) + "]sendChannel does not exist"); + } + + logger.debug(String.format("[%s]Before send", shortID(message))); + responseChannel.get(message.getTxid()).add(message); + logger.debug(String.format("[%s]After send", shortID(message))); + } + + public ChaincodeMessage receiveChannel(Channel channel) { + try { + return channel.take(); + } catch (InterruptedException e) { + logger.debug("channel.take() failed with InterruptedException"); + + //Channel has been closed? + //TODO + return null; + } + } + + public synchronized void deleteChannel(String uuid) { + Channel channel = responseChannel.remove(uuid); + if (channel != null) { + channel.close(); + } + + if(logger.isTraceEnabled())logger.trace("deleteChannel done with uuid "+uuid); + } + + /** + * Marks a UUID as either a transaction or a query + * @param uuid ID to be marked + * @param isTransaction true for transaction, false for query + * @return whether or not the UUID was successfully marked + */ + public synchronized boolean markIsTransaction(String uuid, boolean isTransaction) { + if (this.isTransaction == null) { + return false; + } + + this.isTransaction.put(uuid, isTransaction); + return true; + } + + public synchronized void deleteIsTransaction(String uuid) { + isTransaction.remove(uuid); + } + + public void beforeRegistered(Event event) { + extractMessageFromEvent(event); + logger.debug(String.format("Received %s, ready for invocations", REGISTERED)); + } + + /** + * Handles requests to initialize chaincode + * @param message chaincode to be initialized + */ + public void handleInit(ChaincodeMessage message) { + new Thread(() -> { + try { + + // Get the function and args from Payload + final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload()); + + // Mark as a transaction (allow put/del state) + markIsTransaction(message.getTxid(), true); + + // Create the ChaincodeStub which the chaincode can use to callback + final ChaincodeStub stub = new ChaincodeStub(message.getTxid(), this, input.getArgsList()); + + // Call chaincode's init + final Response result = chaincode.init(stub); + + if(result.getStatus() == Status.SUCCESS_VALUE) { + // Send COMPLETED with entire result as payload + logger.debug(String.format(String.format("[%s]Init succeeded. Sending %s", shortID(message), COMPLETED))); + triggerNextState(newCompletedEventMessage(message.getTxid(), result, stub.getEvent()), true); + } else { + // Send ERROR with entire result.Message as payload + logger.error(String.format("[%s]Init failed. Sending %s", shortID(message), ERROR)); + triggerNextState(newErrorEventMessage(message.getTxid(), result.getMessage(), stub.getEvent()), true); + } + + } catch (InvalidProtocolBufferException | RuntimeException e) { + logger.error(String.format("[%s]Init failed. Sending %s", shortID(message), ERROR), e); + triggerNextState(ChaincodeMessage.newBuilder() + .setType(ERROR) + .setPayload(Response.newBuilder() + .setStatus(Status.INTERNAL_SERVER_ERROR_VALUE) + .setPayload(ByteString.copyFromUtf8(e.toString())) + .build().toByteString()) + .setTxid(message.getTxid()) + .build(), true); + } finally { + // delete isTransaction entry + deleteIsTransaction(message.getTxid()); + } + }).start(); + } + + // enterInitState will initialize the chaincode if entering init from established. + public void beforeInit(Event event) { + logger.debug(String.format("Before %s event.", event.name)); + logger.debug(String.format("Current state %s", fsm.current())); + final ChaincodeMessage message = extractMessageFromEvent(event); + logger.debug(String.format("[%s]Received %s, initializing chaincode", shortID(message), message.getType())); + if (message.getType() == INIT) { + // Call the chaincode's Run function to initialize + handleInit(message); + } + } + + // handleTransaction Handles request to execute a transaction. + public void handleTransaction(ChaincodeMessage message) { + new Thread(() -> { + try { + + // Get the function and args from Payload + final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload()); + + // Mark as a transaction (allow put/del state) + markIsTransaction(message.getTxid(), true); + + // Create the ChaincodeStub which the chaincode can use to callback + final ChaincodeStub stub = new ChaincodeStub(message.getTxid(), this, input.getArgsList()); + + // Call chaincode's invoke + final Response result = chaincode.invoke(stub); + + if(result.getStatus() == Status.SUCCESS_VALUE) { + // Send COMPLETED with entire result as payload + logger.debug(String.format(String.format("[%s]Invoke succeeded. Sending %s", shortID(message), COMPLETED))); + triggerNextState(newCompletedEventMessage(message.getTxid(), result, stub.getEvent()), true); + } else { + // Send ERROR with entire result.Message as payload + logger.error(String.format("[%s]Invoke failed. Sending %s", shortID(message), ERROR)); + triggerNextState(newErrorEventMessage(message.getTxid(), result.getMessage(), stub.getEvent()), true); + } + + } catch (InvalidProtocolBufferException | RuntimeException e) { + logger.error(String.format("[%s]Invoke failed. Sending %s", shortID(message), ERROR), e); + triggerNextState(ChaincodeMessage.newBuilder() + .setType(ERROR) + .setPayload(Response.newBuilder() + .setStatus(Status.INTERNAL_SERVER_ERROR_VALUE) + .setPayload(ByteString.copyFromUtf8(e.toString())) + .build().toByteString()) + .setTxid(message.getTxid()) + .build(), true); + } finally { + // delete isTransaction entry + deleteIsTransaction(message.getTxid()); + } + }).start(); + } + + // enterTransactionState will execute chaincode's Run if coming from a TRANSACTION event. + public void beforeTransaction(Event event) { + ChaincodeMessage message = extractMessageFromEvent(event); + logger.debug(String.format("[%s]Received %s, invoking transaction on chaincode(src:%s, dst:%s)", + shortID(message), message.getType().toString(), event.src, event.dst)); + if (message.getType() == TRANSACTION) { + // Call the chaincode's Run function to invoke transaction + handleTransaction(message); + } + } + + // afterCompleted will need to handle COMPLETED event by sending message to the peer + public void afterCompleted(Event event) { + ChaincodeMessage message = extractMessageFromEvent(event); + logger.debug(String.format("[%s]sending COMPLETED to validator for tid", shortID(message))); + try { + serialSend(message); + } catch (Exception e) { + event.cancel(new Exception("send COMPLETED failed %s", e)); + } + } + + // afterResponse is called to deliver a response or error to the chaincode stub. + public void afterResponse(Event event) { + ChaincodeMessage message = extractMessageFromEvent(event); + try { + sendChannel(message); + logger.debug(String.format("[%s]Received %s, communicated (state:%s)", + shortID(message), message.getType(), fsm.current())); + } catch (Exception e) { + logger.error(String.format("[%s]error sending %s (state:%s): %s", shortID(message), + message.getType(), fsm.current(), e)); + } + } + + private ChaincodeMessage extractMessageFromEvent(Event event) { + try { + return (ChaincodeMessage) event.args[0]; + } catch (ClassCastException | ArrayIndexOutOfBoundsException e) { + final RuntimeException error = new RuntimeException("No chaincode message found in event.", e); + event.cancel(error); + throw error; + } + } + + public void afterError(Event event) { + ChaincodeMessage message = extractMessageFromEvent(event); + /* TODO- revisit. This may no longer be needed with the serialized/streamlined messaging model + * There are two situations in which the ERROR event can be triggered: + * 1. When an error is encountered within handleInit or handleTransaction - some issue at the chaincode side; In this case there will be no responseChannel and the message has been sent to the validator. + * 2. The chaincode has initiated a request (get/put/del state) to the validator and is expecting a response on the responseChannel; If ERROR is received from validator, this needs to be notified on the responseChannel. + */ + try { + sendChannel(message); + } catch (Exception e) { + logger.debug(String.format("[%s]Error received from validator %s, communicated(state:%s)", + shortID(message), message.getType(), fsm.current())); + } + } + + // handleGetState communicates with the validator to fetch the requested state information from the ledger. + public ByteString handleGetState(String key, String uuid) { + try { + //TODO Implement method to get and put entire state map and not one key at a time? + // Create the channel on which to communicate the response from validating peer + // Create the channel on which to communicate the response from validating peer + Channel responseChannel; + try { + responseChannel = createChannel(uuid); + } catch (Exception e) { + logger.debug("Another state request pending for this Uuid. Cannot process."); + throw e; + } + + // Send GET_STATE message to validator chaincode support + ChaincodeMessage message = ChaincodeMessage.newBuilder() + .setType(GET_STATE) + .setPayload(ByteString.copyFromUtf8(key)) + .setTxid(uuid) + .build(); + + logger.debug(String.format("[%s]Sending %s", shortID(message), GET_STATE)); + try { + serialSend(message); + } catch (Exception e) { + logger.error(String.format("[%s]error sending GET_STATE %s", shortID(uuid), e)); + throw new RuntimeException("could not send message"); + } + + // Wait on responseChannel for response + ChaincodeMessage response; + try { + response = receiveChannel(responseChannel); + } catch (Exception e) { + logger.error(String.format("[%s]Received unexpected message type", shortID(uuid))); + throw new RuntimeException("Received unexpected message type"); + } + + // Success response + if (response.getType() == RESPONSE) { + logger.debug(String.format("[%s]GetState received payload %s", shortID(response.getTxid()), RESPONSE)); + return response.getPayload(); + } + + // Error response + if (response.getType() == ERROR) { + logger.error(String.format("[%s]GetState received error %s", shortID(response.getTxid()), ERROR)); + throw new RuntimeException(response.getPayload().toString()); + } + + // Incorrect chaincode message received + logger.error(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", + shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); + throw new RuntimeException("Incorrect chaincode message received"); + } finally { + deleteChannel(uuid); + } + } + + private boolean isTransaction(String uuid) { + return isTransaction.containsKey(uuid) && isTransaction.get(uuid); + } + + public void handlePutState(String key, ByteString value, String uuid) { + // Check if this is a transaction + logger.debug("["+ shortID(uuid)+"]Inside putstate (\""+key+"\":\""+value+"\"), isTransaction = "+isTransaction(uuid)); + + if (!isTransaction(uuid)) { + throw new IllegalStateException("Cannot put state in query context"); + } + + PutStateInfo payload = PutStateInfo.newBuilder() + .setKey(key) + .setValue(value) + .build(); + + // Create the channel on which to communicate the response from validating peer + Channel responseChannel; + try { + responseChannel = createChannel(uuid); + } catch (Exception e) { + logger.error(String.format("[%s]Another state request pending for this Uuid. Cannot process.", shortID(uuid))); + throw e; + } + + //Defer + try { + // Send PUT_STATE message to validator chaincode support + ChaincodeMessage message = ChaincodeMessage.newBuilder() + .setType(PUT_STATE) + .setPayload(payload.toByteString()) + .setTxid(uuid) + .build(); + + logger.debug(String.format("[%s]Sending %s", shortID(message), PUT_STATE)); + + try { + serialSend(message); + } catch (Exception e) { + logger.error(String.format("[%s]error sending PUT_STATE %s", message.getTxid(), e)); + throw new RuntimeException("could not send message"); + } + + // Wait on responseChannel for response + ChaincodeMessage response; + try { + response = receiveChannel(responseChannel); + } catch (Exception e) { + //TODO figure out how to get uuid of receive channel + logger.error(String.format("[%s]Received unexpected message type", e)); + throw e; + } + + // Success response + if (response.getType() == RESPONSE) { + logger.debug(String.format("[%s]Received %s. Successfully updated state", shortID(response.getTxid()), RESPONSE)); + return; + } + + // Error response + if (response.getType() == ERROR) { + logger.error(String.format("[%s]Received %s. Payload: %s", shortID(response.getTxid()), ERROR, response.getPayload())); + throw new RuntimeException(response.getPayload().toStringUtf8()); + } + + // Incorrect chaincode message received + logger.error(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", + shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); + + throw new RuntimeException("Incorrect chaincode message received"); + } catch (Exception e) { + throw e; + } finally { + deleteChannel(uuid); + } + } + + public void handleDeleteState(String key, String uuid) { + // Check if this is a transaction + if (!isTransaction(uuid)) { + throw new RuntimeException("Cannot del state in query context"); + } + + // Create the channel on which to communicate the response from validating peer + Channel responseChannel; + try { + responseChannel = createChannel(uuid); + } catch (Exception e) { + logger.error(String.format("[%s]Another state request pending for this Uuid." + + " Cannot process create createChannel.", shortID(uuid))); + throw e; + } + + //Defer + try { + // Send DEL_STATE message to validator chaincode support + ChaincodeMessage message = ChaincodeMessage.newBuilder() + .setType(DEL_STATE) + .setPayload(ByteString.copyFromUtf8(key)) + .setTxid(uuid) + .build(); + logger.debug(String.format("[%s]Sending %s", shortID(uuid), DEL_STATE)); + try { + serialSend(message); + } catch (Exception e) { + logger.error(String.format("[%s]error sending DEL_STATE %s", shortID(message), DEL_STATE)); + throw new RuntimeException("could not send message"); + } + + // Wait on responseChannel for response + ChaincodeMessage response; + try { + response = receiveChannel(responseChannel); + } catch (Exception e) { + logger.error(String.format("[%s]Received unexpected message type", shortID(message))); + throw new RuntimeException("Received unexpected message type"); + } + + if (response.getType() == RESPONSE) { + // Success response + logger.debug(String.format("[%s]Received %s. Successfully deleted state", message.getTxid(), RESPONSE)); + return; + } + + if (response.getType() == ERROR) { + // Error response + logger.error(String.format("[%s]Received %s. Payload: %s", message.getTxid(), ERROR, response.getPayload())); + throw new RuntimeException(response.getPayload().toStringUtf8()); + } + + // Incorrect chaincode message received + logger.error(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", + shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); + throw new RuntimeException("Incorrect chaincode message received"); + } finally { + deleteChannel(uuid); + } + } + + //TODO: Uncomment and fix range query with new proto type +/* + public QueryStateResponse handleGetStateByRange(String startKey, String endKey, String uuid) { + // Create the channel on which to communicate the response from validating peer + Channel responseChannel; + try { + responseChannel = createChannel(uuid); + } catch (Exception e) { + logger.debug(String.format("[%s]Another state request pending for this Uuid." + + " Cannot process.", shortID(uuid))); + throw e; + } + + //Defer + try { + // Send GET_STATE_BY_RANGE message to validator chaincode support + GetStateByRange payload = GetStateByRange.newBuilder() + .setStartKey(startKey) + .setEndKey(endKey) + .build(); + + ChaincodeMessage message = ChaincodeMessage.newBuilder() + .setType(GET_STATE_BY_RANGE) + .setPayload(payload.toByteString()) + .setTxid(uuid) + .build(); + + logger.debug(String.format("[%s]Sending %s", shortID(message), GET_STATE_BY_RANGE)); + try { + serialSend(message); + } catch (Exception e){ + logger.error(String.format("[%s]error sending %s", shortID(message), GET_STATE_BY_RANGE)); + throw new RuntimeException("could not send message"); + } + + // Wait on responseChannel for response + ChaincodeMessage response; + try { + response = receiveChannel(responseChannel); + } catch (Exception e) { + logger.error(String.format("[%s]Received unexpected message type", uuid)); + throw new RuntimeException("Received unexpected message type"); + } + + if (response.getType() == RESPONSE) { + // Success response + logger.debug(String.format("[%s]Received %s. Successfully got range", + shortID(response.getTxid()), RESPONSE)); + + QueryStateResponse rangeQueryResponse; + try { + rangeQueryResponse = QueryStateResponse.parseFrom(response.getPayload()); + } catch (Exception e) { + logger.error(String.format("[%s]unmarshall error", shortID(response.getTxid()))); + throw new RuntimeException("Error unmarshalling GetStateByRangeResponse."); + } + + return rangeQueryResponse; + } + + if (response.getType() == ERROR) { + // Error response + logger.error(String.format("[%s]Received %s", + shortID(response.getTxid()), ERROR)); + throw new RuntimeException(response.getPayload().toStringUtf8()); + } + + // Incorrect chaincode message received + logger.error(String.format("Incorrect chaincode message %s recieved. Expecting %s or %s", + response.getType(), RESPONSE, ERROR)); + throw new RuntimeException("Incorrect chaincode message received"); + } finally { + deleteChannel(uuid); + } + } +*/ + public ByteString handleInvokeChaincode(String chaincodeName, String function, List args, String uuid) { + // Check if this is a transaction + if (!isTransaction.containsKey(uuid)) { + throw new RuntimeException("Cannot invoke chaincode in query context"); + } + + ChaincodeID id = ChaincodeID.newBuilder() + .setName(chaincodeName).build(); + ChaincodeInput input = ChaincodeInput.newBuilder() + .addArgs(ByteString.copyFromUtf8(function)) + .addAllArgs(args) + .build(); + ChaincodeSpec payload = ChaincodeSpec.newBuilder() + .setChaincodeId(id) + .setInput(input) + .build(); + + // Create the channel on which to communicate the response from validating peer + Channel responseChannel; + try { + responseChannel = createChannel(uuid); + } catch (Exception e) { + logger.error(String.format("[%s]Another state request pending for this Uuid. Cannot process.", shortID(uuid))); + throw e; + } + + //Defer + try { + // Send INVOKE_CHAINCODE message to validator chaincode support + ChaincodeMessage message = ChaincodeMessage.newBuilder() + .setType(INVOKE_CHAINCODE) + .setPayload(payload.toByteString()) + .setTxid(uuid) + .build(); + + logger.debug(String.format("[%s]Sending %s", + shortID(message), INVOKE_CHAINCODE)); + + try { + serialSend(message); + } catch (Exception e) { + logger.error("["+ shortID(message)+"]Error sending "+INVOKE_CHAINCODE+": "+e.getMessage()); + throw e; + } + + // Wait on responseChannel for response + ChaincodeMessage response; + try { + response = receiveChannel(responseChannel); + } catch (Exception e) { + logger.error(String.format("[%s]Received unexpected message type", shortID(message))); + throw new RuntimeException("Received unexpected message type"); + } + + if (response.getType() == RESPONSE) { + // Success response + logger.debug(String.format("[%s]Received %s. Successfully invoked chaincode", shortID(response.getTxid()), RESPONSE)); + return response.getPayload(); + } + + if (response.getType() == ERROR) { + // Error response + logger.error(String.format("[%s]Received %s.", shortID(response.getTxid()), ERROR)); + throw new RuntimeException(response.getPayload().toStringUtf8()); + } + + // Incorrect chaincode message received + logger.debug(String.format("[%s]Incorrect chaincode message %s received. Expecting %s or %s", + shortID(response.getTxid()), response.getType(), RESPONSE, ERROR)); + throw new RuntimeException("Incorrect chaincode message received"); + } finally { + deleteChannel(uuid); + } + } + + // handleMessage message handles loop for org.hyperledger.fabric.shim side of chaincode/validator stream. + public synchronized void handleMessage(ChaincodeMessage message) throws Exception { + + if (message.getType() == ChaincodeMessage.Type.KEEPALIVE){ + logger.debug(String.format("[%s] Recieved KEEPALIVE message, do nothing", + shortID(message))); + // Received a keep alive message, we don't do anything with it for now + // and it does not touch the state machine + return; + } + + logger.debug(String.format("[%s]Handling ChaincodeMessage of type: %s(state:%s)", shortID(message), message.getType(), fsm.current())); + + if (fsm.eventCannotOccur(message.getType().toString())) { + String errStr = String.format("[%s]Chaincode handler org.hyperledger.fabric.shim.fsm cannot handle message (%s) with payload size (%d) while in state: %s", + message.getTxid(), message.getType(), message.getPayload().size(), fsm.current()); + serialSend(newErrorEventMessage(message.getTxid(), errStr)); + throw new RuntimeException(errStr); + } + + // Filter errors to allow NoTransitionError and CanceledError + // to not propagate for cases where embedded Err == nil. + try { + fsm.raiseEvent(message.getType().toString(), message); + } catch (NoTransitionException e) { + if (e.error != null) throw e; + logger.debug("["+ shortID(message)+"]Ignoring NoTransitionError"); + } catch (CancelledException e) { + if (e.error != null) throw e; + logger.debug("["+ shortID(message)+"]Ignoring CanceledError"); + } + } + + private String shortID(ChaincodeMessage message) { + return shortID(message.getTxid()); + } + +} diff --git a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/HandlerHelper.java b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/HandlerHelper.java index deb75d7aa94..a59770e2e3b 100644 --- a/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/HandlerHelper.java +++ b/core/chaincode/shim/java/src/main/java/org/hyperledger/fabric/shim/HandlerHelper.java @@ -17,6 +17,10 @@ import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.ERROR; import static org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type.GET_STATE; +import java.io.PrintWriter; +import java.io.StringWriter; + +import org.hyperledger.fabric.protos.peer.ChaincodeEventPackage.ChaincodeEvent; import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage; import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type; import org.hyperledger.fabric.protos.peer.ProposalResponsePackage.Response; @@ -25,37 +29,52 @@ abstract class HandlerHelper { - private static ChaincodeMessage newEventMessage(final Type type, final String txid, final ByteString payload) { - return ChaincodeMessage.newBuilder() - .setType(type) - .setTxid(txid) - .setPayload(payload) - .build(); - } - - static ChaincodeMessage newGetStateEventMessage(final String txid, final String key) { - return newEventMessage(GET_STATE, txid, ByteString.copyFromUtf8(key)); - } - - static ChaincodeMessage newErrorEventMessage(final String txid, final Response payload) { - return newEventMessage(ERROR, txid, payload.toByteString()); - } - - static ChaincodeMessage newErrorEventMessage(final String txid, final Throwable throwable) { - return newErrorEventMessage(txid, ChaincodeHelper.newInternalServerErrorResponse(throwable)); - } - - static ChaincodeMessage newErrorEventMessage(final String txid, final String message) { - return newErrorEventMessage(txid, ChaincodeHelper.newInternalServerErrorResponse(message)); - } - - static ChaincodeMessage newCompletedEventMessage(final String txid, Response response) { - return ChaincodeMessage.newBuilder() - .setType(COMPLETED) - .setTxid(txid) - .setPayload(response.toByteString()) - .build(); - } + static ChaincodeMessage newGetStateEventMessage(final String txid, final String key) { + return newEventMessage(GET_STATE, txid, ByteString.copyFromUtf8(key)); + } + + static ChaincodeMessage newErrorEventMessage(final String txid, final Throwable throwable) { + return newErrorEventMessage(txid, printStackTrace(throwable)); + } + + static ChaincodeMessage newErrorEventMessage(final String txid, final String message) { + return newErrorEventMessage(txid, message, null); + } + + static ChaincodeMessage newErrorEventMessage(final String txid, final String message, final ChaincodeEvent event) { + return newEventMessage(ERROR, txid, ByteString.copyFromUtf8(message), event); + } + + static ChaincodeMessage newCompletedEventMessage(final String txid, final Response response, final ChaincodeEvent event) { + return newEventMessage(COMPLETED, txid, response.toByteString(), event); + } + + private static ChaincodeMessage newEventMessage(final Type type, final String txid, final ByteString payload) { + return newEventMessage(type, txid, payload, null); + } + + private static ChaincodeMessage newEventMessage(final Type type, final String txid, final ByteString payload, final ChaincodeEvent event) { + if (event == null) { + return ChaincodeMessage.newBuilder() + .setType(type) + .setTxid(txid) + .setPayload(payload) + .build(); + } else { + return ChaincodeMessage.newBuilder() + .setType(type) + .setTxid(txid) + .setPayload(payload) + .setChaincodeEvent(event) + .build(); + } + } + private static String printStackTrace(Throwable throwable) { + if (throwable == null) return null; + final StringWriter buffer = new StringWriter(); + throwable.printStackTrace(new PrintWriter(buffer)); + return buffer.toString(); + } } diff --git a/examples/chaincode/java/eventsender/build.gradle b/examples/chaincode/java/eventsender/build.gradle new file mode 100644 index 00000000000..606f0c1704a --- /dev/null +++ b/examples/chaincode/java/eventsender/build.gradle @@ -0,0 +1,83 @@ +/* +Copyright DTCC, IBM 2016, 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + + +buildscript { + repositories { + mavenLocal() + mavenCentral() + jcenter() + } +} + +plugins { + id "java" + id "eclipse" + id "application" +} + + +task printClasspath { + doLast { + configurations.testRuntime.each { println it } + } +} + +archivesBaseName = "chaincode" +mainClassName="example.EventSender" + +run { + if (project.hasProperty("appArgs")) { + args = Eval.me(appArgs) + } +} + +sourceSets { + main { + java { + srcDir 'src/main/java' + } + } +} + +repositories { + mavenLocal() + mavenCentral() +} + + +jar.doFirst { + destinationDir=file("${buildDir}") + manifest { + attributes ( + 'Main-Class': mainClassName, + 'Class-Path': configurations.runtime.collect { "libs/"+"$it.name" }.join(' ') + ) + } +} + +task copyToLib(type: Copy) { + into "$buildDir/libs" + from configurations.runtime +} +build.finalizedBy(copyToLib) + + +dependencies { + compile 'io.grpc:grpc-all:0.13.2' + compile 'commons-cli:commons-cli:1.3.1' + compile 'org.hyperledger:shim-client:1.0' +} diff --git a/examples/chaincode/java/eventsender/src/main/java/example/EventSender.java b/examples/chaincode/java/eventsender/src/main/java/example/EventSender.java new file mode 100644 index 00000000000..ec3b0d5f38f --- /dev/null +++ b/examples/chaincode/java/eventsender/src/main/java/example/EventSender.java @@ -0,0 +1,95 @@ +/* +Copyright IBM 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package example; + +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; +import static org.hyperledger.fabric.shim.ChaincodeHelper.newBadRequestResponse; +import static org.hyperledger.fabric.shim.ChaincodeHelper.newInternalServerErrorResponse; +import static org.hyperledger.fabric.shim.ChaincodeHelper.newSuccessResponse; + +import java.util.List; + +import org.hyperledger.fabric.protos.peer.ProposalResponsePackage.Response; +import org.hyperledger.fabric.shim.ChaincodeBase; +import org.hyperledger.fabric.shim.ChaincodeStub; + +public class EventSender extends ChaincodeBase { + + private static final String EVENT_COUNT = "noevents"; + + @Override + public Response init(ChaincodeStub stub) { + stub.putState(EVENT_COUNT, Integer.toString(0)); + return newSuccessResponse(); + } + + @Override + public Response invoke(ChaincodeStub stub) { + + try { + final List argList = stub.getArgsAsStrings(); + final String function = argList.get(0); + + switch (function) { + case "invoke": + return doInvoke(stub, argList.stream().skip(1).collect(toList())); + case "query": + return doQuery(stub); + default: + return newBadRequestResponse(format("Unknown function: %s", function)); + } + + } catch (Throwable e) { + return newInternalServerErrorResponse(e); + } + + } + + private Response doInvoke(ChaincodeStub stub, List args) { + + // get number of events sent + final int eventNumber = Integer.parseInt(stub.getState(EVENT_COUNT)); + + // increment number of events sent + stub.putState(EVENT_COUNT, Integer.toString(eventNumber + 1)); + + // create event payload + final String payload = args.stream().collect(joining(",", "Event " + String.valueOf(eventNumber), "")); + + // indicate event to post with the transaction + stub.setEvent("evtsender", payload.getBytes(UTF_8)); + + return newSuccessResponse(); + } + + private Response doQuery(ChaincodeStub stub) { + return newSuccessResponse(String.format("{\"NoEvents\":%d}", Integer.parseInt(stub.getState(EVENT_COUNT)))); + } + + @Override + public String getChaincodeID() { + return "EventSender"; + } + + public static void main(String[] args) throws Exception { + new EventSender().start(args); + } + +}