Skip to content

Commit

Permalink
FABN-955: Additional logging for tx event handling
Browse files Browse the repository at this point in the history
Change-Id: Id476eb7152f24f4e894c258902dbc901c70dfaec
Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday committed Oct 4, 2018
1 parent aaa6c16 commit a63c19c
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 98 deletions.
22 changes: 9 additions & 13 deletions fabric-network/lib/impl/event/abstracteventstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

'use strict';

const logger = require('fabric-network/lib/logger').getLogger('AbstractStrategy');

/**
Expand All @@ -19,23 +21,16 @@ const logger = require('fabric-network/lib/logger').getLogger('AbstractStrategy'
class AbstractEventStrategy {
/**
* Constructor.
* @param {EventHubFactory} eventHubFactory Factory for obtaining event hubs for peers.
* @param {ChannelPeer[]} peers Peers from which to process events.
* @param {Promise.ChannelEventHub[]} eventHubsPromise Promise to event hubs for which to process events.
*/
constructor(eventHubFactory, peers) {
if (!eventHubFactory) {
const message = 'Event hub factory not set';
logger.error('constructor:', message);
throw new Error(message);
}
if (!peers || peers.length === 0) {
const message = 'Peers not set';
constructor(eventHubsPromise) {
if (!(eventHubsPromise instanceof Promise)) {
const message = 'Expected event hubs to be a Promise but was ' + typeof eventHubsPromise;
logger.error('constructor:', message);
throw new Error(message);
}

this.eventHubFactory = eventHubFactory;
this.peers = peers;
this.eventHubsPromise = eventHubsPromise;
this.counts = {
success: 0,
fail: 0,
Expand All @@ -47,10 +42,11 @@ class AbstractEventStrategy {
* Called by event handler to obtain the event hubs to which it should listen. Gives an opportunity for
* the strategy to store information on the events it expects to receive for later use in event handling.
* @async
* @returns ChannelEventHubs[] connected event hubs.
* @throws {Error} if the connected event hubs do not satisfy the strategy.
*/
async getConnectedEventHubs() {
const eventHubs = await this.eventHubFactory.getEventHubs(this.peers);
const eventHubs = await this.eventHubsPromise;
const connectedEventHubs = eventHubs.filter((eventHub) => eventHub.isconnected());

if (connectedEventHubs.length === 0) {
Expand Down
12 changes: 4 additions & 8 deletions fabric-network/lib/impl/event/allfortxstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

'use strict';

const AbstractEventStrategy = require('fabric-network/lib/impl/event/abstracteventstrategy');

const logger = require('../../logger').getLogger('AllForTxStrategy');
const logger = require('fabric-network/lib/logger').getLogger('AllForTxStrategy');

/**
* Event handling strategy that:
Expand All @@ -19,17 +21,11 @@ const logger = require('../../logger').getLogger('AllForTxStrategy');
* @class
*/
class AllForTxStrategy extends AbstractEventStrategy {
/**
* @inheritdoc
*/
constructor(eventHubFactory, peers) {
super(eventHubFactory, peers);
}

/**
* @inheritdoc
*/
checkCompletion(counts, successFn, failFn) {
logger.debug('checkCompletion:', counts);
const isAllResponsesReceived = (counts.success + counts.fail === counts.expected);
if (isAllResponsesReceived) {
if (counts.success > 0) {
Expand Down
12 changes: 4 additions & 8 deletions fabric-network/lib/impl/event/anyfortxstrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

'use strict';

const AbstractEventStrategy = require('fabric-network/lib/impl/event/abstracteventstrategy');

const logger = require('../../logger').getLogger('AnyForTxStrategy');
const logger = require('fabric-network/lib/logger').getLogger('AnyForTxStrategy');

/**
* Event handling strategy that:
Expand All @@ -19,17 +21,11 @@ const logger = require('../../logger').getLogger('AnyForTxStrategy');
* @class
*/
class AnyForTxStrategy extends AbstractEventStrategy {
/**
* @inheritdoc
*/
constructor(eventHubFactory, peers) {
super(eventHubFactory, peers);
}

/**
* @inheritdoc
*/
checkCompletion(counts, successFn, failFn) {
logger.debug('checkCompletion:', counts);
const isAllResponsesReceived = (counts.success + counts.fail === counts.expected);
if (counts.success > 0) {
successFn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
const EventHandlerStrategies = require('./defaulteventhandlerstrategies');
const TransactionEventHandler = require('./transactioneventhandler');
const EventHubFactory = require('./eventhubfactory');
const logger = require('../../logger').getLogger('DefaultEventHandlerManager');
const logger = require('fabric-network/lib/logger').getLogger('DefaultEventHandlerManager');

class DefaultEventHandlerManager {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const AnyForTxStrategy = require('fabric-network/lib/impl/event/anyfortxstrategy
*/
function MSPID_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
const peers = network.getPeerMap().get(mspId);
return new AllForTxStrategy(eventHubFactory, peers);
return new AllForTxStrategy(eventHubFactory.getEventHubs(peers));
}

/**
Expand All @@ -29,7 +29,7 @@ function MSPID_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
*/
function MSPID_SCOPE_ANYFORTX(eventHubFactory, network, mspId) {
const peers = network.getPeerMap().get(mspId);
return new AnyForTxStrategy(eventHubFactory, peers);
return new AnyForTxStrategy(eventHubFactory.getEventHubs(peers));
}

/**
Expand All @@ -42,7 +42,7 @@ function MSPID_SCOPE_ANYFORTX(eventHubFactory, network, mspId) {
//eslint-disable-next-line no-unused-vars
function NETWORK_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
const peers = network.getChannel().getPeers();
return new AllForTxStrategy(eventHubFactory, peers);
return new AllForTxStrategy(eventHubFactory.getEventHubs(peers));
}

/**
Expand All @@ -54,7 +54,7 @@ function NETWORK_SCOPE_ALLFORTX(eventHubFactory, network, mspId) {
//eslint-disable-next-line no-unused-vars
function NETWORK_SCOPE_ANYFORTX(eventHubFactory, network, mspId) {
const peers = network.getChannel().getPeers();
return new AnyForTxStrategy(eventHubFactory, peers);
return new AnyForTxStrategy(eventHubFactory.getEventHubs(peers));
}

module.exports = {
Expand Down
2 changes: 1 addition & 1 deletion fabric-network/lib/impl/event/eventhubfactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

'use strict';

const logger = require('../../logger').getLogger('EventHubFactory');
const logger = require('fabric-network/lib/logger').getLogger('EventHubFactory');

/**
* Factory for obtaining event hubs for peers on a given channel.
Expand Down
29 changes: 20 additions & 9 deletions fabric-network/lib/impl/event/transactioneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

'use strict';

const logger = require('../../logger').getLogger('TransactionEventHandler');
const logger = require('fabric-network/lib/logger').getLogger('TransactionEventHandler');
const util = require('util');

/**
Expand Down Expand Up @@ -49,14 +49,12 @@ class TransactionEventHandler {
* @async
*/
async startListening() {
this._setListenTimeout();

for (const eventHub of this.eventHubs) {
logger.debug('startListening:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());

eventHub.registerTxEvent(this.transactionId,
(txId, code) => this._onEvent(eventHub, txId, code),
(err) => this._onError(eventHub, err));
if (this.eventHubs.length > 0) {
this._setListenTimeout();
this._registerTxEventListeners();
} else {
logger.debug('startListening: No event hubs');
this._txResolve();
}
}

Expand All @@ -72,6 +70,16 @@ class TransactionEventHandler {
}, this.options.commitTimeout * 1000);
}

_registerTxEventListeners() {
for (const eventHub of this.eventHubs) {
logger.debug('_registerTxEventListeners:', `registerTxEvent(${this.transactionId}) for event hub:`, eventHub.getName());

eventHub.registerTxEvent(this.transactionId,
(txId, code) => this._onEvent(eventHub, txId, code),
(err) => this._onError(eventHub, err));
}
}

_timeoutFail() {
const unrespondedEventHubs = this.eventHubs
.filter((eventHub) => !this.respondedEventHubs.has(eventHub))
Expand Down Expand Up @@ -135,13 +143,16 @@ class TransactionEventHandler {
* @throws {Error} if the transaction commit is not successful within the timeout period.
*/
async waitForEvents() {
logger.debug('waitForEvents called');
await this.notificationPromise;
}

/**
* Cancel listening for events.
*/
cancelListening() {
logger.debug('cancelListening called');

clearTimeout(this.timeoutHandler);
for (const eventHub of this.eventHubs) {
eventHub.unregisterTxEvent(this.transactionId);
Expand Down
17 changes: 10 additions & 7 deletions fabric-network/test/impl/event/defaulteventhandlermanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ describe('DefaultEventHandlerManager', () => {

describe('#constructor', () => {
it('has a default strategy if no options supplied', () => {
const handler = new DefaultEventHandlerManager(stubNetwork, 'MSP_ID', {commitTimeout: 300});
const handler = new DefaultEventHandlerManager(stubNetwork, 'MSP_ID', {});
expect(handler.options.strategy).to.equal(EventHandlerStrategies.MSPID_SCOPE_ALLFORTX);
});

it('allows a timeout option to be specified', () => {
const handler = new DefaultEventHandlerManager(stubNetwork, 'MSP_ID', {commitTimeout: 300, strategy: EventHandlerStrategies.MSPID_SCOPE_ANYFORTX});
it('allows a strategy to be specified', () => {
const options = {
strategy: EventHandlerStrategies.MSPID_SCOPE_ANYFORTX
};
const handler = new DefaultEventHandlerManager(stubNetwork, 'MSP_ID', options);
expect(handler.options.strategy).to.equal(EventHandlerStrategies.MSPID_SCOPE_ANYFORTX);
});
});
Expand Down Expand Up @@ -93,8 +96,8 @@ describe('DefaultEventHandlerManager', () => {
stubNetwork.getPeerMap.returns(mockPeerMap);
handler.channel = mockChannel;
await handler.initialize();
handler.initialized.should.equal(true);
handler.useFullBlocks.should.equal(true);
expect(handler.initialized, 'initialized').to.be.true;
expect(handler.useFullBlocks, 'useFullBlocks').to.be.true;
});
});

Expand All @@ -104,8 +107,8 @@ describe('DefaultEventHandlerManager', () => {
handler.initialized = true;
handler.availableEventHubs = [ { disconnect: () => {} }];
handler.dispose();
handler.availableEventHubs.length.should.equal(0);
handler.initialized.should.equal(false);
expect(handler.availableEventHubs).to.have.lengthOf(0);
expect(handler.initialized).to.be.false;
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('DefaultEventHandlerStrategies', () => {
stubEventHub.isconnected.returns(true);

stubEventHubFactory = sinon.createStubInstance(EventHubFactory);
stubEventHubFactory.getEventHubs.returns([stubEventHub]);
stubEventHubFactory.getEventHubs.resolves([stubEventHub]);

stubPeer = {
_stubInfo: 'peer',
Expand Down
2 changes: 1 addition & 1 deletion fabric-network/test/impl/event/eventhubfactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const sinon = require('sinon');
const Channel = require('fabric-client').Channel;
const ChannelEventHub = require('fabric-client').ChannelEventHub;

const EventHubFactory = require('../../../lib/impl/event/eventhubfactory');
const EventHubFactory = require('fabric-network/lib/impl/event/eventhubfactory');

describe('EventHubFactory', () => {
let stubChannel;
Expand Down
Loading

0 comments on commit a63c19c

Please sign in to comment.