Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

feat: waitForStateTransitionResult endpoint #331

Merged
merged 40 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
deaf239
feat: waitForStateTransitionResult endpoint
Jan 13, 2021
74a6d3f
bugfix
Jan 13, 2021
3aaa596
test: add unit tests
Jan 14, 2021
8071597
bugfix
Jan 14, 2021
3fd9b7e
Bugfix
Jan 15, 2021
edca7eb
remove rewiremock
Jan 15, 2021
70cdd37
fix test name
antouhou Jan 19, 2021
f227a71
add waitForTransactionHash to the WS client
antouhou Jan 19, 2021
80d5650
Merge branch 'v0.18-dev' into wait-for-st
shumkov Jan 20, 2021
296d18b
add TransactionsClient.js to subscribe to tendermint transactions mor…
antouhou Jan 20, 2021
ca734b8
Merge remote-tracking branch 'origin/wait-for-st' into wait-for-st
antouhou Jan 20, 2021
c4f3ed1
move init logic from constructor to `start` method in TransactionsCli…
antouhou Jan 20, 2021
735d184
marked private methods as private in jsdoc
antouhou Jan 20, 2021
fbd27ce
add jsdoc for TransactionsClient.js start method
antouhou Jan 20, 2021
44044e1
shortened start method
antouhou Jan 20, 2021
ac42552
remove unused method from WsClient.js
antouhou Jan 20, 2021
ff04df5
add handling of the st timeout in waitForStateTransitionResultHandler…
antouhou Jan 20, 2021
2638966
fixed old waitForStateTransitionResultHandlerFactory tests
antouhou Jan 20, 2021
08f8373
fix TransactionsClient.spec.js
antouhou Jan 20, 2021
0199bf1
remove unnecessary condition
antouhou Jan 20, 2021
a3bcd59
Update scripts/api.js
antouhou Jan 20, 2021
6879af7
Update scripts/api.js
antouhou Jan 20, 2021
08a59ee
Update scripts/api.js
antouhou Jan 20, 2021
9544a49
Update test/integration/grpcServer/handlers/platform/waitForStateTran…
antouhou Jan 20, 2021
6d8f0bc
Update test/integration/grpcServer/handlers/platform/waitForStateTran…
antouhou Jan 20, 2021
2ded36a
Update test/integration/grpcServer/handlers/platform/waitForStateTran…
antouhou Jan 20, 2021
32e0ae9
Update test/integration/grpcServer/handlers/platform/waitForStateTran…
antouhou Jan 20, 2021
907548f
remove duplicated test
antouhou Jan 20, 2021
45ea796
Merge remote-tracking branch 'origin/wait-for-st' into wait-for-st
antouhou Jan 20, 2021
f89c546
add dpp/DriveStateRepository.js and renamed old class with this name …
antouhou Jan 22, 2021
ad45925
add test for waiting for blocks
antouhou Jan 22, 2021
9f9e39c
fix waitForStateTransitionResultHandlerFactory.js tests
antouhou Jan 22, 2021
a3cdb3c
add handling of checTx errors in broadcastStateTransition
antouhou Jan 28, 2021
7269007
rename TransactionsClient to BlockchainListener.js
antouhou Jan 29, 2021
9399de0
add dpp to DriveStateRepository and added a test
antouhou Jan 29, 2021
36074e2
remove unused imports from state repository
antouhou Jan 29, 2021
fb59e20
Update test/integration/externalApis/tenderdash/BlockchainListener.sp…
shumkov Jan 29, 2021
498360b
Merge branch 'v0.18-dev' into wait-for-st
antouhou Feb 1, 2021
2f4a527
update lock
antouhou Feb 1, 2021
d729dc2
feat: update dapi-grpc to 0.18.0-dev.3
Feb 2, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions lib/externalApis/drive/DriveStateRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ class DriveStateRepository {
prove,
);
}

/**
* Fetch proofs by ids
*
* @param {Buffer[]} [documentIds]
* @param {Buffer[]} [identityIds]
* @param {Buffer[]} [dataContractIds]
* @return {Promise<{data: Buffer}>}
*/
async fetchProofs({ documentIds, identityIds, dataContractIds }) {
return this.request(
'/proofs',
{
documentIds,
identityIds,
dataContractIds,
},
);
}
}

module.exports = DriveStateRepository;
253 changes: 253 additions & 0 deletions lib/externalApis/tenderdash/WsClient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
const { EventEmitter } = require('events');
const WebSocket = require('ws');

class WsClient extends EventEmitter {
constructor(options = {}) {
super();

const protocol = (options && options.protocol) ? options.protocol.toString() : 'ws';
const host = (options && options.host) ? options.host.toString() : '0.0.0.0';
const port = (options && options.port) ? options.port.toString() : '26657';
const path = (options && options.path) ? options.path.toString() : 'websocket';

this.url = `${protocol}://${host}:${port}/${path}`;
this.isConnected = false;
this.autoReconnectInterval = 1000;
this.subscribedQueries = new Map();
}

/**
* @private
* @return <void>
*/
open() {
if (this.ws) {
this.disconnect();
}

this.ws = new WebSocket(this.url);

const reconnect = () => {
if (this.connectionRetries <= this.maxRetries || this.maxRetries === -1) {
if (this.maxRetries !== -1) {
this.connectionRetries += 1;
}

setTimeout(this.open.bind(this), this.autoReconnectInterval);
} else {
this.disconnect();

const event = {
type: 'connect:max_retry_exceeded',
address: this.url,
};

this.emit(event.type, event);
}
};

const onOpenListener = () => {
this.isConnected = true;

const event = {
type: 'connect',
address: this.url,
};

this.emit(event.type, event);

for (const query of this.subscribedQueries.keys()) {
this.subscribe(query);
}
};

const onCloseListener = (e) => {
if (e.code === 1000) { // close normal
this.disconnect();

return;
}

reconnect();
};

const onErrorListener = (e) => {
switch (e.code) {
case 'ECONNREFUSED':
reconnect();
break;
default:
this.disconnect();
this.emit('error', e);
break;
}
};

const onMessageListener = (rawData) => {
const { result } = JSON.parse(rawData);

if (result !== undefined && Object.keys(result).length > 0) {
this.emit(result.query, result);
}
};

this.ws.on('open', onOpenListener);
this.ws.on('close', onCloseListener);
this.ws.on('error', onErrorListener);
this.ws.on('message', onMessageListener);
}

static getTxHashTopic(hash) {
antouhou marked this conversation as resolved.
Show resolved Hide resolved
return `txhash:${hash}`;
}

/**
*
* @param {object} connectionOptions
* @param {number} connectionOptions.maxRetries
* @return {Promise<void>}
*/
async connect(connectionOptions = {}) {
// by default, we don't set any max number of retries
this.maxRetries = connectionOptions.maxRetries || -1;
this.connectionRetries = 0;
this.subscribedQueries.clear();

return new Promise(async (resolve, reject) => {
// If a max number of retries is set, we reject when exceeding retry number
if (this.maxRetries !== -1) {
this.on('connect:max_retry_exceeded', async () => reject(new Error('Connection dropped. Max retries exceeded.')));
}

this.open();

// We only return socket when we actually established a connection
this.on('connect', () => resolve());
});
}

/**
*
* @return {boolean}
*/
close() {
if (this.ws) {
if (this.isConnected) {
this.disconnect();
}

this.ws = null;
this.subscribedQueries.clear();

return true;
}

return false;
}

disconnect() {
this.ws.removeAllListeners();
try {
this.ws.terminate();
} catch (e) {
// do nothing
}

this.isConnected = false;
}

/**
*
* @param {string} query
*/
subscribe(query) {
const id = 0;

const request = {
jsonrpc: '2.0',
method: 'subscribe',
id,
params: {
query,
},
};

this.ws.send(JSON.stringify(request));

const count = this.subscribedQueries.get(query) || 0;
this.subscribedQueries.set(query, count + 1);
}

createPromiseHandler(topic, resolve) {
const handler = (data) => {
this.off(topic, handler);
resolve(data);
};

return handler;
}

subscribeToTransactions() {
antouhou marked this conversation as resolved.
Show resolved Hide resolved
const query = 'tm.event = \'Tx\'';
this.subscribe(query);
this.on(query, (message) => {
this.emitTransactionHash(message);
});
}

emitTransactionHash(message) {
const hashString = message && message.events ? message.events['tx.hash'] : null;
if (!hashString) {
return;
}

this.emit(WsClient.getTxHashTopic(hashString), message);
}

waitForTransactionHash(hashString, timeout = 60000) {
const topic = WsClient.getTxHashTopic(hashString);
let handler;

return Promise.race([
new Promise((resolve) => {
handler = this.createPromiseHandler(topic, resolve);
this.on(topic, handler);
}),
new Promise((resolve, reject) => {
setTimeout(() => {
this.off(topic, handler);
reject(new Error(`ST waiting period for transaction ${hashString} timed out`));
}, timeout);
}),
]);
}

/**
*
* @param {string} query
*/
unsubscribe(query) {
const count = this.subscribedQueries.get(query) - 1;

if (count > 0) {
this.subscribedQueries.set(query, count);
} else {
const id = 0;

const request = {
jsonrpc: '2.0',
method: 'unsubscribe',
id,
params: {
query,
},
};

this.ws.send(JSON.stringify(request));

this.subscribedQueries.delete(query);
}
}
}

module.exports = WsClient;
29 changes: 29 additions & 0 deletions lib/grpcServer/handlers/platform/platformHandlersFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const {
GetDocumentsRequest,
GetIdentitiesByPublicKeyHashesRequest,
GetIdentityIdsByPublicKeyHashesRequest,
WaitForStateTransitionResultRequest,
pbjs: {
BroadcastStateTransitionRequest: PBJSBroadcastStateTransitionRequest,
BroadcastStateTransitionResponse: PBJSBroadcastStateTransitionResponse,
Expand All @@ -34,6 +35,8 @@ const {
GetIdentitiesByPublicKeyHashesRequest: PBJSGetIdentitiesByPublicKeyHashesRequest,
GetIdentityIdsByPublicKeyHashesResponse: PBJSGetIdentityIdsByPublicKeyHashesResponse,
GetIdentityIdsByPublicKeyHashesRequest: PBJSGetIdentityIdsByPublicKeyHashesRequest,
WaitForStateTransitionResultRequest: PBJSWaitForStateTransitionResultRequest,
WaitForStateTransitionResultResponse: PBJSWaitForStateTransitionResultResponse,
},
},
} = require('@dashevo/dapi-grpc');
Expand All @@ -60,16 +63,23 @@ const getIdentitiesByPublicKeyHashesHandlerFactory = require(
const getIdentityIdsByPublicKeyHashesHandlerFactory = require(
'./getIdentityIdsByPublicKeyHashesHandlerFactory',
);
const waitForStateTransitionResultHandlerFactory = require(
'./waitForStateTransitionResultHandlerFactory',
);

/**
* @param {jaysonClient} rpcClient
* @param {WsClient} tenderDashWsClient
* @param {DriveStateRepository} driveStateRepository
* @param {DashPlatformProtocol} dpp
* @param {boolean} isProductionEnvironment
* @returns {Object<string, function>}
*/
function platformHandlersFactory(
rpcClient,
tenderDashWsClient,
driveStateRepository,
dpp,
isProductionEnvironment,
) {
const wrapInErrorHandler = wrapInErrorHandlerFactory(log, isProductionEnvironment);
Expand Down Expand Up @@ -171,13 +181,32 @@ function platformHandlersFactory(
wrapInErrorHandler(getIdentityIdsByPublicKeyHashesHandler),
);

// waitForStateTransitionResult
const waitForStateTransitionResultHandler = waitForStateTransitionResultHandlerFactory(
driveStateRepository,
tenderDashWsClient,
dpp,
);

const wrappedWaitForStateTransitionResult = jsonToProtobufHandlerWrapper(
jsonToProtobufFactory(
WaitForStateTransitionResultRequest,
PBJSWaitForStateTransitionResultRequest,
),
protobufToJsonFactory(
PBJSWaitForStateTransitionResultResponse,
),
wrapInErrorHandler(waitForStateTransitionResultHandler),
);

return {
broadcastStateTransition: wrappedBroadcastStateTransition,
getIdentity: wrappedGetIdentity,
getDocuments: wrappedGetDocuments,
getDataContract: wrappedGetDataContract,
getIdentitiesByPublicKeyHashes: wrappedGetIdentitiesByPublicKeyHashes,
getIdentityIdsByPublicKeyHashes: wrappedGetIdentityIdsByPublicKeyHashes,
waitForStateTransitionResult: wrappedWaitForStateTransitionResult,
};
}

Expand Down
Loading