Skip to content

Commit

Permalink
Merge "FAB-11220 Samples - remove EventHub"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Jul 26, 2018
2 parents c4bdc68 + a4a15cb commit 30d36be
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 74 deletions.
60 changes: 3 additions & 57 deletions balance-transfer/app/join-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,50 +47,10 @@ var joinChannel = async function(channel_name, peers, username, org_name) {
};
let genesis_block = await channel.getGenesisBlock(request);

// tell each peer to join and wait for the event hub of each peer to tell us
// that the channel has been created on each peer
// tell each peer to join and wait 10 seconds
// for the channel to be created on each peer
var promises = [];
var block_registration_numbers = [];
let event_hubs = client.getEventHubsForOrg(org_name);
event_hubs.forEach((eh) => {
let configBlockPromise = new Promise((resolve, reject) => {
let event_timeout = setTimeout(() => {
let message = 'REQUEST_TIMEOUT:' + eh._ep._endpoint.addr;
logger.error(message);
eh.disconnect();
reject(new Error(message));
}, 60000);
let block_registration_number = eh.registerBlockEvent((block) => {
clearTimeout(event_timeout);
// a peer may have more than one channel so
// we must check that this block came from the channel we
// asked the peer to join
if (block.data.data.length === 1) {
// Config block must only contain one transaction
var channel_header = block.data.data[0].payload.header.channel_header;
if (channel_header.channel_id === channel_name) {
let message = util.format('EventHub % has reported a block update for channel %s',eh._ep._endpoint.addr,channel_name);
logger.info(message)
resolve(message);
} else {
let message = util.format('Unknown channel block event received from %s',eh._ep._endpoint.addr);
logger.error(message);
reject(new Error(message));
}
}
}, (err) => {
clearTimeout(event_timeout);
let message = 'Problem setting up the event hub :'+ err.toString();
logger.error(message);
reject(new Error(message));
});
// save the registration handle so able to deregister
block_registration_numbers.push(block_registration_number);
all_eventhubs.push(eh); //save for later so that we can shut it down
});
promises.push(configBlockPromise);
eh.connect(); //this opens the event stream that must be shutdown at some point with a disconnect()
});
promises.push(new Promise(resolve => setTimeout(resolve, 10000)));

let join_request = {
targets: peers, //using the peer names which only is allowed when a connection profile is loaded
Expand All @@ -116,20 +76,6 @@ var joinChannel = async function(channel_name, peers, username, org_name) {
logger.error(message);
}
}
// now see what each of the event hubs reported
for(let i in results) {
let event_hub_result = results[i];
let event_hub = event_hubs[i];
let block_registration_number = block_registration_numbers[i];
logger.debug('Event results for event hub :%s',event_hub._ep._endpoint.addr);
if(typeof event_hub_result === 'string') {
logger.debug(event_hub_result);
} else {
if(!error_message) error_message = event_hub_result.toString();
logger.debug(event_hub_result.toString());
}
event_hub.unregisterBlockEvent(block_registration_number);
}
} catch(error) {
logger.error('Failed to join channel due to error: ' + error.stack ? error.stack : error);
error_message = error.toString();
Expand Down
5 changes: 0 additions & 5 deletions balance-transfer/artifacts/network-config-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ peers:
# this URL is used to send endorsement and query requests
url: grpcs://ec2-13-59-99-140.us-east-2.compute.amazonaws.com:7051

# this URL is used to connect the EventHub and registering event listeners
eventUrl: grpcs://ec2-13-59-99-140.us-east-2.compute.amazonaws.com:7053

grpcOptions:
ssl-target-name-override: peer0.org1.example.com
tlsCACerts:
Expand All @@ -185,15 +182,13 @@ peers:

peer0.org2.example.com:
url: grpcs://ec2-13-59-99-140.us-east-2.compute.amazonaws.com:8051
eventUrl: grpcs://ec2-13-59-99-140.us-east-2.compute.amazonaws.com:8053
grpcOptions:
ssl-target-name-override: peer0.org2.example.com
tlsCACerts:
path: artifacts/channel/crypto-config/peerOrganizations/org2.example.com/peers/peer0.org2.example.com/tls/ca.crt

peer1.org2.example.com:
url: grpcs://ec2-13-59-99-140.us-east-2.compute.amazonaws.com:8056
eventUrl: grpcs://ec2-13-59-99-140.us-east-2.compute.amazonaws.com:8058
grpcOptions:
ssl-target-name-override: peer1.org2.example.com
tlsCACerts:
Expand Down
5 changes: 0 additions & 5 deletions balance-transfer/artifacts/network-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,20 @@ peers:
# this URL is used to send endorsement and query requests
url: grpcs://localhost:7051

# this URL is used to connect the EventHub and registering event listeners
eventUrl: grpcs://localhost:7053

grpcOptions:
ssl-target-name-override: peer0.org1.example.com
tlsCACerts:
path: artifacts/channel/crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/tls/ca.crt

peer1.org1.example.com:
url: grpcs://localhost:7056
eventUrl: grpcs://localhost:7058
grpcOptions:
ssl-target-name-override: peer1.org1.example.com
tlsCACerts:
path: artifacts/channel/crypto-config/peerOrganizations/org1.example.com/peers/peer1.org1.example.com/tls/ca.crt

peer0.org2.example.com:
url: grpcs://localhost:8051
eventUrl: grpcs://localhost:8053
grpcOptions:
ssl-target-name-override: peer0.org2.example.com
tlsCACerts:
Expand Down
15 changes: 8 additions & 7 deletions fabcar/invoke.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,38 +102,39 @@ Fabric_Client.newDefaultKeyValueStore({ path: store_path

// get an eventhub once the fabric client has a user assigned. The user
// is required bacause the event registration must be signed
let event_hub = fabric_client.newEventHub();
event_hub.setPeerAddr('grpc://localhost:7053');
let event_hub = channel.newChannelEventHub(peer);

// using resolve the promise so that result status may be processed
// under the then clause rather than having the catch clause process
// the status
let txPromise = new Promise((resolve, reject) => {
let handle = setTimeout(() => {
event_hub.unregisterTxEvent(transaction_id_string);
event_hub.disconnect();
resolve({event_status : 'TIMEOUT'}); //we could use reject(new Error('Trnasaction did not complete within 30 seconds'));
}, 3000);
event_hub.connect();
event_hub.registerTxEvent(transaction_id_string, (tx, code) => {
// this is the callback for transaction event status
// first some clean up of event listener
clearTimeout(handle);
event_hub.unregisterTxEvent(transaction_id_string);
event_hub.disconnect();

// now let the application know what happened
var return_status = {event_status : code, tx_id : transaction_id_string};
if (code !== 'VALID') {
console.error('The transaction was invalid, code = ' + code);
resolve(return_status); // we could use reject(new Error('Problem with the tranaction, event status ::'+code));
} else {
console.log('The transaction has been committed on peer ' + event_hub._ep._endpoint.addr);
console.log('The transaction has been committed on peer ' + event_hub.getPeerAddr());
resolve(return_status);
}
}, (err) => {
//this is the callback if something goes wrong with the event registration or processing
reject(new Error('There was a problem with the eventhub ::'+err));
});
},
{disconnect: true} //disconnect when complete
);
event_hub.connect();

});
promises.push(txPromise);

Expand Down

0 comments on commit 30d36be

Please sign in to comment.