Skip to content

Commit

Permalink
fix(dapi): getStatus cache invalidation (#2155)
Browse files Browse the repository at this point in the history
  • Loading branch information
shumkov committed Sep 27, 2024
1 parent 69a350b commit c3cdb65
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 23 deletions.
36 changes: 25 additions & 11 deletions packages/dapi/lib/externalApis/tenderdash/BlockchainListener.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ class BlockchainListener extends EventEmitter {
*/
constructor(tenderdashWsClient) {
super();

this.wsClient = tenderdashWsClient;

this.processLogger = logger.child({
process: 'BlockchainListener',
});
}

/**
Expand All @@ -30,14 +35,7 @@ class BlockchainListener extends EventEmitter {
* Subscribe to blocks and transaction results
*/
start() {
const processLogger = logger.child({
process: 'BlockchainListener',
});

processLogger.info('Subscribed to state transition results');

// Emit transaction results
this.wsClient.subscribe(TX_QUERY);
this.wsClient.on(TX_QUERY, (message) => {
const [hashString] = (message.events || []).map((event) => {
const hashAttribute = event.attributes.find((attribute) => attribute.key === 'hash');
Expand All @@ -53,15 +51,31 @@ class BlockchainListener extends EventEmitter {
return;
}

processLogger.trace(`received transaction result for ${hashString}`);
this.processLogger.trace(`Received transaction result for ${hashString}`);

this.emit(BlockchainListener.getTransactionEventName(hashString), message);
});

// TODO: It's not using
// Emit blocks and contained transactions
// this.wsClient.subscribe(NEW_BLOCK_QUERY);
// this.wsClient.on(NEW_BLOCK_QUERY, (message) => this.emit(EVENTS.NEW_BLOCK, message));
this.wsClient.on(NEW_BLOCK_QUERY, (message) => {
this.processLogger.trace('Received new platform block');

this.emit(EVENTS.NEW_BLOCK, message);
});

this.wsClient.on('connect', () => {
this.#subscribe();
});

if (this.wsClient.isConnected) {
this.#subscribe();
}
}

#subscribe() {
this.wsClient.subscribe(TX_QUERY);
this.wsClient.subscribe(NEW_BLOCK_QUERY);
this.processLogger.debug('Subscribed to platform blockchain events');
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const {
} = require('@dashevo/dapi-grpc');

const BlockchainListener = require('../../../externalApis/tenderdash/BlockchainListener');
const logger = require('../../../logger');

/**
* @param {BlockchainListener} blockchainListener
Expand All @@ -17,12 +18,23 @@ const BlockchainListener = require('../../../externalApis/tenderdash/BlockchainL
* @return {getStatusHandler}
*/
function getStatusHandlerFactory(blockchainListener, driveClient, tenderdashRpcClient) {
// Clean cache when new platform block committed
let cachedResponse = null;
let cleanCacheTimeout = null;

blockchainListener.on(BlockchainListener.EVENTS.NEW_BLOCK, () => {
function cleanCache() {
cachedResponse = null;
});

// cancel scheduled cache cleanup
if (cleanCacheTimeout !== null) {
clearTimeout(cleanCacheTimeout);
cleanCacheTimeout = null;
}

logger.trace({ endpoint: 'getStatus' }, 'cleanup cache');
}

// Clean cache when new platform block committed
blockchainListener.on(BlockchainListener.EVENTS.NEW_BLOCK, cleanCache);

// DAPI Software version
const packageJsonPath = path.resolve(__dirname, '..', '..', '..', '..', 'package.json');
Expand Down Expand Up @@ -210,6 +222,15 @@ function getStatusHandlerFactory(blockchainListener, driveClient, tenderdashRpcC
cachedResponse = new GetStatusResponse();
cachedResponse.setV0(v0);

// Cancel any existing scheduled cache cleanup
if (cleanCacheTimeout !== null) {
clearTimeout(cleanCacheTimeout);
cleanCacheTimeout = null;
}

// Clean cache in 3 minutes
cleanCacheTimeout = setTimeout(cleanCache, 3 * 60 * 1000);

return cachedResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ describe('BlockchainListener', () => {
({ sinon } = this);
wsClientMock = new EventEmitter();
wsClientMock.subscribe = sinon.stub();

blockchainListener = new BlockchainListener(wsClientMock);
blockchainListener.start();

sinon.spy(blockchainListener, 'on');
sinon.spy(blockchainListener, 'off');
Expand Down Expand Up @@ -84,19 +84,23 @@ describe('BlockchainListener', () => {
});

describe('#start', () => {
it('should subscribe to transaction events from WS client', () => {
// TODO: We don't use it for now
// expect(wsClientMock.subscribe).to.be.calledTwice();
expect(wsClientMock.subscribe).to.be.calledOnce();
it('should subscribe to transaction events from WS client if it is connected', () => {
wsClientMock.isConnected = true;

blockchainListener.start();

expect(wsClientMock.subscribe).to.be.calledTwice();
expect(wsClientMock.subscribe.firstCall).to.be.calledWithExactly(
BlockchainListener.TX_QUERY,
);
// expect(wsClientMock.subscribe.secondCall).to.be.calledWithExactly(
// BlockchainListener.NEW_BLOCK_QUERY,
// );
expect(wsClientMock.subscribe.secondCall).to.be.calledWithExactly(
BlockchainListener.NEW_BLOCK_QUERY,
);
});

it.skip('should emit block when new block is arrived', (done) => {
it('should emit block when new block is arrived', (done) => {
blockchainListener.start();

blockchainListener.on(BlockchainListener.EVENTS.NEW_BLOCK, (message) => {
expect(message).to.be.deep.equal(blockMessageMock);

Expand All @@ -107,6 +111,8 @@ describe('BlockchainListener', () => {
});

it('should emit transaction when transaction is arrived', (done) => {
blockchainListener.start();

const topic = BlockchainListener.getTransactionEventName(transactionHash);

blockchainListener.on(topic, (message) => {
Expand Down

0 comments on commit c3cdb65

Please sign in to comment.