Skip to content

Commit

Permalink
node-SDK: add multi-callback registrations
Browse files Browse the repository at this point in the history
This patch allows the sdk to handle multiple registrations
on the same event criteria. It also enables registrations for
all events for a particular chaincode.

Change-Id: I2a4b78b682c8f34555ce89b7a03c6cc690d89b2a
Signed-off-by: Patrick Mullaney <pm.mullaney@gmail.com>
  • Loading branch information
pmullaney committed Sep 23, 2016
1 parent 967ed86 commit 4d4df36
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions sdk/node/src/hfc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2842,15 +2842,16 @@ export function newFileKeyValStore(dir:string):KeyValStore {
/**
* The ChainCodeCBE is used internal to the EventHub to hold chaincode event registration callbacks.
*/
class ChainCodeCBE {
export class ChainCodeCBE {
// chaincode id
ccid: string;
eventname: string;
payload: Uint8Array;
// event name regex filter
eventNameFilter: RegExp;
// callback function to invoke on successful filter match
cb: Function;
constructor(ccid: string,eventname: string,payload: Uint8Array, cb: Function) {
constructor(ccid: string, eventNameFilter: string, cb: Function) {
this.ccid = ccid;
this.eventname = eventname;
this.payload = payload;
this.eventNameFilter = new RegExp(eventNameFilter);
this.cb = cb;
}
}
Expand Down Expand Up @@ -2879,7 +2880,7 @@ export class EventHub {
this.chaincodeRegistrants = new HashTable();
this.blockRegistrants = new Set();
this.txRegistrants = new HashTable();
this.peeraddr = "localhost:7053";
this.peeraddr = null;
this.connected = false;
}

Expand All @@ -2893,6 +2894,7 @@ export class EventHub {

public connect() {
if (this.connected) return;
if (!this.peeraddr) throw Error("Must set peer address before connecting.");
this.events = grpc.load(__dirname + "/protos/events.proto" ).protos;
this.client = new this.events.Events(this.peeraddr,grpc.credentials.createInsecure());
this.call = this.client.chat();
Expand All @@ -2902,11 +2904,15 @@ export class EventHub {
let eh = this; // for callback context
this.call.on('data', function(event) {
if ( event.Event == "chaincodeEvent" ) {
var cbe = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID + "/" + event.chaincodeEvent.eventName);
if ( cbe ) {
cbe.payload = event.chaincodeEvent.payload;
cbe.cb(cbe);
}
var cbtable = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID);
if( !cbtable ) {
return;
}
cbtable.forEach(function (cbe) {
if ( cbe.eventNameFilter.test(event.chaincodeEvent.eventName)) {
cbe.cb(event.chaincodeEvent);
}
});
} else if ( event.Event == "block") {
eh.blockRegistrants.forEach(function(cb){
cb(event.block);
Expand All @@ -2928,19 +2934,35 @@ export class EventHub {
this.connected = false;
}

public registerChaincodeEvent(ccid: string, eventname: string, callback: Function){
public registerChaincodeEvent(ccid: string, eventname: string, callback: Function): ChainCodeCBE {
if (!this.connected) return;
let cb = new ChainCodeCBE(ccid, eventname, null, callback);
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: eventname }} ] }};
this.chaincodeRegistrants.put(ccid + "/" + eventname, cb);
this.call.write(register);
let cb = new ChainCodeCBE(ccid, eventname, callback);
let cbtable = this.chaincodeRegistrants.get(ccid);
if ( !cbtable ) {
cbtable = new Set();
this.chaincodeRegistrants.put(ccid, cbtable);
cbtable.add(cb);
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: "" }} ] }};
this.call.write(register);
} else {
cbtable.add(cb);
}
return cb;
}

public unregisterChaincodeEvent(ccid: string, eventname: string){
public unregisterChaincodeEvent(cbe: ChainCodeCBE){
if (!this.connected) return;
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid, eventName: eventname }} ] }};
this.chaincodeRegistrants.remove(ccid + "/" + eventname);
this.call.write(unregister);
let cbtable = this.chaincodeRegistrants.get(cbe.ccid);
if ( !cbtable ) {
debug("No event registration for ccid %s ", cbe.ccid);
return;
}
cbtable.delete(cbe);
if( cbtable.size <= 0 ) {
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: cbe.ccid, eventName: "" }} ] }};
this.chaincodeRegistrants.remove(cbe.ccid);
this.call.write(unregister);
}
}

public registerBlockEvent(callback:Function){
Expand Down

0 comments on commit 4d4df36

Please sign in to comment.