This repository has been archived by the owner on Jan 12, 2022. It is now read-only.

fix(TransactionSyncWorker): improve stream response handling #323

Merged: 22 commits, Oct 5, 2021
Changes from 20 commits

Commits (22)
892ffb1
feat(TransactionSyncWorker): moved handler to specific file
Alex-Werner Sep 28, 2021
beffba2
feat: simple event based queue processor
Alex-Werner Sep 29, 2021
b1b718b
feat: add process chunks method
Alex-Werner Sep 29, 2021
574f313
feat: correctly order the processing of data chunks
Alex-Werner Sep 29, 2021
6a1c1f3
Merge branch 'v0.21-dev' into fix/utxo-inclusion
Alex-Werner Sep 29, 2021
92234ea
style: fix lint
Alex-Werner Sep 29, 2021
1ab6e4d
Merge remote-tracking branch 'origin/fix/utxo-inclusion' into fix/utx…
Alex-Werner Sep 29, 2021
d16dc50
Revert "revert: retry policy of unconfirmed transaction from stream (…
Alex-Werner Sep 29, 2021
a0c6f34
test: update new behaviour that waits for block to be received
Alex-Werner Sep 29, 2021
6e05b97
test: add test for ensureAddressGap
Alex-Werner Sep 29, 2021
e02a685
fix: correctly generate bip44 separately by their type
Alex-Werner Oct 4, 2021
d9d29f2
style: remove unused WALLET_TYPES
Alex-Werner Oct 4, 2021
e8bd92a
fix: correctly set as uncreated address on empty address set
Alex-Werner Oct 4, 2021
41c4942
test: removed early return
Alex-Werner Oct 4, 2021
a178be7
impr(bip44): factorize ensureAddressesToGapLimit
Alex-Werner Oct 4, 2021
6e40c08
feat: immediately add transaction and update later
Alex-Werner Oct 4, 2021
174a891
feat: allow transaction history on unconfirmed transaction
Alex-Werner Oct 4, 2021
6724f99
feat: handler for propagation issue
Alex-Werner Oct 4, 2021
e0e133b
Merge remote-tracking branch 'origin/v0.21-dev' into fix/utxo-inclusion
Alex-Werner Oct 4, 2021
126b1bb
feat: transmit error from job to queue to reject from plugin
Alex-Werner Oct 4, 2021
096f0c0
fix: forgot catching in processOnProcessedEvent
Alex-Werner Oct 4, 2021
12c3d01
fix: calling `cancel` on null stream
jawid-h Oct 5, 2021
8 changes: 4 additions & 4 deletions docs/events/blockheight_changed.md
@@ -4,9 +4,9 @@
Example:
```js
const {EVENTS} = require('@dashevo/wallet-lib');
const onReady = ()=>{
console.log("Blockheight changed to");
}
account.events.on(EVENTS.BLOCKHEIGHT_CHANGED, onReady);

account.events.on(EVENTS.BLOCKHEIGHT_CHANGED, ({payload: blockHeight})=>{
console.log(`Blockheight changed to ${blockHeight}`);
});
```

33 changes: 33 additions & 0 deletions docs/plugins/writing-a-new-plugin.md
@@ -76,3 +76,36 @@ Due to the risk of running a plugin that has access to your keychain, these a
One would need to initialize a Wallet with the option `allowSensitiveOperations` set to `true`.

You can see the list of those [sensitive functions and properties](https://github.com/dashevo/wallet-lib/blob/master/src/CONSTANTS.js#L67); anything under `UNSAFE_*` will require this option to be set to `true` in order to be used from within a plugin.
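The gating described above can be illustrated with a toy sketch. This is not wallet-lib's actual implementation; `buildPluginContext` and `UNSAFE_exportPrivateKey` are hypothetical names used only to show how an `allowSensitiveOperations` flag can keep `UNSAFE_*` members out of a plugin's reach by default:

```javascript
// Toy illustration only: names here are assumptions for the sketch,
// not wallet-lib's real internals.
function buildPluginContext(account, allowSensitiveOperations) {
  const context = {
    getAddress: () => account.address,
  };
  if (allowSensitiveOperations) {
    // UNSAFE_* members are only exposed when explicitly opted in.
    context.UNSAFE_exportPrivateKey = () => account.privateKey;
  }
  return context;
}

const account = { address: 'yAddress', privateKey: 'cPrivateKey' };
const safeContext = buildPluginContext(account, false);
const unsafeContext = buildPluginContext(account, true);
```

With the flag off, the sensitive accessor is simply absent from the context handed to the plugin, so there is nothing to misuse.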


## Accessing events

From a plugin, you have the ability to listen to the account's emitted events.

```js
const { EVENT, plugins: { Worker } } = require('@dashevo/wallet-lib');
class NewBlockWorker extends Worker {
constructor(options) {
super({
name: 'NewBlockWorker',
executeOnStart: true,
firstExecutionRequired: true,
workerIntervalTime: 60 * 1000,
gapLimit: 10,
dependencies: [
'storage',
'transport',
'walletId',
'identities',
],
...options,
});
}

async onStart() {
this.parentEvents.on(EVENT.BLOCKHEIGHT_CHANGED, ({payload: blockHeight}) => {
// on new blockheight do something.
});
}
}
```
12 changes: 9 additions & 3 deletions src/plugins/Plugins/ChainPlugin.js
@@ -41,10 +41,16 @@ class ChainPlugin extends StandardPlugin {
this.parentEvents.emit(EVENTS.BLOCK, { type: EVENTS.BLOCK, payload: block });
// We do not announce BLOCKHEADER as this is done by Storage
await self.storage.importBlockHeader(block.header);
});
self.transport.on(EVENTS.BLOCKHEIGHT_CHANGED, async (ev) => {
const { payload: blockheight } = ev;

this.parentEvents.emit(EVENTS.BLOCKHEIGHT_CHANGED, {
type: EVENTS.BLOCKHEIGHT_CHANGED, payload: blockheight,
});

const blockHeight = await self.transport.getBestBlockHeight();
this.storage.store.chains[network.toString()].blockHeight = blockHeight;
logger.debug(`ChainPlugin - setting chain blockheight ${blockHeight}`);
this.storage.store.chains[network.toString()].blockHeight = blockheight;
logger.debug(`ChainPlugin - setting chain blockheight ${blockheight}`);
});
await self.transport.subscribeToBlocks();
}
Expand Up @@ -34,6 +34,7 @@ class TransactionSyncStreamWorker extends Worker {
this.stream = null;
this.incomingSyncPromise = null;
this.pendingRequest = {};
this.delayedRequests = {};
}

/**
@@ -217,6 +218,8 @@ TransactionSyncStreamWorker.prototype.getBestBlockHeightFromTransport = require(
TransactionSyncStreamWorker.prototype.setLastSyncedBlockHeight = require('./methods/setLastSyncedBlockHeight');
TransactionSyncStreamWorker.prototype.getLastSyncedBlockHeight = require('./methods/getLastSyncedBlockHeight');
TransactionSyncStreamWorker.prototype.startHistoricalSync = require('./methods/startHistoricalSync');
TransactionSyncStreamWorker.prototype.handleTransactionFromStream = require('./methods/handleTransactionFromStream');
TransactionSyncStreamWorker.prototype.processChunks = require('./methods/processChunks');
TransactionSyncStreamWorker.prototype.startIncomingSync = require('./methods/startIncomingSync');
TransactionSyncStreamWorker.prototype.syncUpToTheGapLimit = require('./methods/syncUpToTheGapLimit');

@@ -0,0 +1,10 @@
/* eslint-disable no-param-reassign */
const logger = require('../../../../logger');
const Job = require('../../../../utils/Queue/Job');

function onStreamData(self, data) {
logger.silly('TransactionSyncStreamWorker - received chunks waiting for processing');
self.chunksQueue.enqueueJob(new Job(null, () => self.processChunks(data)));
}

module.exports = onStreamData;
@@ -0,0 +1,22 @@
/* eslint-disable no-param-reassign */
const logger = require('../../../../logger');
const sleep = require('../../../../utils/sleep');

function onStreamEnd(workerInstance, resolve) {
const endStream = () => {
logger.silly('TransactionSyncStreamWorker - end stream on request');
workerInstance.stream = null;
resolve(workerInstance.hasReachedGapLimit);
};

const tryEndStream = async () => {
if (Object.keys(workerInstance.pendingRequest).length !== 0) {
await sleep(200);
return tryEndStream();
}
return endStream();
};

tryEndStream();
}
module.exports = onStreamEnd;
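The drain-then-resolve pattern in `onStreamEnd` can be exercised standalone: keep polling while any request is pending, and only then clear the stream and resolve. The worker below is a stub whose single pending request disappears shortly after start (the real worker polls every 200 ms; the sketch shortens the delays):

```javascript
// Self-contained sketch of onStreamEnd's polling loop, using a stub worker.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function onStreamEndSketch(workerInstance, resolve) {
  const tryEndStream = async () => {
    if (Object.keys(workerInstance.pendingRequest).length !== 0) {
      await sleep(10); // 200 ms in the actual worker
      return tryEndStream();
    }
    workerInstance.stream = null;
    return resolve(workerInstance.hasReachedGapLimit);
  };
  tryEndStream();
}

const worker = { pendingRequest: { tx1: true }, stream: {}, hasReachedGapLimit: true };
// Simulate the pending transaction finishing a moment later.
setTimeout(() => { delete worker.pendingRequest.tx1; }, 30);

const result = new Promise((resolve) => onStreamEndSketch(worker, resolve));
```

This guarantees that in-flight transaction and blockheader fetches complete before the stream is considered closed.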
@@ -0,0 +1,7 @@
const logger = require('../../../../logger');

function onStreamError(error, reject) {
logger.silly('TransactionSyncStreamWorker - end stream on error');
reject(error);
}
module.exports = onStreamError;
@@ -0,0 +1,79 @@
const logger = require('../../../../logger');
const EVENTS = require('../../../../EVENTS');

async function handleTransactionFromStream(transaction) {
const self = this;
// As we require height information, we fetch the transaction using the client.
// eslint-disable-next-line no-restricted-syntax
// eslint-disable-next-line no-underscore-dangle
const transactionHash = transaction.hash;

this.pendingRequest[transactionHash] = { isProcessing: true, type: 'transaction' };
// eslint-disable-next-line no-await-in-loop
const getTransactionResponse = await this.transport.getTransaction(transactionHash);

if (!getTransactionResponse) {
// This can happen due to propagation delays: one node informs us about a transaction,
// but the node we request the transaction from is not yet aware of it.
logger.silly(`TransactionSyncStreamWorker - Transaction ${transactionHash} was not found`);
return new Promise((resolve) => {
setTimeout(() => {
resolve(self.handleTransactionFromStream(transaction));
}, 1000);
});
}

if (!getTransactionResponse.blockHash) {
// At this point, the transaction is not yet mined, so we retry on the next block to
// fetch this tx and subsequently its blockhash for blockheader fetching.
logger.silly(`TransactionSyncStreamWorker - Unconfirmed transaction ${transactionHash}: delayed.`);
this.delayedRequests[transactionHash] = { isDelayed: true, type: 'transaction' };

return new Promise((resolve) => {
self.parentEvents.once(
EVENTS.BLOCKHEIGHT_CHANGED,
() => {
resolve(self.handleTransactionFromStream(transaction));
},
);
});
}

const executor = async () => {
if (self.delayedRequests[transactionHash]) {
logger.silly(`TransactionSyncStreamWorker - Processing previously delayed transaction ${transactionHash} from stream`);
delete self.delayedRequests[transactionHash];
} else {
logger.silly(`TransactionSyncStreamWorker - Processing transaction ${transactionHash} from stream`);
}

this.pendingRequest[getTransactionResponse.blockHash.toString('hex')] = { isProcessing: true, type: 'blockheader' };
// eslint-disable-next-line no-await-in-loop
const getBlockHeaderResponse = await this
.transport
.getBlockHeaderByHash(getTransactionResponse.blockHash);
// eslint-disable-next-line no-await-in-loop
await this.importBlockHeader(getBlockHeaderResponse);
delete this.pendingRequest[getTransactionResponse.blockHash.toString('hex')];
};

await executor();

const metadata = {
blockHash: getTransactionResponse.blockHash,
height: getTransactionResponse.height,
instantLocked: getTransactionResponse.instantLocked,
chainLocked: getTransactionResponse.chainLocked,
};

delete this.pendingRequest[transactionHash];

return {
transaction,
transactionHash,
metadata,
transactionResponse: getTransactionResponse,
};
}

module.exports = handleTransactionFromStream;
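The propagation-retry behaviour above (re-query until the node knows the transaction) can be reduced to a small sketch. The transport here is a stub that returns `null` for the first few calls, standing in for a node that has not seen the transaction yet; `fetchWithRetry` is a simplified stand-in for the recursive `setTimeout` retry in `handleTransactionFromStream`:

```javascript
// Stub transport: unaware of the transaction for `failures` calls.
function makeStubTransport(failures) {
  let calls = 0;
  return {
    async getTransaction(hash) {
      calls += 1;
      return calls > failures ? { hash, height: 100 } : null;
    },
  };
}

// Retry until the queried node returns the transaction
// (the actual worker waits 1000 ms between attempts).
async function fetchWithRetry(transport, hash, delayMs = 10) {
  const response = await transport.getTransaction(hash);
  if (!response) {
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    return fetchWithRetry(transport, hash, delayMs);
  }
  return response;
}

const transport = makeStubTransport(2);
const pending = fetchWithRetry(transport, 'abc123');
```

The unconfirmed case works the same way, except the retry is keyed off the next `BLOCKHEIGHT_CHANGED` event instead of a fixed timeout.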
@@ -0,0 +1,86 @@
/* eslint-disable no-param-reassign */
const logger = require('../../../../logger');

function isAnyIntersection(arrayA, arrayB) {
const intersection = arrayA.filter((e) => arrayB.indexOf(e) > -1);
return intersection.length > 0;
}

async function processChunks(dataChunk) {
const self = this;
const addresses = this.getAddressesToSync();
const { network } = this;
/* First check if any instant locks appeared */
const instantLocksReceived = this.constructor.getInstantSendLocksFromResponse(dataChunk);
instantLocksReceived.forEach((isLock) => {
this.importInstantLock(isLock);
});

/* Incoming transactions handling */
const transactionsFromResponse = this.constructor
.getTransactionListFromStreamResponse(dataChunk);

const walletTransactions = this.constructor
.filterWalletTransactions(transactionsFromResponse, addresses, network);

if (walletTransactions.transactions.length) {
// When transactions exist, there are multiple things we need to do:
// 1) The transaction itself needs to be imported
const addressesGeneratedCount = await self
.importTransactions(walletTransactions.transactions);

// 2) Transaction metadata needs to be fetched and imported as well,
// as such an event might happen in the future.
// As we require height information, we fetch the transaction using the client.

const awaitingMetadataPromises = walletTransactions.transactions
.map((transaction) => self.handleTransactionFromStream(transaction)
.then(({
transactionResponse,
metadata,
}) => [transactionResponse.transaction, metadata]));

Promise
.all(awaitingMetadataPromises)
.then(async (transactionsWithMetadata) => {
await self.importTransactions(transactionsWithMetadata);
});

self.hasReachedGapLimit = self.hasReachedGapLimit || addressesGeneratedCount > 0;

if (self.hasReachedGapLimit) {
logger.silly('TransactionSyncStreamWorker - end stream - new addresses generated');
// If there are some new addresses being imported
// to the storage, that means we hit the gap limit
// and we need to update the bloom filter with new addresses,
// i.e. we need to open another stream with a bloom filter
// that contains new addresses.

// Not setting this.stream to null lets us know that we
// need to reset our stream (as we pass along the error).
// Wrapping `cancel` in `setImmediate` due to bug with double-free
// explained here (https://github.com/grpc/grpc-node/issues/1652)
// and here (https://github.com/nodejs/node/issues/38964)
await new Promise((resolveCancel) => setImmediate(() => {
self.stream.cancel();
resolveCancel();
}));
}
}

/* Incoming Merkle block handling */
const merkleBlockFromResponse = this.constructor
.getMerkleBlockFromStreamResponse(dataChunk);

if (merkleBlockFromResponse) {
// Reverse hashes, as they're little endian in the header
const transactionsInHeader = merkleBlockFromResponse.hashes.map((hashHex) => Buffer.from(hashHex, 'hex').reverse().toString('hex'));
const transactionsInWallet = Object.keys(self.storage.getStore().transactions);
const isTruePositive = isAnyIntersection(transactionsInHeader, transactionsInWallet);
if (isTruePositive) {
self.importBlockHeader(merkleBlockFromResponse.header);
}
}
}

module.exports = processChunks;
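The merkle-block true-positive check at the end of `processChunks` hinges on one detail: hashes inside the merkle block arrive little-endian, so each one must be byte-reversed before being compared against the wallet's transaction ids. A minimal standalone version of that step:

```javascript
// Same helper as in processChunks: does any element appear in both arrays?
function isAnyIntersection(arrayA, arrayB) {
  return arrayA.filter((e) => arrayB.indexOf(e) > -1).length > 0;
}

// Hashes from the merkle block are little-endian; reverse the bytes
// to obtain the big-endian transaction ids the wallet stores.
const hashesFromBlock = ['efcdab', '665544'];
const transactionsInHeader = hashesFromBlock
  .map((hashHex) => Buffer.from(hashHex, 'hex').reverse().toString('hex'));

const transactionsInWallet = ['abcdef'];
const isTruePositive = isAnyIntersection(transactionsInHeader, transactionsInWallet);
```

Only when the intersection is non-empty (a true positive rather than a bloom-filter false positive) does the worker import the block header.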