From b00d6cd39428be1d2ba5ee20141a9e4be0ff4da7 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 9 Apr 2019 19:26:30 +0200 Subject: [PATCH 1/8] feat: add priority for cold calls --- src/dialer/index.js | 23 +++++++++++++++++++---- src/dialer/queue.js | 4 +++- src/dialer/queueManager.js | 34 ++++++++++++++++++++++++---------- test/dialer.spec.js | 8 ++++---- 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/dialer/index.js b/src/dialer/index.js index 00675a5..69fc84d 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -19,7 +19,7 @@ module.exports = function (_switch) { * @param {DialRequest} dialRequest * @returns {void} */ - function _dial ({ peerInfo, protocol, useFSM, callback }) { + function _dial ({ peerInfo, protocol, options, callback }) { if (typeof protocol === 'function') { callback = protocol protocol = null @@ -32,7 +32,7 @@ module.exports = function (_switch) { } // Add it to the queue, it will automatically get executed - dialQueueManager.add({ peerInfo, protocol, useFSM, callback }) + dialQueueManager.add({ peerInfo, protocol, options, callback }) } /** @@ -64,6 +64,20 @@ module.exports = function (_switch) { dialQueueManager.clearBlacklist(peerInfo) } + /** + * Attempts to establish a connection to the given `peerInfo` at + * a lower priority than a standard dial. + * @param {PeerInfo} peerInfo + * @param {object} options + * @param {boolean} options.useFSM Whether or not to return a `ConnectionFSM`. Defaults to false. + * @param {number} options.priority Lowest priority goes first. Defaults to 1. + * @param {function(Error, Connection)} callback + */ + function connect (peerInfo, options, callback) { + options = { useFSM: false, priority: 1, ...options } + _dial({ peerInfo, protocol: null, options, callback }) + } + /** * Adds the dial request to the queue for the given `peerInfo` * @param {PeerInfo} peerInfo @@ -71,7 +85,7 @@ module.exports = function (_switch) { * @param {function(Error, Connection)} callback */ function dial (peerInfo, protocol, callback) { - _dial({ peerInfo, protocol, useFSM: false, callback }) + _dial({ peerInfo, protocol, options: { useFSM: false, priority: 0 }, callback }) } /** @@ -82,10 +96,11 @@ module.exports = function (_switch) { * @param {function(Error, ConnectionFSM)} callback */ function dialFSM (peerInfo, protocol, callback) { - _dial({ peerInfo, protocol, useFSM: true, callback }) + _dial({ peerInfo, protocol, options: { useFSM: true, priority: 0 }, callback }) } return { + connect, dial, dialFSM, clearBlacklist, diff --git a/src/dialer/queue.js b/src/dialer/queue.js index 6df9557..c4e0f87 100644 --- a/src/dialer/queue.js +++ b/src/dialer/queue.js @@ -13,7 +13,9 @@ log.error = debug('libp2p:switch:dial:error') * @typedef {Object} DialRequest * @property {PeerInfo} peerInfo - The peer to dial to * @property {string} [protocol] - The protocol to create a stream for - * @property {boolean} useFSM - If `callback` should return a ConnectionFSM + * @property {object} options + * @property {boolean} options.useFSM - If `callback` should return a ConnectionFSM + * @property {number} options.priority - The priority of the dial * @property {function(Error, Connection|ConnectionFSM)} callback */ diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js index 6199e0c..fd10b67 100644 --- a/src/dialer/queueManager.js +++ b/src/dialer/queueManager.js @@ -21,6 +21,15 @@ class DialQueueManager { this._dialingQueues = new Set() this._queues = {} this.switch = _switch + + setInterval(() => { + console.log('Cold queue has %s items', this._coldCallQueue.size) + console.log('Normal queue has %s items', this._queue.size) + console.log('Currently dialing to %s queues', this._dialingQueues.size) + console.log('%s items in the queue', Object.keys(this._dialingQueues.size).length) + console.log('%s connections', this.switch.connection.getAll().length) + console.log('%s known peers', Object.keys(this.switch._peerBook.getAll()).length) + }, 5e3) this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR) this.start() } @@ -103,17 +112,25 @@ class DialQueueManager { * @param {DialRequest} dialRequest * @returns {void} */ - add ({ peerInfo, protocol, useFSM, callback }) { + add ({ peerInfo, protocol, options, callback }) { callback = callback ? once(callback) : noop // Add the dial to its respective queue const targetQueue = this.getQueue(peerInfo) - // If we have too many cold calls, abort the dial immediately - if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) { - return nextTick(callback, DIAL_ABORTED()) + + // Cold Call + if (options.priority > 0) { + // If we have too many cold calls, abort the dial immediately + if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS) { + return nextTick(callback, DIAL_ABORTED()) + } + + if (this._queue.has(targetQueue.id)) { + return nextTick(callback, DIAL_ABORTED()) + } } - targetQueue.add(protocol, useFSM, callback) + targetQueue.add(protocol, options.useFSM, callback) // If we're already connected to the peer, start the queue now // While it might cause queues to go over the max parallel amount, @@ -130,15 +147,12 @@ class DialQueueManager { // Add the id to its respective queue set if the queue isn't running if (!targetQueue.isRunning) { - if (protocol) { + if (options.priority === 0) { this._queue.add(targetQueue.id) this._coldCallQueue.delete(targetQueue.id) // Only add it to the cold queue if it's not in the normal queue - } else if (!this._queue.has(targetQueue.id)) { - this._coldCallQueue.add(targetQueue.id) - // The peer is already in the normal queue, abort the cold call } else { - return nextTick(callback, DIAL_ABORTED()) + this._coldCallQueue.add(targetQueue.id) } } diff --git a/test/dialer.spec.js b/test/dialer.spec.js index 277af90..3459f0f 100644 --- a/test/dialer.spec.js +++ b/test/dialer.spec.js @@ -58,7 +58,7 @@ describe('dialer', () => { id: { toB58String: () => 'QmA' } }, protocol: null, - useFSM: true, + options: { useFSM: true, priority: 1 }, callback: (err) => { expect(err.code).to.eql('DIAL_ABORTED') done() @@ -75,7 +75,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: '/echo/1.0.0', - useFSM: true, + options: { useFSM: true, priority: 0 }, callback: () => {} } @@ -99,7 +99,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: null, - useFSM: true, + options: { useFSM: true, priority: 1 }, callback: () => {} } @@ -120,7 +120,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: null, - useFSM: true, + options: { useFSM: true, priority: 1 }, callback: (err) => { expect(runSpy.called).to.eql(false) expect(hasSpy.called).to.eql(true) From 42cb5b6292dd2297f169e5998781feb665011b9b Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 9 Apr 2019 19:30:32 +0200 Subject: [PATCH 2/8] chore: remove debug logging --- src/dialer/queueManager.js | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js index fd10b67..756fb7c 100644 --- a/src/dialer/queueManager.js +++ b/src/dialer/queueManager.js @@ -21,15 +21,6 @@ class DialQueueManager { this._dialingQueues = new Set() this._queues = {} this.switch = _switch - - setInterval(() => { - console.log('Cold queue has %s items', this._coldCallQueue.size) - console.log('Normal queue has %s items', this._queue.size) - console.log('Currently dialing to %s queues', this._dialingQueues.size) - console.log('%s items in the queue', Object.keys(this._dialingQueues.size).length) - console.log('%s connections', this.switch.connection.getAll().length) - console.log('%s known peers', Object.keys(this.switch._peerBook.getAll()).length) - }, 5e3) this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR) this.start() } From 21658dabf77c8e8c6ea9c840eb7c090e3dea2cce Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 9 Apr 2019 20:40:19 +0200 Subject: [PATCH 3/8] fix: support missing option for connect --- src/dialer/index.js | 4 ++++ test/dialer.spec.js | 24 +++++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/dialer/index.js b/src/dialer/index.js index 69fc84d..419ca35 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -74,6 +74,10 @@ module.exports = function (_switch) { * @param {function(Error, Connection)} callback */ function connect (peerInfo, options, callback) { + if (typeof options === 'function') { + callback = options + options = null + } options = { useFSM: false, priority: 1, ...options } _dial({ peerInfo, protocol: null, options, callback }) } diff --git a/test/dialer.spec.js b/test/dialer.spec.js index 3459f0f..2071da5 100644 --- a/test/dialer.spec.js +++ b/test/dialer.spec.js @@ -18,11 +18,13 @@ const createInfos = utils.createInfos describe('dialer', () => { let switchA + let switchB - before((done) => createInfos(1, (err, infos) => { + before((done) => createInfos(2, (err, infos) => { expect(err).to.not.exist() switchA = new Switch(infos[0], new PeerBook()) + switchB = new Switch(infos[1], new PeerBook()) done() })) @@ -31,6 +33,26 @@ describe('dialer', () => { sinon.restore() }) + describe('connect', () => { + afterEach(() => { + switchA.dialer.clearBlacklist(switchB._peerInfo) + }) + + it('should use default options', (done) => { + switchA.dialer.connect(switchB._peerInfo, (err) => { + expect(err).to.exist() + done() + }) + }) + + it('should be able to use custom options', (done) => { + switchA.dialer.connect(switchB._peerInfo, { useFSM: true, priority: 0 }, (err) => { + expect(err).to.exist() + done() + }) + }) + }) + describe('queue', () => { it('should blacklist forever after 5 blacklists', () => { const queue = new Queue('QM', switchA) From fec88b2d3d655d6e9e802dc1513bc75d0e029b9f Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 10 Apr 2019 10:51:46 +0200 Subject: [PATCH 4/8] docs: add docs for connect --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index 26c23cd..4d2014d 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,18 @@ works like dial, but calls back with a [Connection State Machine](#connection-st Connection state machines emit a number of events that can be used to determine the current state of the connection and to received the underlying connection that can be used to transfer data. +### `switch.dialer.connect(peer, options, callback)` + +a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when +an application only wishes to establish connections to new peers, such as during peer discovery when there is a low +peer count. + +- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][] +- `options`: Optional +- `options.priority`: Number of the priority of the dial, defaults to 1. +- `options.useFSM`: Boolean of whether or not to callback with a [Connection State Machine](#connection-state-machine) +- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine) + ##### Events - `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted. - `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal. From a1c1e34a26142f471d7653085621accb3a85ce1e Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 10 Apr 2019 15:37:27 +0200 Subject: [PATCH 5/8] fix: add some space in priorities --- src/constants.js | 4 +++- src/dialer/index.js | 10 ++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/constants.js b/src/constants.js index fded895..f0b6496 100644 --- a/src/constants.js +++ b/src/constants.js @@ -6,5 +6,7 @@ module.exports = { DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials - QUARTER_HOUR: 15 * 60e3 + QUARTER_HOUR: 15 * 60e3, + PRIORITY_HIGH: 10, + PRIORITY_LOW: 20 } diff --git a/src/dialer/index.js b/src/dialer/index.js index 419ca35..ba14c60 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -6,7 +6,9 @@ const { BLACK_LIST_ATTEMPTS, BLACK_LIST_TTL, MAX_COLD_CALLS, - MAX_PARALLEL_DIALS + MAX_PARALLEL_DIALS, + PRIORITY_HIGH, + PRIORITY_LOW } = require('../constants') module.exports = function (_switch) { @@ -78,7 +80,7 @@ module.exports = function (_switch) { callback = options options = null } - options = { useFSM: false, priority: 1, ...options } + options = { useFSM: false, priority: PRIORITY_LOW, ...options } _dial({ peerInfo, protocol: null, options, callback }) } @@ -89,7 +91,7 @@ module.exports = function (_switch) { * @param {function(Error, Connection)} callback */ function dial (peerInfo, protocol, callback) { - _dial({ peerInfo, protocol, options: { useFSM: false, priority: 0 }, callback }) + _dial({ peerInfo, protocol, options: { useFSM: false, priority: PRIORITY_HIGH }, callback }) } /** @@ -100,7 +102,7 @@ module.exports = function (_switch) { * @param {function(Error, ConnectionFSM)} callback */ function dialFSM (peerInfo, protocol, callback) { - _dial({ peerInfo, protocol, options: { useFSM: true, priority: 0 }, callback }) + _dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback }) } return { From 00b7d63c912897989940443f04169bff9374bad9 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 10 Apr 2019 18:35:55 +0200 Subject: [PATCH 6/8] fix: reference the priority constants docs: update readme --- README.md | 6 ++---- src/dialer/index.js | 3 ++- src/dialer/queueManager.js | 6 +++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 4d2014d..884021f 100644 --- a/README.md +++ b/README.md @@ -154,13 +154,11 @@ and to received the underlying connection that can be used to transfer data. ### `switch.dialer.connect(peer, options, callback)` -a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when -an application only wishes to establish connections to new peers, such as during peer discovery when there is a low -peer count. +a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when an application only wishes to establish connections to new peers, such as during peer discovery when there is a low peer count. Currently, anything greater than the HIGH_PRIORITY (10) will be placed into the cold call queue, and anything less than or equal to the HIGH_PRIORITY will be added to the normal queue. - `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][] - `options`: Optional -- `options.priority`: Number of the priority of the dial, defaults to 1. +- `options.priority`: Number of the priority of the dial, defaults to 20. - `options.useFSM`: Boolean of whether or not to callback with a [Connection State Machine](#connection-state-machine) - `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine) diff --git a/src/dialer/index.js b/src/dialer/index.js index ba14c60..8ee1ace 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -72,7 +72,7 @@ module.exports = function (_switch) { * @param {PeerInfo} peerInfo * @param {object} options * @param {boolean} options.useFSM Whether or not to return a `ConnectionFSM`. Defaults to false. - * @param {number} options.priority Lowest priority goes first. Defaults to 1. + * @param {number} options.priority Lowest priority goes first. Defaults to 20. * @param {function(Error, Connection)} callback */ function connect (peerInfo, options, callback) { @@ -86,6 +86,7 @@ module.exports = function (_switch) { /** * Adds the dial request to the queue for the given `peerInfo` + * The request will be added with a high priority (10). * @param {PeerInfo} peerInfo * @param {string} protocol * @param {function(Error, Connection)} callback diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js index 756fb7c..52355f6 100644 --- a/src/dialer/queueManager.js +++ b/src/dialer/queueManager.js @@ -5,7 +5,7 @@ const Queue = require('./queue') const { DIAL_ABORTED } = require('../errors') const nextTick = require('async/nextTick') const retimer = require('retimer') -const { QUARTER_HOUR } = require('../constants') +const { QUARTER_HOUR, PRIORITY_HIGH } = require('../constants') const debug = require('debug') const log = debug('libp2p:switch:dial:manager') const noop = () => {} @@ -110,7 +110,7 @@ class DialQueueManager { const targetQueue = this.getQueue(peerInfo) // Cold Call - if (options.priority > 0) { + if (options.priority > PRIORITY_HIGH) { // If we have too many cold calls, abort the dial immediately if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS) { return nextTick(callback, DIAL_ABORTED()) @@ -138,7 +138,7 @@ class DialQueueManager { // Add the id to its respective queue set if the queue isn't running if (!targetQueue.isRunning) { - if (options.priority === 0) { + if (options.priority <= PRIORITY_HIGH) { this._queue.add(targetQueue.id) this._coldCallQueue.delete(targetQueue.id) // Only add it to the cold queue if it's not in the normal queue From 72cdf3526490f607b58b7e3caafe47b3b54918d6 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 10 Apr 2019 18:37:34 +0200 Subject: [PATCH 7/8] fix: create separate events for connection tracking (#328) --- README.md | 12 +++++++++++- src/connection/base.js | 1 + src/connection/index.js | 2 -- src/connection/manager.js | 16 +++++++++++++--- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 884021f..3abc807 100644 --- a/README.md +++ b/README.md @@ -196,7 +196,17 @@ Emitted when the switch encounters an error. ### `switch.on('peer-mux-closed', (peer) => {})` -- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection. +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection with. + +### `switch.on('connection:start', (peer) => {})` +This will be triggered anytime a new connection is created. + +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just started a connection with. + +### `switch.on('connection:end', (peer) => {})` +This will be triggered anytime an existing connection, regardless of state, is removed from the switch's internal connection tracking. + +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a connection with. ### `switch.on('start', () => {})` diff --git a/src/connection/base.js b/src/connection/base.js index cb3d5a5..36f7842 100644 --- a/src/connection/base.js +++ b/src/connection/base.js @@ -80,6 +80,7 @@ class BaseConnection extends EventEmitter { * @returns {void} */ _onDisconnected () { + this.switch.connection.remove(this) this.log('disconnected from %s', this.theirB58Id) this.emit('close') this.removeAllListeners() diff --git a/src/connection/index.js b/src/connection/index.js index 1bfe139..a2df251 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -269,8 +269,6 @@ class ConnectionFSM extends BaseConnection { _onDisconnecting () { this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer)) - this.switch.connection.remove(this) - delete this.switch.conns[this.theirB58Id] let tasks = [] diff --git a/src/connection/manager.js b/src/connection/manager.js index 1f17873..b6abd36 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -33,7 +33,12 @@ class ConnectionManager { // Only add it if it's not there if (!this.get(connection)) { this.connections[connection.theirB58Id].push(connection) - this.switch.emit('peer-mux-established', connection.theirPeerInfo) + this.switch.emit('connection:start', connection.theirPeerInfo) + if (connection.getState() === 'MUXED') { + this.switch.emit('peer-mux-established', connection.theirPeerInfo) + } else { + connection.once('muxed', () => this.switch.emit('peer-mux-established', connection.theirPeerInfo)) + } } } @@ -81,8 +86,10 @@ class ConnectionManager { remove (connection) { // No record of the peer, disconnect it if (!this.connections[connection.theirB58Id]) { - connection.theirPeerInfo.disconnect() - this.switch.emit('peer-mux-closed', connection.theirPeerInfo) + if (connection.theirPeerInfo) { + connection.theirPeerInfo.disconnect() + this.switch.emit('peer-mux-closed', connection.theirPeerInfo) + } return } @@ -99,6 +106,9 @@ class ConnectionManager { connection.theirPeerInfo.disconnect() this.switch.emit('peer-mux-closed', connection.theirPeerInfo) } + + // A tracked connection was closed, let the world know + this.switch.emit('connection:end', connection.theirPeerInfo) } /** From 77dee5c9a77ec5514d92fe193c047bcc8e142e85 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 10 Apr 2019 19:09:00 +0200 Subject: [PATCH 8/8] test: fix tests for priority numbers --- test/dialer.spec.js | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/dialer.spec.js b/test/dialer.spec.js index 2071da5..83ca43b 100644 --- a/test/dialer.spec.js +++ b/test/dialer.spec.js @@ -12,6 +12,7 @@ const PeerBook = require('peer-book') const Queue = require('../src/dialer/queue') const QueueManager = require('../src/dialer/queueManager') const Switch = require('../src') +const { PRIORITY_HIGH, PRIORITY_LOW } = require('../src/constants') const utils = require('./utils') const createInfos = utils.createInfos @@ -46,7 +47,7 @@ describe('dialer', () => { }) it('should be able to use custom options', (done) => { - switchA.dialer.connect(switchB._peerInfo, { useFSM: true, priority: 0 }, (err) => { + switchA.dialer.connect(switchB._peerInfo, { useFSM: true, priority: PRIORITY_HIGH }, (err) => { expect(err).to.exist() done() }) @@ -80,7 +81,7 @@ describe('dialer', () => { id: { toB58String: () => 'QmA' } }, protocol: null, - options: { useFSM: true, priority: 1 }, + options: { useFSM: true, priority: PRIORITY_LOW }, callback: (err) => { expect(err.code).to.eql('DIAL_ABORTED') done() @@ -97,7 +98,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: '/echo/1.0.0', - options: { useFSM: true, priority: 0 }, + options: { useFSM: true, priority: PRIORITY_HIGH }, callback: () => {} } @@ -121,7 +122,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: null, - options: { useFSM: true, priority: 1 }, + options: { useFSM: true, priority: PRIORITY_LOW }, callback: () => {} } @@ -142,7 +143,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: null, - options: { useFSM: true, priority: 1 }, + options: { useFSM: true, priority: PRIORITY_LOW }, callback: (err) => { expect(runSpy.called).to.eql(false) expect(hasSpy.called).to.eql(true)