Skip to content
This repository has been archived by the owner on Mar 8, 2020. It is now read-only.

Commit

Permalink
[Master] Add checks for eventhub failure (#4492)
Browse files Browse the repository at this point in the history
Missing checks in startListening and waitForEvents to ensure we
can still get a commit notification

Signed-off-by: Dave Kelsey <d_kelsey@uk.ibm.com>
  • Loading branch information
Dave Kelsey authored Oct 29, 2018
1 parent 985af23 commit 670bb83
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 28 deletions.
2 changes: 1 addition & 1 deletion packages/composer-connector-hlfv1/lib/hlfconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ class HLFConnection extends Connection {
const proposal = results[1];
const header = results[2];

eventHandler = HLFConnection.createTxEventHandler(this.eventHubs, txId.getTransactionID(), this.commitTimeout);
eventHandler = HLFConnection.createTxEventHandler(this.eventHubs, txId.getTransactionID(), this.commitTimeout, this.requiredEventHubs);
eventHandler.startListening();
LOG.debug(method, 'TxEventHandler started listening, sending valid responses to the orderer');
LOG.perf(method, 'Total duration to prepare proposals for orderer: ', txId, t0);
Expand Down
29 changes: 23 additions & 6 deletions packages/composer-connector-hlfv1/lib/hlftxeventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ class HLFTxEventHandler {
* @param {EventHub[]} eventHubs the event hubs to listen for tx events
* @param {String} txId the txid that is driving the events to occur
* @param {Integer} timeout how long (in seconds) to wait for events to occur.
* @param {Integer} requiredEventHubs number of required event hubs, either 0 or 1, default to 0
* for old composer be
*/
constructor(eventHubs, txId, timeout) {
constructor(eventHubs, txId, timeout, requiredEventHubs = 1) {
const method = 'constructor';
// Don't log the eventHub objects they are too large
LOG.entry(method, txId, timeout);
LOG.entry(method, txId, timeout, requiredEventHubs);
this.eventHubs = eventHubs || [];
this.txId = txId || '';
this.listenerPromises = [];
this.timeoutHandles = [];
this.timeout = timeout || 0;
this.responseCount = 0;
this.requiredEventHubs = requiredEventHubs;
LOG.exit(method);
}

Expand All @@ -66,6 +70,7 @@ class HLFTxEventHandler {

eh.registerTxEvent(this.txId,
(tx, code) => {
this.responseCount++;
clearTimeout(handle);
eh.unregisterTxEvent(this.txId);
if (code !== 'VALID') {
Expand All @@ -90,23 +95,35 @@ class HLFTxEventHandler {
this.timeoutHandles.push(handle);
}
});
if (this.listenerPromises.length < this.requiredEventHubs) {
this.cancelListening();
const msg = 'No connected event hubs. It is required that at least 1 event hub has been connected to receive the commit event';
LOG.error(method, msg);
throw Error(msg);
}
LOG.exit(method);
}

/**
* wait for all event hubs to send the tx event.
* @returns {Promise} a promise which is resolved when all the events have been received, rejected if an error occurs.
*/
waitForEvents() {
async waitForEvents() {
const method = 'waitForEvents';
LOG.entry(method);
// don't need to check against requiredEventHubs as startListening has already checked
// this listenerPromises.length. This ensures the same composer behaviour if requiredEventHubs
// is set to 0.
if (this.listenerPromises.length > 0) {
await Promise.all(this.listenerPromises);
if (this.responseCount < this.requiredEventHubs) {
const msg = 'No event hubs responded. It is required that at least 1 event hub responds with a commit event';
LOG.error(method, msg);
throw Error(msg);
}
LOG.exit(method);
return Promise.all(this.listenerPromises);
}
LOG.warn(method, `No event hubs available to listen on to wait for a commit for transaction '${this.txId}'`);
LOG.exit(method);
return Promise.resolve();
}

/**
Expand Down
110 changes: 89 additions & 21 deletions packages/composer-connector-hlfv1/test/hlftxeventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,28 @@ describe('HLFTxEventHandler', () => {
});

describe('#startListening', () => {
it('Should do nothing if no events hubs', () => {
it('Should do nothing if no events hubs and none are required', () => {
sandbox.stub(global, 'setTimeout');
let evHandler = new HLFTxEventHandler(null, null, null);
let evHandler = new HLFTxEventHandler(null, null, null, 0);
evHandler.startListening();
sinon.assert.notCalled(global.setTimeout);
evHandler = new HLFTxEventHandler([], '1234', 100);
evHandler = new HLFTxEventHandler([], '1234', 100, 0);
evHandler.startListening();
sinon.assert.notCalled(global.setTimeout);
});

it('Should throw an error if no event hubs and an event hub is required', () => {
sandbox.stub(global, 'setTimeout');
let evHandler = new HLFTxEventHandler(null, null, null);
(() => {
evHandler.startListening();
}).should.throw(/No connected event hubs/);
sinon.assert.notCalled(global.setTimeout);
evHandler = new HLFTxEventHandler([], '1234', 100);
(() => {
evHandler.startListening();
}).should.throw(/No connected event hubs/);
sinon.assert.notCalled(global.setTimeout);
});

it('Should set up a timeout and register for a single event hub', () => {
Expand Down Expand Up @@ -110,8 +123,6 @@ describe('HLFTxEventHandler', () => {
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
});


});

it('Should handle a transaction event response which is valid', async () => {
Expand All @@ -129,7 +140,6 @@ describe('HLFTxEventHandler', () => {
} catch(err) {
should.fail(null,null,`${err} unexpected`);
}

});

it('Should handle a transaction event response which is not valid', async () => {
Expand All @@ -150,7 +160,7 @@ describe('HLFTxEventHandler', () => {
}
});

it('Should handle a transaction error response', async () => {
it('Should handle a transaction listener error response on the only single event hub', async () => {
sandbox.stub(global, 'setTimeout');
sandbox.stub(global, 'clearTimeout');
sandbox.stub(HLFUtil, 'eventHubConnected').withArgs(eventhub1).returns(true);
Expand All @@ -160,16 +170,76 @@ describe('HLFTxEventHandler', () => {
sinon.assert.calledOnce(global.clearTimeout);
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
await evHandler.waitForEvents().should.be.rejectedWith(/No event hubs responded/);
sinon.assert.calledOnce(logWarnSpy);
try {
await evHandler.waitForEvents();
} catch(err) {
should.fail(null,null,`${err} unexpected`);
}

});
});

it('Should handle a transaction listener error response all event hubs and event hubs required', async () => {
sandbox.stub(global, 'setTimeout');
sandbox.stub(global, 'clearTimeout');
const ehc = sandbox.stub(HLFUtil, 'eventHubConnected');
ehc.withArgs(eventhub1).returns(true);
ehc.withArgs(eventhub2).returns(true);
let evHandler = new HLFTxEventHandler([eventhub1, eventhub2], '1234', 31);
evHandler.startListening();
eventhub1.registerTxEvent.callArgWith(2, new Error('lost connection'));
eventhub2.registerTxEvent.callArgWith(2, new Error('lost connection'));
sinon.assert.calledTwice(global.clearTimeout);
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
await evHandler.waitForEvents().should.be.rejectedWith(/No event hubs responded/);
sinon.assert.calledTwice(logWarnSpy);
});

it('Should handle a transaction listener error response all event hubs but no event hubs required', async () => {
sandbox.stub(global, 'setTimeout');
sandbox.stub(global, 'clearTimeout');
const ehc = sandbox.stub(HLFUtil, 'eventHubConnected');
ehc.withArgs(eventhub1).returns(true);
ehc.withArgs(eventhub2).returns(true);
let evHandler = new HLFTxEventHandler([eventhub1, eventhub2], '1234', 31, 0);
evHandler.startListening();
eventhub1.registerTxEvent.callArgWith(2, new Error('lost connection'));
eventhub2.registerTxEvent.callArgWith(2, new Error('lost connection'));
sinon.assert.calledTwice(global.clearTimeout);
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
try {
await evHandler.waitForEvents();
} catch(err) {
should.fail(null,null,`${err} unexpected`);
}
sinon.assert.calledThrice(logWarnSpy);
});

it('Should handle a transaction listener error response on one but not all of the event hubs', async () => {
sandbox.stub(global, 'setTimeout');
sandbox.stub(global, 'clearTimeout');
const ehc = sandbox.stub(HLFUtil, 'eventHubConnected');
ehc.withArgs(eventhub1).returns(true);
ehc.withArgs(eventhub2).returns(true);
let evHandler = new HLFTxEventHandler([eventhub1, eventhub2], '1234', 31);
evHandler.startListening();
eventhub1.registerTxEvent.callArgWith(2, new Error('lost connection'));
eventhub2.registerTxEvent.yield('1234', 'VALID');
sinon.assert.calledTwice(global.clearTimeout);
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
sinon.assert.calledOnce(logWarnSpy);
try {
await evHandler.waitForEvents();
} catch(err) {
should.fail(null, null, `${err} unexpected`);
}
});

describe('#waitForEvents', () => {
it('Should do nothing if no events hubs', () => {
let evHandler = new HLFTxEventHandler(null, null, null);
Expand All @@ -192,8 +262,8 @@ describe('HLFTxEventHandler', () => {
evHandler = new HLFTxEventHandler([eventhub1], '1234', 31);
evHandler.startListening();
evHandler.listenerPromises[0].should.be.instanceOf(Promise);
evHandler.listenerPromises[0] = Promise.reject();
evHandler.waitForEvents().should.eventually.be.rejected;
evHandler.listenerPromises[0] = Promise.reject(new Error('some error'));
evHandler.waitForEvents().should.eventually.be.rejectedWith(/some error/);
});

it('Should do wait more than 1 event', () => {
Expand All @@ -206,8 +276,8 @@ describe('HLFTxEventHandler', () => {
evHandler.listenerPromises[0].should.be.instanceOf(Promise);
evHandler.listenerPromises[0] = Promise.resolve();
evHandler.listenerPromises[1].should.be.instanceOf(Promise);
evHandler.listenerPromises[1] = Promise.reject();
evHandler.waitForEvents().should.eventually.be.rejected;
evHandler.listenerPromises[1] = Promise.reject(new Error('some error'));
evHandler.waitForEvents().should.eventually.be.rejectedWith(/some error/);
});

it('Should handle timeout for an event', () => {
Expand Down Expand Up @@ -248,7 +318,6 @@ describe('HLFTxEventHandler', () => {
sinon.assert.calledWith(global.clearTimeout.firstCall, 'handle1');
sinon.assert.calledWith(global.clearTimeout.secondCall, 'handle2');
});

});

it('Should handle a transaction event response which isn\'t valid', () => {
Expand All @@ -273,7 +342,6 @@ describe('HLFTxEventHandler', () => {
sinon.assert.calledWith(global.clearTimeout.firstCall, 'handle1');
sinon.assert.calledWith(global.clearTimeout.secondCall, 'handle2');
});

});

});
Expand All @@ -292,11 +360,11 @@ describe('HLFTxEventHandler', () => {
it('Should do nothing if no events hubs', () => {
sandbox.stub(global, 'setTimeout');
sandbox.stub(global, 'clearTimeout');
let evHandler = new HLFTxEventHandler(null, null, null);
let evHandler = new HLFTxEventHandler(null, null, null, 0);
evHandler.startListening();
evHandler.cancelListening();
sinon.assert.notCalled(global.clearTimeout);
evHandler = new HLFTxEventHandler([], '1234', 100);
evHandler = new HLFTxEventHandler([], '1234', 100, 0);
evHandler.startListening();
evHandler.cancelListening();
sinon.assert.notCalled(global.clearTimeout);
Expand Down

0 comments on commit 670bb83

Please sign in to comment.