Skip to content
This repository has been archived by the owner on Jan 12, 2022. It is now read-only.

Commit

Permalink
fix: grpc-web doesn't throws cancel error (#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Shuplenkov authored Oct 13, 2021
1 parent 504cb2e commit 3de71b1
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
const {
Transaction, MerkleBlock, InstantLock,
} = require('@dashevo/dashcore-lib');
const GrpcError = require('@dashevo/grpc-common/lib/server/error/GrpcError');
const GrpcErrorCodes = require('@dashevo/grpc-common/lib/server/error/GrpcErrorCodes');
const { WALLET_TYPES } = require('../../../CONSTANTS');
const sleep = require('../../../utils/sleep');

const Worker = require('../../Worker');
const isBrowser = require('../../../utils/isBrowser');

class TransactionSyncStreamWorker extends Worker {
constructor(options) {
Expand Down Expand Up @@ -167,6 +170,29 @@ class TransactionSyncStreamWorker extends Worker {
}
this.syncIncomingTransactions = false;

if (isBrowser()) {
// Under browser environment, grpc-web doesn't call error and end events
// so we call it by ourselves
if (this.stream) {
return new Promise((resolve) => setImmediate(() => {
if (this.stream) {
this.stream.cancel();

const error = new GrpcError(GrpcErrorCodes.CANCELLED, 'Cancelled on client');
// call onError events
this.stream.f.forEach((func) => func(error));

// call onEnd events
this.stream.c.forEach((func) => func());

this.stream = null;
}

resolve(true);
}));
}
}

// Wrapping `cancel` in `setImmediate` due to bug with double-free
// explained here (https://github.com/grpc/grpc-node/issues/1652)
// and here (https://github.com/nodejs/node/issues/38964)
Expand All @@ -175,9 +201,9 @@ class TransactionSyncStreamWorker extends Worker {
this.stream.cancel();
// When calling stream.cancel(), the stream will emit 'error' event
// with the code 'CANCELLED'.
// There are two cases when this happens: when the gap limit is filled and syncToTheGapLimit
// and the stream needs to be restarted with new parameters, and here,
// when stopping the worker.
// There are two cases when this happens: when the gap limit is filled
// and syncToTheGapLimit and the stream needs to be restarted with new parameters,
// and here, when stopping the worker.
// The code in stream worker distinguishes whether it need to reconnect or not by the fact
// that the old stream object is present or not. When it is set to null, it won't try to
// reconnect to the stream.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
/* eslint-disable no-param-reassign */
const GrpcError = require('@dashevo/grpc-common/lib/server/error/GrpcError');
const GrpcErrorCodes = require('@dashevo/grpc-common/lib/server/error/GrpcErrorCodes');
const logger = require('../../../../logger');
const isBrowser = require('../../../../utils/isBrowser');

function isAnyIntersection(arrayA, arrayB) {
const intersection = arrayA.filter((e) => arrayB.indexOf(e) > -1);
Expand Down Expand Up @@ -50,21 +53,38 @@ async function processChunks(dataChunk) {

if (self.hasReachedGapLimit && self.stream) {
logger.silly('TransactionSyncStreamWorker - end stream - new addresses generated');
// If there are some new addresses being imported
// to the storage, that mean that we hit the gap limit
// and we need to update the bloom filter with new addresses,
// i.e. we need to open another stream with a bloom filter
// that contains new addresses.

// DO not setting null this.stream allow to know we
// need to reset our stream (as we pass along the error)
// Wrapping `cancel` in `setImmediate` due to bug with double-free
// explained here (https://github.com/grpc/grpc-node/issues/1652)
// and here (https://github.com/nodejs/node/issues/38964)
await new Promise((resolveCancel) => setImmediate(() => {
self.stream.cancel();
resolveCancel();
}));

if (isBrowser()) {
// Under browser environment, grpc-web doesn't call error and end events
// so we call it by ourselves
await new Promise((resolveCancel) => setImmediate(() => {
self.stream.cancel();
const error = new GrpcError(GrpcErrorCodes.CANCELLED, 'Cancelled on client');

// call onError events
self.stream.f.forEach((func) => func(error));

// call onEnd events
self.stream.c.forEach((func) => func());
resolveCancel();
}));
} else {
// If there are some new addresses being imported
// to the storage, that mean that we hit the gap limit
// and we need to update the bloom filter with new addresses,
// i.e. we need to open another stream with a bloom filter
// that contains new addresses.

// DO not setting null this.stream allow to know we
// need to reset our stream (as we pass along the error)
// Wrapping `cancel` in `setImmediate` due to bug with double-free
// explained here (https://github.com/grpc/grpc-node/issues/1652)
// and here (https://github.com/nodejs/node/issues/38964)
await new Promise((resolveCancel) => setImmediate(() => {
self.stream.cancel();
resolveCancel();
}));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module.exports = async function syncUpToTheGapLimit({
self.network = network;
self.hasReachedGapLimit = false;
// The order is important, however, some async calls are being performed
// in order to additionnaly fetch metadata for each valid tx chunks.
// in order to additionally fetch metadata for each valid tx chunks.
// We therefore need to temporarily store chunks for handling.
self.chunksQueue = new Queue();

Expand Down
9 changes: 9 additions & 0 deletions src/test/mocks/TxStreamMock.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
const EventEmitter = require('events');

class TxStreamMock extends EventEmitter {
constructor() {
super();

// onError minified events list
this.f = [];
// onEnd minified events list
this.c = [];
}

cancel() {
const err = new Error();
err.code = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const { BLOOM_FALSE_POSITIVE_RATE } = require('../../../CONSTANTS');
* @return {Promise<void>}
*/

module.exports = async function subscribeToTransactionWithProofs(
module.exports = async function subscribeToTransactionsWithProofs(
addressList,
opts = { fromBlockHeight: 1, count: 0 },
) {
Expand Down
4 changes: 4 additions & 0 deletions src/utils/isBrowser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// eslint-disable-next-line no-new-func
const isBrowser = new Function('try {return this===window;}catch(e){ return false;}');

module.exports = isBrowser;

0 comments on commit 3de71b1

Please sign in to comment.