Skip to content

Commit

Permalink
refactor(@embark/utils): use timeouts to improve utils.pingEndpoint
Browse files Browse the repository at this point in the history
For reasons unknown, sockets underlying http and websocket requests made by
`utils.pingEndpoint` could become stuck and after a *long* time eventually hang
up. This can happen even when the endpoint being ping'd is perfectly
responsive, e.g. when checking it with `curl` in another terminal. The root
cause may be a bug in node.js itself or some package upon which embark
depends. The effects of the stuck sockets are experienced more often when the
CPU is under moderate-heavy load.

Use timeouts to cut short the stuck sockets. Usually, after one to several
retries (which may themselves get stuck) following a stuck socket, connections
behave as expected (learned by experience). It's not enough to rely on the http/websocket
built-in timeouts, since when the CPU is busy those timeouts fail to fire
reliably (learned by experience). By overlapping the built-in timeouts with
`setTimeout`, `pingEndpoint` avoids becoming stuck; it's also important to use
`setImmediate` to defer some calls and socket cleanups (learned by trial and
error).

Note that when the CPU is moderately busy, one of the slowest steps is
launching embark's child processes, e.g. ipfs, geth, etc. However, even under
very heavy load, with the revised `pingEndpoint` embark seems to always
eventually start/detect services correctly. And under light to moderate load,
the effect of the stuck sockets is less noticeable.

Use a websocket client from the `ws` package instead of manually building
websocket headers.

Use `pingEndpoint` for the IPFS module's `_checkService` method to avoid the
same stuck socket problem that was affecting blockchain detection.
  • Loading branch information
michaelsbradleyjr committed Dec 11, 2018
1 parent 8cbbcfe commit c9ea017
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/lib/modules/blockchain_process/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ exports.serve = async (ipc, host, port, ws, origin) => {
'http',
origin ? origin.split(',')[0] : undefined,
(err) => {
if (!err || (Date.now() - start > 10000)) {
if (!err || (Date.now() - start > 600000)) {
resolve();
} else {
utils.timer(250).then(waitOnTarget).then(resolve);
Expand Down
36 changes: 29 additions & 7 deletions src/lib/modules/ipfs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const fs = require('../../core/fs.js');
const IpfsApi = require('ipfs-api');
// TODO: not great, breaks module isolation
const StorageProcessesLauncher = require('../storage/storageProcessesLauncher');
const {canonicalHost} = require('../../utils/host');

class IPFS {

Expand Down Expand Up @@ -67,7 +68,7 @@ class IPFS {
});

self.events.request("services:register", 'IPFS', function (cb) {
self._checkService((err, body) => {
self._checkService(true, (err, body) => {
if (err) {
self.logger.trace("IPFS unavailable");
return cb({name: "IPFS ", status: 'off'});
Expand All @@ -82,21 +83,42 @@ class IPFS {
});
}

_getNodeUrl() {
_getNodeConfig() {
if (this.storageConfig.upload.provider === 'ipfs') {
return utils.buildUrlFromConfig(this.storageConfig.upload) + '/api/v0/version';
return this.storageConfig.upload;
}

for (let connection of this.storageConfig.dappConnection) {
if (connection.provider === 'ipfs') {
return utils.buildUrlFromConfig(connection) + '/api/v0/version';
return connection;
}
}
}

_checkService(cb) {
let url = this._getNodeUrl();
utils.getJson(url, cb);
_checkService(getJson, cb) {
let _cb = cb || function () {};
let _getJson = getJson;
if (typeof getJson === 'function') {
_cb = getJson;
_getJson = false;
}
const cfg = this._getNodeConfig();
utils.pingEndpoint(
canonicalHost(cfg.host),
cfg.port,
false,
cfg.protocol === 'https' ? cfg.protocol : 'http',
utils.buildUrlFromConfig(cfg),
(err) => {
if (err) {
_cb(err);
} else if (_getJson) {
utils.getJson(utils.buildUrlFromConfig(cfg) + '/api/v0/version', _cb);
} else {
_cb();
}
}
);
}

addStorageProviderToEmbarkJS() {
Expand Down
134 changes: 98 additions & 36 deletions src/lib/utils/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,45 +89,107 @@ function getJson(url, cb) {
httpGetJson(url, cb);
}

function pingEndpoint(host, port, type, protocol, origin, callback) {
const options = {
protocolVersion: 13,
perMessageDeflate: true,
origin: origin,
host: host,
port: port
};
if (type === 'ws') {
options.headers = {
'Sec-WebSocket-Version': 13,
Connection: 'Upgrade',
Upgrade: 'websocket',
'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits',
Origin: origin
};
}
let req;
function pingEndpoint(host, port, type, protocol, origin, callback, count = 0) {
// remove trailing api key from infura, ie rinkeby.infura.io/nmY8WtT4QfEwz2S7wTbl
if (options.host.indexOf('/') > -1) {
options.host = options.host.split('/')[0];
}
if (protocol === 'https') {
req = require('https').get(options);
} else {
req = require('http').get(options);
}
const _host = host.indexOf('/') > -1 ? host.split('/')[0] : host;
const max_retries = 4800;
const timeout = 1000;

req.on('error', (err) => {
callback(err);
});
let alreadyClosed = false;
let retrying = false;

req.on('response', (_response) => {
callback();
});
const doCallback = (v) => {
if (typeof v !== 'undefined') {
callback._doCallback = !!v;
}
let _doCallback = callback._doCallback;
if (typeof _doCallback === 'undefined') {
_doCallback = true;
} else {
_doCallback = !!_doCallback;
}
return _doCallback;
};

req.on('upgrade', (_res, _socket, _head) => {
callback();
});
const cleanup = (req, closeMethod) => {
if (!alreadyClosed) {
alreadyClosed = true;
setImmediate(() => { req[closeMethod](); });
}
};

const maybeRetry = (req, closeMethod, cond, ...args) => {
cleanup(req, closeMethod);
if (doCallback()) {
if (cond) {
retrying = true;
setImmediate(() => {
pingEndpoint(host, port, type, protocol, origin, callback, ++count);
});
} else {
doCallback(false);
callback(...args);
}
}
};

const handleError = (req, closeMethod) => {
req.on('error', (err) => {
if (!retrying) {
maybeRetry(
req,
closeMethod,
(/timed out/).test(err.message) && count < max_retries,
err
);
} else {
cleanup(req, closeMethod);
}
});
};

const handleSuccess = (req, closeMethod, event) => {
req.once(event, () => { maybeRetry(req, closeMethod, false); });
};

const setupTimer = (req, closeMethod) => {
setTimeout(() => {
if (!retrying) {
maybeRetry(
req,
closeMethod,
count < max_retries,
new Error('timed out')
);
}
}, timeout * 2);
};

const handleRequest = (req, closeMethod, event) => {
handleError(req, closeMethod);
handleSuccess(req, closeMethod, event);
setupTimer(req, closeMethod);
};

if (type === 'ws') {
const req = new (require('ws'))(
`${protocol === 'https' ? 'wss' : 'ws'}://${_host}:${port}/`,
{handshakeTimeout: timeout, origin}
);
handleRequest(req, 'close', 'open');
} else {
const req = (protocol === 'https' ? require('https') : require('http')).get(
{host: _host, origin, port, timeout}
);
handleRequest(req, 'abort', 'response');
req.once('socket', (sock) => {
sock.once('connect', () => {
req.once('timeout', () => {
req.emit('error', new Error('timed out'));
});
});
});
}
}

function runCmd(cmd, options, callback) {
Expand Down Expand Up @@ -554,7 +616,7 @@ function isNotFolder(node){
}

function byName(a, b) {
return a.name.localeCompare(b.name);
return a.name.localeCompare(b.name);
}

function fileTreeSort(nodes){
Expand Down

0 comments on commit c9ea017

Please sign in to comment.