From ffd7a2576c12d04fd7ae07115384ca8a2fd47867 Mon Sep 17 00:00:00 2001 From: Bret Harrison Date: Fri, 19 Jan 2018 16:39:34 -0500 Subject: [PATCH] FAB-6400 Balance-transfer filtered events Update sample code to use the channel-based events. The sample will also use the new connection profile API to get a list of channel-based NodeSDK event hubs using filtered blocks, automatic unregistration, and automatic disconnect, all new features of channel-based events. This will demostrate the most common use case for events. The sample code will require the NodeSDK to be at 1.1 alpha. Change-Id: Id9f2b37f02d7d662b7ca1016586560ee4c595992 Signed-off-by: Bret Harrison --- balance-transfer/app/instantiate-chaincode.js | 44 +++++++++---------- balance-transfer/app/invoke-transaction.js | 43 ++++++++---------- balance-transfer/package.json | 4 +- 3 files changed, 41 insertions(+), 50 deletions(-) diff --git a/balance-transfer/app/instantiate-chaincode.js b/balance-transfer/app/instantiate-chaincode.js index a7c51cd9b6..923be6b8c8 100644 --- a/balance-transfer/app/instantiate-chaincode.js +++ b/balance-transfer/app/instantiate-chaincode.js @@ -25,7 +25,7 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha logger.debug('\n\n============ Instantiate chaincode on channel ' + channelName + ' ============\n'); var error_message = null; - var eventhubs_in_use = []; + try { // first setup the client for this org var client = await helper.getClientForOrg(org_name, username); @@ -84,27 +84,25 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha logger.info(util.format( 'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s', proposalResponses[0].response.status, proposalResponses[0].response.message, - proposalResponses[0].response.payload, proposalResponses[0].endorsement - .signature)); + proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature)); - // tell each peer to join and wait for the event hub of each peer to tell us - // that the channel has been created on each peer + // wait for the channel-based event hub to tell us that the + // instantiate transaction was committed on the peer var promises = []; - let event_hubs = client.getEventHubsForOrg(org_name); + let event_hubs = channel.getChannelEventHubsForOrg(); logger.debug('found %s eventhubs for this organization %s',event_hubs.length, org_name); event_hubs.forEach((eh) => { let instantiateEventPromise = new Promise((resolve, reject) => { logger.debug('instantiateEventPromise - setting up event'); let event_timeout = setTimeout(() => { - let message = 'REQUEST_TIMEOUT:' + eh._ep._endpoint.addr; + let message = 'REQUEST_TIMEOUT:' + eh.getPeerAddr(); logger.error(message); eh.disconnect(); - reject(new Error(message)); }, 60000); - eh.registerTxEvent(deployId, (tx, code) => { - logger.info('The chaincode instantiate transaction has been committed on peer %s',eh._ep._endpoint.addr); + eh.registerTxEvent(deployId, (tx, code, block_num) => { + logger.info('The chaincode instantiate transaction has been committed on peer %s',eh.getPeerAddr()); + logger.info('Transaction %s has status of %s in blocl %s', tx, code, block_num); clearTimeout(event_timeout); - eh.unregisterTxEvent(deployId); if (code !== 'VALID') { let message = until.format('The chaincode instantiate transaction was invalid, code:%s',code); @@ -117,15 +115,18 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha } }, (err) => { clearTimeout(event_timeout); - eh.unregisterTxEvent(deployId); - let message = 'Problem setting up the event hub :'+ err.toString(); - logger.error(message); - reject(new Error(message)); - }); + logger.error(err); + reject(err); + }, + // the default for 'unregister' is true for transaction listeners + // so no real need to set here, however for 'disconnect' + // the default is false as most event hubs are long running + // in this use case we are using it only once + {unregister: true, disconnect: true} + ); + eh.connect(); }); promises.push(instantiateEventPromise); - eh.connect(); - eventhubs_in_use.push(eh); }); var orderer_request = { @@ -155,7 +156,7 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha for(let i in results) { let event_hub_result = results[i]; let event_hub = event_hubs[i]; - logger.debug('Event results for event hub :%s',event_hub._ep._endpoint.addr); + logger.debug('Event results for event hub :%s',event_hub.getPeerAddr()); if(typeof event_hub_result === 'string') { logger.debug(event_hub_result); } else { @@ -172,11 +173,6 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha error_message = error.toString(); } - // need to shutdown open event streams - eventhubs_in_use.forEach((eh) => { - eh.disconnect(); - }); - if (!error_message) { let message = util.format( 'Successfully instantiate chaingcode in organization %s to the channel \'%s\'', diff --git a/balance-transfer/app/invoke-transaction.js b/balance-transfer/app/invoke-transaction.js index ff9f9c6d39..9232f70713 100644 --- a/balance-transfer/app/invoke-transaction.js +++ b/balance-transfer/app/invoke-transaction.js @@ -24,7 +24,6 @@ var logger = helper.getLogger('invoke-chaincode'); var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn, args, username, org_name) { logger.debug(util.format('\n============ invoke transaction on channel %s ============\n', channelName)); var error_message = null; - var eventhubs_in_use = []; var tx_id_string = null; try { // first setup the client for this org @@ -78,26 +77,24 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn, logger.info(util.format( 'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s', proposalResponses[0].response.status, proposalResponses[0].response.message, - proposalResponses[0].response.payload, proposalResponses[0].endorsement - .signature)); + proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature)); - // tell each peer to join and wait for the event hub of each peer to tell us - // that the channel has been created on each peer + // wait for the channel-based event hub to tell us + // that the commit was good or bad on each peer in our organization var promises = []; - let event_hubs = client.getEventHubsForOrg(org_name); + let event_hubs = channel.getChannelEventHubsForOrg(); event_hubs.forEach((eh) => { logger.debug('invokeEventPromise - setting up event'); let invokeEventPromise = new Promise((resolve, reject) => { let event_timeout = setTimeout(() => { - let message = 'REQUEST_TIMEOUT:' + eh._ep._endpoint.addr; + let message = 'REQUEST_TIMEOUT:' + eh.getPeerAddr(); logger.error(message); eh.disconnect(); - reject(new Error(message)); }, 3000); - eh.registerTxEvent(tx_id_string, (tx, code) => { - logger.info('The chaincode invoke chaincode transaction has been committed on peer %s',eh._ep._endpoint.addr); + eh.registerTxEvent(tx_id_string, (tx, code, block_num) => { + logger.info('The chaincode invoke chaincode transaction has been committed on peer %s',eh.getPeerAddr()); + logger.info('Transaction %s has status of %s in blocl %s', tx, code, block_num); clearTimeout(event_timeout); - eh.unregisterTxEvent(tx_id_string); if (code !== 'VALID') { let message = util.format('The invoke chaincode transaction was invalid, code:%s',code); @@ -110,15 +107,18 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn, } }, (err) => { clearTimeout(event_timeout); - eh.unregisterTxEvent(tx_id_string); - let message = 'Problem setting up the event hub :'+ err.toString(); - logger.error(message); - reject(new Error(message)); - }); + logger.error(err); + reject(err); + }, + // the default for 'unregister' is true for transaction listeners + // so no real need to set here, however for 'disconnect' + // the default is false as most event hubs are long running + // in this use case we are using it only once + {unregister: true, disconnect: true} + ); + eh.connect(); }); promises.push(invokeEventPromise); - eh.connect(); - eventhubs_in_use.push(eh); }); var orderer_request = { @@ -144,7 +144,7 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn, for(let i in results) { let event_hub_result = results[i]; let event_hub = event_hubs[i]; - logger.debug('Event results for event hub :%s',event_hub._ep._endpoint.addr); + logger.debug('Event results for event hub :%s',event_hub.getPeerAddr()); if(typeof event_hub_result === 'string') { logger.debug(event_hub_result); } else { @@ -161,11 +161,6 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn, error_message = error.toString(); } - // need to shutdown open event streams - eventhubs_in_use.forEach((eh) => { - eh.disconnect(); - }); - if (!error_message) { let message = util.format( 'Successfully invoked the chaincode %s to the channel \'%s\' for transaction ID: %s', diff --git a/balance-transfer/package.json b/balance-transfer/package.json index fd5554bcb0..fa51c95d6d 100644 --- a/balance-transfer/package.json +++ b/balance-transfer/package.json @@ -12,8 +12,8 @@ "v1.0 fabric nodesdk sample" ], "engines": { - "node": ">=6.9.5 <7.0", - "npm": ">=3.10.10 <4.0" + "node": ">=8.9.4 <9.0", + "npm": ">=5.6.0 <6.0" }, "license": "Apache-2.0", "dependencies": {