From 7011ca42b7d5d46b8749373a0a84c7d3d8569a92 Mon Sep 17 00:00:00 2001 From: rickr Date: Tue, 1 Aug 2017 10:59:32 -0400 Subject: [PATCH] FAB-5387 Listener for custom chaincode events. Change-Id: I3082678b840d4149dce6a6b772f2cb1ba23091a9 Signed-off-by: rickr --- .../hyperledger/fabric/sdk/BlockEvent.java | 6 + .../org/hyperledger/fabric/sdk/BlockInfo.java | 20 ++ .../sdk/ChaincodeActionDeserializer.java | 14 +- .../fabric/sdk/ChaincodeEvent.java | 110 +++++++ .../fabric/sdk/ChaincodeEventListener.java | 31 ++ .../org/hyperledger/fabric/sdk/Channel.java | 303 +++++++++++++----- .../src/github.com/example_cc/example_cc.go | 3 + .../fabric/sdk/testutils/TestConfig.java | 13 +- .../sdkintegration/End2endAndBackAgainIT.java | 9 +- .../fabric/sdkintegration/End2endIT.java | 108 ++++++- 10 files changed, 501 insertions(+), 116 deletions(-) create mode 100644 src/main/java/org/hyperledger/fabric/sdk/ChaincodeEvent.java create mode 100644 src/main/java/org/hyperledger/fabric/sdk/ChaincodeEventListener.java diff --git a/src/main/java/org/hyperledger/fabric/sdk/BlockEvent.java b/src/main/java/org/hyperledger/fabric/sdk/BlockEvent.java index d9575bdb..f7a706cb 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/BlockEvent.java +++ b/src/main/java/org/hyperledger/fabric/sdk/BlockEvent.java @@ -42,6 +42,12 @@ public EventHub getEventHub() { private final EventHub eventHub; private final Event event; + /** + * Raw proto buff event. + * + * @return Return raw protobuf event. + */ + public Event getEvent() { return event; } diff --git a/src/main/java/org/hyperledger/fabric/sdk/BlockInfo.java b/src/main/java/org/hyperledger/fabric/sdk/BlockInfo.java index 924232e0..2893e8e8 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/BlockInfo.java +++ b/src/main/java/org/hyperledger/fabric/sdk/BlockInfo.java @@ -321,6 +321,13 @@ public int getProposalResponseStatus() { } + /** + * Get read write set for this transaction. Will return null on for Eventhub events. + * For eventhub events find the block by block number to get read write set if needed. + * + * @return Read write set. + */ + public TxReadWriteSetInfo getTxReadWriteSet() { TxReadWriteSet txReadWriteSet = transactionAction.getPayload().getAction().getProposalResponsePayload() @@ -333,6 +340,19 @@ public TxReadWriteSetInfo getTxReadWriteSet() { } + /** + * Get chaincode events for this transaction. + * + * @return A chaincode event if the chaincode set an event otherwise null. + */ + + public ChaincodeEvent getEvent() { + + return transactionAction.getPayload().getAction().getProposalResponsePayload() + .getExtension().getEvent(); + + } + } public TransactionActionInfo getTransactionActionInfo(int index) { diff --git a/src/main/java/org/hyperledger/fabric/sdk/ChaincodeActionDeserializer.java b/src/main/java/org/hyperledger/fabric/sdk/ChaincodeActionDeserializer.java index f0371a09..dbe7d7fa 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/ChaincodeActionDeserializer.java +++ b/src/main/java/org/hyperledger/fabric/sdk/ChaincodeActionDeserializer.java @@ -52,11 +52,19 @@ ChaincodeAction getChaincodeAction() { } - //TODO events ? + return ret; - // ret.getResponse(); + } - return ret; + ChaincodeEvent getEvent() { + + ChaincodeAction ca = getChaincodeAction(); + ByteString eventsBytes = ca.getEvents(); + if (eventsBytes == null || eventsBytes.isEmpty()) { + return null; + } + + return new ChaincodeEvent(eventsBytes); } diff --git a/src/main/java/org/hyperledger/fabric/sdk/ChaincodeEvent.java b/src/main/java/org/hyperledger/fabric/sdk/ChaincodeEvent.java new file mode 100644 index 00000000..c2374715 --- /dev/null +++ b/src/main/java/org/hyperledger/fabric/sdk/ChaincodeEvent.java @@ -0,0 +1,110 @@ +/* + * + * Copyright 2016,2017 DTCC, Fujitsu Australia Software Technology, IBM - All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.hyperledger.fabric.sdk; + +import java.lang.ref.WeakReference; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.hyperledger.fabric.protos.peer.ChaincodeEventOuterClass; +import org.hyperledger.fabric.sdk.exception.InvalidProtocolBufferRuntimeException; + +/** + * Encapsulates a Chaincode event. + */ +public class ChaincodeEvent { + private final ByteString byteString; + private WeakReference chaincodeEvent; + + ChaincodeEvent(ByteString byteString) { + this.byteString = byteString; + } + + ChaincodeEventOuterClass.ChaincodeEvent getChaincodeEvent() { + ChaincodeEventOuterClass.ChaincodeEvent ret = null; + + if (chaincodeEvent != null) { + ret = chaincodeEvent.get(); + + } + if (ret == null) { + + try { + ret = ChaincodeEventOuterClass.ChaincodeEvent.parseFrom(byteString); + + } catch (InvalidProtocolBufferException e) { + throw new InvalidProtocolBufferRuntimeException(e); + } + + chaincodeEvent = new WeakReference<>(ret); + + } + + return ret; + + } + + /** + * Get Chaincode event's name; + * + * @return Return name; + */ + public String getEventName() { + + return getChaincodeEvent().getEventName(); + + } + + /** + * Get Chaincode identifier. + * + * @return The identifier + */ + public String getChaincodeId() { + + return getChaincodeEvent().getChaincodeId(); + + } + + /** + * Get transaction id associated with this event. + * + * @return The transactions id. + */ + public String getTxId() { + + return getChaincodeEvent().getTxId(); + + } + + /** + * Binary data associated with this event. + * + * @return binary data set by the chaincode for this event. This may return null. + */ + public byte[] getPayload() { + + ByteString ret = getChaincodeEvent().getPayload(); + if (null == ret) { + return null; + } + + return ret.toByteArray(); + + } + +} diff --git a/src/main/java/org/hyperledger/fabric/sdk/ChaincodeEventListener.java b/src/main/java/org/hyperledger/fabric/sdk/ChaincodeEventListener.java new file mode 100644 index 00000000..6afea3ff --- /dev/null +++ b/src/main/java/org/hyperledger/fabric/sdk/ChaincodeEventListener.java @@ -0,0 +1,31 @@ +/* + * + * Copyright 2016,2017 DTCC, Fujitsu Australia Software Technology, IBM - All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.hyperledger.fabric.sdk; + +/** + * ChaincodeEventListener implemented by classes needing to receive chaincode events. + */ +public interface ChaincodeEventListener { + /** + * Receiving a chaincode event. ChaincodeEventListener should not be long lived as they can take up thread resources. + * + * @param handle The handle of the chaincode event listener that produced this event. + * @param blockEvent The block event information that contained the chaincode event. See {@link BlockEvent} + * @param chaincodeEvent The chaincode event. see {@link ChaincodeEvent} + */ + void received(String handle, BlockEvent blockEvent, ChaincodeEvent chaincodeEvent); +} diff --git a/src/main/java/org/hyperledger/fabric/sdk/Channel.java b/src/main/java/org/hyperledger/fabric/sdk/Channel.java index d864e4d7..fa67f1ad 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/Channel.java +++ b/src/main/java/org/hyperledger/fabric/sdk/Channel.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; @@ -2526,7 +2527,7 @@ byte[] getChannelConfigurationSignature(ChannelConfiguration channelConfiguratio * Register a block listener. * * @param listener - * @return the UUID handle of the registered block listener. + * @return The handle of the registered block listener. * @throws InvalidArgumentException if the channel is shutdown. */ public String registerBlockListener(BlockListener listener) throws InvalidArgumentException { @@ -2539,6 +2540,39 @@ public String registerBlockListener(BlockListener listener) throws InvalidArgume } + private static void checkHandle(final String tag, final String handle) throws InvalidArgumentException { + + if (isNullOrEmpty(handle)) { + throw new InvalidArgumentException("Handle is invalid."); + } + if (!handle.startsWith(tag) || !handle.endsWith(tag)) { + throw new InvalidArgumentException("Handle is wrong type."); + } + + } + + /** + * Unregister a block listener. + * + * @param handle of Block listener to remove. + * @return false if not found. + * @throws InvalidArgumentException if the channel is shutdown or invalid arguments. + */ + public boolean unRegisterBlockListener(String handle) throws InvalidArgumentException { + + if (shutdown) { + throw new InvalidArgumentException(format("Channel %s has been shutdown.", name)); + } + + checkHandle(BLOCK_LISTENER_TAG, handle); + + synchronized (blockListeners) { + + return null != blockListeners.remove(handle); + + } + } + /** * A queue each eventing hub will write events to. */ @@ -2565,9 +2599,6 @@ boolean addBEvent(BlockEvent event) { return false; } -// Block block = event.seekBlock(); -// final long num = block.getHeader().getNumber(); - // May be fed by multiple eventhubs but BlockingQueue.add() is thread-safe events.add(event); @@ -2674,43 +2705,10 @@ private void startEventQue() { } }); -// Do our own time out. of tasks -// cleanUpTask = () -> { -// -// -// for (;;) { -// -// synchronized (txListeners) { -// -// for (LinkedList tll : txListeners.values()) { -// -// if (tll == null) { -// continue; -// } -// -// for (TL tl : tll) { -// tl.timedOut(); -// } -// } -// } -// -// -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// logger.error(e); -// -// } -// -// } -// -// }; -// -// -// new Thread(cleanUpTask).start(); -// } + private static final String BLOCK_LISTENER_TAG = "BLOCK_LISTENER_HANDLE"; + private final LinkedHashMap blockListeners = new LinkedHashMap<>(); class BL { @@ -2725,7 +2723,7 @@ public String getHandle() { BL(BlockListener listener) { - handle = Utils.generateUUID(); + handle = BLOCK_LISTENER_TAG + Utils.generateUUID() + BLOCK_LISTENER_TAG; logger.debug(format("Channel %s blockListener %s starting", name, handle)); this.listener = listener; @@ -2792,8 +2790,6 @@ private class TL { final AtomicBoolean fired = new AtomicBoolean(false); final CompletableFuture future; final Set seenEventHubs = Collections.synchronizedSet(new HashSet<>()); -// final long createdTime = System.currentTimeMillis();//seconds -// final long waitTime; Set eventReceived(EventHub eventHub) { @@ -2805,11 +2801,6 @@ Set eventReceived(EventHub eventHub) { TL(String txID, CompletableFuture future) { this.txID = txID; this.future = future; -// if (waitTimeSeconds > 0) { -// this.waitTime = waitTimeSeconds * 1000; -// } else { -// this.waitTime = -1; -// } addListener(); } @@ -2852,39 +2843,6 @@ void fire(BlockEvent.TransactionEvent transactionEvent) { } } - //KEEP THIS FOR NOW in case in the future we decide we want it. - -// public boolean timedOut() { -// -// if (fired.get()) { -// return false; -// } -// if (waitTime == -1) { -// return false; -// } -// -// if (createdTime + waitTime > System.currentTimeMillis()) { -// return false; -// } -// -// LinkedList l = txListeners.get(txID); -// if (null != l) { -// l.removeFirstOccurrence(this); -// } -// -// logger.debug("timeout:" + txID); -// -// if (fired.getAndSet(true)) { -// return false; -// } -// -// executorService.execute(() -> { -// future.completeExceptionally(new TimeoutException("Transaction " + txID + " timed out.")); -// }); -// -// return true; -// -// } } /** @@ -2904,6 +2862,185 @@ private CompletableFuture registerTxListener(String txid) { } + //////////////////////////////////////////////////////////////////////// + //////////////// Chaincode Events.. ////////////////////////////////// + + private static final String CHAINCODE_EVENTS_TAG = "CHAINCODE_EVENTS_HANDLE"; + + private final LinkedHashMap chainCodeListeners = new LinkedHashMap<>(); + + private class ChaincodeEventListenerEntry { + + private final Pattern chaincodeIdPattern; + private final Pattern eventNamePattern; + private final ChaincodeEventListener chaincodeEventListener; + private final String handle; + + ChaincodeEventListenerEntry(Pattern chaincodeIdPattern, Pattern eventNamePattern, ChaincodeEventListener chaincodeEventListener) { + this.chaincodeIdPattern = chaincodeIdPattern; + this.eventNamePattern = eventNamePattern; + this.chaincodeEventListener = chaincodeEventListener; + this.handle = CHAINCODE_EVENTS_TAG + Utils.generateUUID() + CHAINCODE_EVENTS_TAG; + + synchronized (chainCodeListeners) { + + chainCodeListeners.put(handle, this); + + } + } + + boolean isMatch(ChaincodeEvent chaincodeEvent) { + + return chaincodeIdPattern.matcher(chaincodeEvent.getChaincodeId()).matches() && eventNamePattern.matcher(chaincodeEvent.getEventName()).matches(); + + } + + void fire(BlockEvent blockEvent, ChaincodeEvent ce) { + + executorService.execute(() -> chaincodeEventListener.received(handle, blockEvent, ce)); + + } + } + + /** + * Register a chaincode event listener. Both chaincodeId pattern AND eventName pattern must match to invoke + * the chaincodeEventListener + * + * @param chaincodeId Java pattern for chaincode identifier also know as chaincode name. If ma + * @param eventName Java pattern to match the event name. + * @param chaincodeEventListener The listener to be invoked if both chaincodeId and eventName pattern matches. + * @return Handle to be used to unregister the event listener {@link #unRegisterChaincodeEventListener(String)} + * @throws InvalidArgumentException + */ + + public String registerChaincodeEventListener(Pattern chaincodeId, Pattern eventName, ChaincodeEventListener chaincodeEventListener) throws InvalidArgumentException { + + if (shutdown) { + throw new InvalidArgumentException(format("Channel %s has been shutdown.", name)); + } + + if (chaincodeId == null) { + throw new InvalidArgumentException("The chaincodeId argument may not be null."); + } + + if (eventName == null) { + throw new InvalidArgumentException("The eventName argument may not be null."); + } + + if (chaincodeEventListener == null) { + throw new InvalidArgumentException("The chaincodeEventListener argument may not be null."); + } + + ChaincodeEventListenerEntry chaincodeEventListenerEntry = new ChaincodeEventListenerEntry(chaincodeId, eventName, chaincodeEventListener); + synchronized (this) { + if (null == blh) { + blh = registerChaincodeListenerProcessor(); + } + } + return chaincodeEventListenerEntry.handle; + + } + + private String blh = null; + + /** + * Unregister an existing chaincode event listener. + * + * @param handle Chaincode event listener handle to be unregistered. + * @return True if the chaincode handler was found and removed. + * @throws InvalidArgumentException + */ + + public boolean unRegisterChaincodeEventListener(String handle) throws InvalidArgumentException { + boolean ret; + + if (shutdown) { + throw new InvalidArgumentException(format("Channel %s has been shutdown.", name)); + } + + checkHandle(CHAINCODE_EVENTS_TAG, handle); + + synchronized (chainCodeListeners) { + ret = null != chainCodeListeners.remove(handle); + + } + + synchronized (this) { + if (null != blh && chainCodeListeners.isEmpty()) { + + unRegisterBlockListener(blh); + blh = null; + } + } + + return ret; + + } + + private String registerChaincodeListenerProcessor() throws InvalidArgumentException { + logger.debug(format("Channel %s registerChaincodeListenerProcessor starting", name)); + + // Chaincode event listener is internal Block listener for chaincode events. + + return registerBlockListener(blockEvent -> { + + if (chainCodeListeners.isEmpty()) { + return; + } + + LinkedList chaincodeEvents = new LinkedList<>(); + + //Find the chaincode events in the transactions. + + for (TransactionEvent transactionEvent : blockEvent.getTransactionEvents()) { + + logger.debug(format("Channel %s got event for transaction %s ", name, transactionEvent.getTransactionID())); + + for (BlockInfo.TransactionEnvelopeInfo.TransactionActionInfo info : transactionEvent.getTransactionActionInfos()) { + + ChaincodeEvent event = info.getEvent(); + if (null != event) { + chaincodeEvents.add(event); + } + + } + + } + + if (!chaincodeEvents.isEmpty()) { + + HashMap matches = new HashMap<>(); //Find matches. + + synchronized (chainCodeListeners) { + + for (ChaincodeEventListenerEntry chaincodeEventListenerEntry : chainCodeListeners.values()) { + + for (ChaincodeEvent chaincodeEvent : chaincodeEvents) { + + if (chaincodeEventListenerEntry.isMatch(chaincodeEvent)) { + + matches.put(chaincodeEventListenerEntry, chaincodeEvent); + } + + } + + } + } + + //fire events + for (Map.Entry match : matches.entrySet()) { + + ChaincodeEventListenerEntry chaincodeEventListenerEntry = match.getKey(); + ChaincodeEvent ce = match.getValue(); + chaincodeEventListenerEntry.fire(blockEvent, ce); + + } + + } + + }); + } + /** * Shutdown the channel with all resources released. * @@ -2918,9 +3055,13 @@ public synchronized void shutdown(boolean force) { initialized = false; shutdown = true; -// anchorPeers = null; + executorService = null; + chainCodeListeners.clear(); + + blockListeners.clear(); + for (EventHub eh : getEventHubs()) { try { diff --git a/src/test/fixture/sdkintegration/gocc/sample1/src/github.com/example_cc/example_cc.go b/src/test/fixture/sdkintegration/gocc/sample1/src/github.com/example_cc/example_cc.go index d5f2b0ae..f1d52c55 100644 --- a/src/test/fixture/sdkintegration/gocc/sample1/src/github.com/example_cc/example_cc.go +++ b/src/test/fixture/sdkintegration/gocc/sample1/src/github.com/example_cc/example_cc.go @@ -157,6 +157,9 @@ func (t *SimpleChaincode) move(stub shim.ChaincodeStubInterface, args []string) } if transientMap, err := stub.GetTransient(); err == nil { + if transientData, ok := transientMap["event"]; ok { + stub.SetEvent("event", transientData) + } if transientData, ok := transientMap["result"]; ok { return shim.Success(transientData) } diff --git a/src/test/java/org/hyperledger/fabric/sdk/testutils/TestConfig.java b/src/test/java/org/hyperledger/fabric/sdk/testutils/TestConfig.java index f2a35ecd..2f303956 100644 --- a/src/test/java/org/hyperledger/fabric/sdk/testutils/TestConfig.java +++ b/src/test/java/org/hyperledger/fabric/sdk/testutils/TestConfig.java @@ -52,12 +52,10 @@ public class TestConfig { private static final String PROPBASE = "org.hyperledger.fabric.sdktest."; - private static final String GOSSIPWAITTIME = PROPBASE + "GossipWaitTime"; private static final String INVOKEWAITTIME = PROPBASE + "InvokeWaitTime"; private static final String DEPLOYWAITTIME = PROPBASE + "DeployWaitTime"; private static final String PROPOSALWAITTIME = PROPBASE + "ProposalWaitTime"; - private static final String INTEGRATIONTESTS_ORG = PROPBASE + "integrationTests.org."; private static final Pattern orgPat = Pattern.compile("^" + Pattern.quote(INTEGRATIONTESTS_ORG) + "([^\\.]+)\\.mspid$"); @@ -89,12 +87,10 @@ private TestConfig() { // Default values - defaultProperty(GOSSIPWAITTIME, "5000"); defaultProperty(INVOKEWAITTIME, "100000"); defaultProperty(DEPLOYWAITTIME, "120000"); defaultProperty(PROPOSALWAITTIME, "120000"); - ////// defaultProperty(INTEGRATIONTESTS_ORG + "peerOrg1.mspid", "Org1MSP"); defaultProperty(INTEGRATIONTESTS_ORG + "peerOrg1.domname", "org1.example.com"); @@ -255,10 +251,6 @@ public int getDeployWaitTime() { return Integer.parseInt(getProperty(DEPLOYWAITTIME)); } - public int getGossipWaitTime() { - return Integer.parseInt(getProperty(GOSSIPWAITTIME)); - } - public long getProposalWaitTime() { return Integer.parseInt(getProperty(PROPOSALWAITTIME)); } @@ -272,7 +264,6 @@ public SampleOrg getIntegrationTestsSampleOrg(String name) { } - public Properties getPeerProperties(String name) { return getEndPointProperties("peer", name); @@ -283,8 +274,7 @@ public Properties getOrdererProperties(String name) { return getEndPointProperties("orderer", name); - } - + } private Properties getEndPointProperties(final String type, final String name) { @@ -329,5 +319,4 @@ private String getDomainName(final String name) { } - } diff --git a/src/test/java/org/hyperledger/fabric/sdkintegration/End2endAndBackAgainIT.java b/src/test/java/org/hyperledger/fabric/sdkintegration/End2endAndBackAgainIT.java index 8f555628..f0c5730d 100644 --- a/src/test/java/org/hyperledger/fabric/sdkintegration/End2endAndBackAgainIT.java +++ b/src/test/java/org/hyperledger/fabric/sdkintegration/End2endAndBackAgainIT.java @@ -583,14 +583,7 @@ private void waitOnFabric() { ///// NO OP ... leave in case it's needed. private void waitOnFabric(int additional) { - // wait a few seconds for the peers to catch up with each other via the gossip network. - // Another way would be to wait on all the peers event hubs for the event containing the transaction TxID -// try { -// out("Wait %d milliseconds for peers to sync with each other", gossipWaitTime + additional); -// TimeUnit.MILLISECONDS.sleep(gossipWaitTime + additional); -// } catch (InterruptedException e) { -// fail("should not have jumped out of sleep mode. No other threads should be running"); -// } + } private static boolean checkInstalledChaincode(HFClient client, Peer peer, String ccName, String ccPath, String ccVersion) throws InvalidArgumentException, ProposalException { diff --git a/src/test/java/org/hyperledger/fabric/sdkintegration/End2endIT.java b/src/test/java/org/hyperledger/fabric/sdkintegration/End2endIT.java index edda7c1a..750dcd7c 100644 --- a/src/test/java/org/hyperledger/fabric/sdkintegration/End2endIT.java +++ b/src/test/java/org/hyperledger/fabric/sdkintegration/End2endIT.java @@ -18,13 +18,16 @@ import java.io.IOException; import java.net.MalformedURLException; import java.nio.file.Paths; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.Vector; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import org.apache.commons.codec.binary.Hex; import org.hyperledger.fabric.protos.ledger.rwset.kvrwset.KvRwset; @@ -32,6 +35,7 @@ import org.hyperledger.fabric.sdk.BlockInfo; import org.hyperledger.fabric.sdk.BlockchainInfo; import org.hyperledger.fabric.sdk.ChaincodeEndorsementPolicy; +import org.hyperledger.fabric.sdk.ChaincodeEvent; import org.hyperledger.fabric.sdk.ChaincodeID; import org.hyperledger.fabric.sdk.Channel; import org.hyperledger.fabric.sdk.ChannelConfiguration; @@ -85,6 +89,9 @@ public class End2endIT { private static final String FOO_CHANNEL_NAME = "foo"; private static final String BAR_CHANNEL_NAME = "bar"; + private static final byte[] EXPECTED_EVENT_DATA = "!".getBytes(UTF_8); + private static final String EXPECTED_EVENT_NAME = "event"; + String testTxID = null; // save the CC invoke TxID and use in queries private final TestConfigHelper configHelper = new TestConfigHelper(); @@ -214,6 +221,19 @@ sampleOrgDomainName, format("/users/Admin@%s/msp/keystore", sampleOrgDomainName) //CHECKSTYLE.OFF: Method length is 320 lines (max allowed is 150). void runChannel(HFClient client, Channel channel, boolean installChaincode, SampleOrg sampleOrg, int delta) { + class ChaincodeEventCapture { //A test class to capture chaincode events + final String handle; + final BlockEvent blockEvent; + final ChaincodeEvent chaincodeEvent; + + ChaincodeEventCapture(String handle, BlockEvent blockEvent, ChaincodeEvent chaincodeEvent) { + this.handle = handle; + this.blockEvent = blockEvent; + this.chaincodeEvent = chaincodeEvent; + } + } + Vector chaincodeEvents = new Vector<>(); // Test list to capture chaincode events. + try { final String channelName = channel.getName(); @@ -222,13 +242,35 @@ void runChannel(HFClient client, Channel channel, boolean installChaincode, Samp channel.setTransactionWaitTime(testConfig.getTransactionWaitTime()); channel.setDeployWaitTime(testConfig.getDeployWaitTime()); - Collection channelPeers = channel.getPeers(); Collection orderers = channel.getOrderers(); final ChaincodeID chaincodeID; Collection responses; Collection successful = new LinkedList<>(); Collection failed = new LinkedList<>(); + // Register a chaincode event listener that will trigger for any chaincode id and only for EXPECTED_EVENT_NAME event. + + String chaincodeEventListenerHandle = channel.registerChaincodeEventListener(Pattern.compile(".*"), + Pattern.compile(Pattern.quote(EXPECTED_EVENT_NAME)), + (handle, blockEvent, chaincodeEvent) -> { + + chaincodeEvents.add(new ChaincodeEventCapture(handle, blockEvent, chaincodeEvent)); + + out("RECEIVED Chaincode event with handle: %s, chhaincode Id: %s, chaincode event name: %s, " + + "transaction id: %s, event payload: \"%s\", from eventhub: %s", + handle, chaincodeEvent.getChaincodeId(), + chaincodeEvent.getEventName(), chaincodeEvent.getTxId(), + new String(chaincodeEvent.getPayload()), blockEvent.getEventHub().toString()); + + }); + + //For non foo channel unregister event listener to test events are not called. + if (!isFooChain) { + channel.unRegisterChaincodeEventListener(chaincodeEventListenerHandle); + chaincodeEventListenerHandle = null; + + } + chaincodeID = ChaincodeID.newBuilder().setName(CHAIN_CODE_NAME) .setVersion(CHAIN_CODE_VERSION) .setPath(CHAIN_CODE_PATH).build(); @@ -366,9 +408,11 @@ policy OR(Org1MSP.member, Org2MSP.member) meaning 1 signature from someone in ei transactionProposalRequest.setArgs(new String[] {"move", "a", "b", "100"}); Map tm2 = new HashMap<>(); - tm2.put("HyperLedgerFabric", "TransactionProposalRequest:JavaSDK".getBytes(UTF_8)); - tm2.put("method", "TransactionProposalRequest".getBytes(UTF_8)); - tm2.put("result", ":)".getBytes(UTF_8)); /// This should be returned see chaincode. + tm2.put("HyperLedgerFabric", "TransactionProposalRequest:JavaSDK".getBytes(UTF_8)); //Just some extra junk in transient map + tm2.put("method", "TransactionProposalRequest".getBytes(UTF_8)); // ditto + tm2.put("result", ":)".getBytes(UTF_8)); // This should be returned see chaincode why. + tm2.put(EXPECTED_EVENT_NAME, EXPECTED_EVENT_DATA); //This should trigger an event see chaincode why. + transactionProposalRequest.setTransientMap(tm2); out("sending transactionProposal to all peers with arguments: move(a,b,100)"); @@ -531,6 +575,40 @@ policy OR(Org1MSP.member, Org2MSP.member) meaning 1 signature from someone in ei out("QueryTransactionByID returned TransactionInfo: txID " + txInfo.getTransactionID() + "\n validation code " + txInfo.getValidationCode().getNumber()); + if (chaincodeEventListenerHandle != null) { + + channel.unRegisterChaincodeEventListener(chaincodeEventListenerHandle); + //Should be two. One event in chaincode and two notification for each of the two event hubs + + final int numberEventHubs = channel.getEventHubs().size(); + //just make sure we get the notifications. + for (int i = 15; i > 0; --i) { + if (chaincodeEvents.size() == numberEventHubs) { + break; + } else { + Thread.sleep(90); // wait for the events. + } + + } + assertEquals(numberEventHubs, chaincodeEvents.size()); + + for (ChaincodeEventCapture chaincodeEventCapture : chaincodeEvents) { + assertEquals(chaincodeEventListenerHandle, chaincodeEventCapture.handle); + assertEquals(testTxID, chaincodeEventCapture.chaincodeEvent.getTxId()); + assertEquals(EXPECTED_EVENT_NAME, chaincodeEventCapture.chaincodeEvent.getEventName()); + assertTrue(Arrays.equals(EXPECTED_EVENT_DATA, chaincodeEventCapture.chaincodeEvent.getPayload())); + assertEquals(CHAIN_CODE_NAME, chaincodeEventCapture.chaincodeEvent.getChaincodeId()); + + BlockEvent blockEvent = chaincodeEventCapture.blockEvent; + assertEquals(channelName, blockEvent.getChannelId()); + assertTrue(channel.getEventHubs().contains(blockEvent.getEventHub())); + + } + + } else { + assertTrue(chaincodeEvents.isEmpty()); + } + out("Running for Channel %s done", channelName); } catch (Exception e) { @@ -630,14 +708,8 @@ static void out(String format, Object... args) { } private void waitOnFabric(int additional) { - // wait a few seconds for the peers to catch up with each other via the gossip network. - // Another way would be to wait on all the peers event hubs for the event containing the transaction TxID -// try { -// out("Wait %d milliseconds for peers to sync with each other", gossipWaitTime + additional); -// TimeUnit.MILLISECONDS.sleep(gossipWaitTime + additional); -// } catch (InterruptedException e) { -// fail("should not have jumped out of sleep mode. No other threads should be running"); -// } + //NOOP today + } private static final Map TX_EXPECTED; @@ -712,6 +784,18 @@ void blockWalker(Channel channel) throws InvalidArgumentException, ProposalExcep out(" Transaction action %d proposal response payload: %s", j, printableString(new String(transactionActionInfo.getProposalResponsePayload()))); + // Check to see if we have our expected event. + if (blockNumber == 2) { + ChaincodeEvent chaincodeEvent = transactionActionInfo.getEvent(); + assertNotNull(chaincodeEvent); + + assertTrue(Arrays.equals(EXPECTED_EVENT_DATA, chaincodeEvent.getPayload())); + assertEquals(testTxID, chaincodeEvent.getTxId()); + assertEquals(CHAIN_CODE_NAME, chaincodeEvent.getChaincodeId()); + assertEquals(EXPECTED_EVENT_NAME, chaincodeEvent.getEventName()); + + } + TxReadWriteSetInfo rwsetInfo = transactionActionInfo.getTxReadWriteSet(); if (null != rwsetInfo) { out(" Transaction action %d has %d name space read write sets", j, rwsetInfo.getNsRwsetCount());