Skip to content

Commit

Permalink
adding subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex committed May 18, 2021
1 parent cea0439 commit 624bf31
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions packages/web3-core-subscriptions/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ var errors = require('web3-core-helpers').errors;
var EventEmitter = require('eventemitter3');
var formatters = require('web3-core-helpers').formatters;

function identity(value) {
return value;
}

function Subscription(options) {
EventEmitter.call(this);

this.id = null;
this.callback = _.identity;
this.callback = identity;
this.arguments = null;
this.lastBlock = null; // "from" block tracker for backfilling events on reconnection

Expand All @@ -56,7 +60,7 @@ Subscription.prototype.constructor = Subscription;
*/

Subscription.prototype._extractCallback = function (args) {
if (_.isFunction(args[args.length - 1])) {
if (typeof args[args.length - 1] === 'function') {
return args.pop(); // modify the args array!
}
};
Expand Down Expand Up @@ -136,7 +140,7 @@ Subscription.prototype._formatOutput = function (result) {
*/
Subscription.prototype._toPayload = function (args) {
var params = [];
this.callback = this._extractCallback(args) || _.identity;
this.callback = this._extractCallback(args) || identity;

if (!this.subscriptionMethod) {
this.subscriptionMethod = args.shift();
Expand Down Expand Up @@ -226,7 +230,7 @@ Subscription.prototype.subscribe = function() {

// Re-subscription only: continue fetching from the last block we received.
// a dropped connection may have resulted in gaps in the logs...
if (this.lastBlock && _.isObject(this.options.params)){
if (this.lastBlock && typeof this.options.params === 'object' && !!null ){
payload.params[1] = this.options.params;
payload.params[1].fromBlock = formatters.inputBlockNumberFormatter(this.lastBlock + 1);
}
Expand All @@ -240,7 +244,7 @@ Subscription.prototype.subscribe = function() {
this.options.params = payload.params[1];

// get past logs, if fromBlock is available
if(payload.params[0] === 'logs' && _.isObject(payload.params[1]) && payload.params[1].hasOwnProperty('fromBlock') && isFinite(payload.params[1].fromBlock)) {
if(payload.params[0] === 'logs' && typeof payload.params[1] === 'object' && !!payload.params[1] && payload.params[1].hasOwnProperty('fromBlock') && isFinite(payload.params[1].fromBlock)) {
// send the subscription request

// copy the params to avoid race-condition with deletion below this block
Expand Down Expand Up @@ -283,17 +287,17 @@ Subscription.prototype.subscribe = function() {
// call callback on notifications
_this.options.requestManager.addSubscription(_this, function(error, result) {
if (!error) {
if (!_.isArray(result)) {
if (!Array.isArray(result)) {
result = [result];
}

result.forEach(function(resultItem) {
var output = _this._formatOutput(resultItem);

// Track current block (for gaps introduced by dropped connections)
_this.lastBlock = _.isObject(output) ? output.blockNumber : null;
_this.lastBlock = typeof output === 'object' ? output.blockNumber : null;

if (_.isFunction(_this.options.subscription.subscriptionHandler)) {
if (typeof _this.options.subscription.subscriptionHandler === 'function' ) {
return _this.options.subscription.subscriptionHandler.call(_this, output);
} else {
_this.emit('data', output);
Expand Down

0 comments on commit 624bf31

Please sign in to comment.