Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

feat: global dial queue #314

Merged
merged 9 commits into from
Mar 28, 2019
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const sw = new switch(peerInfo , peerBook [, options])

If defined, `options` should be an object with the following keys and respective values:

- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds)
- `maxParallelDials` - number of concurrent dials the switch should allow. Defaults to `50`
- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.
Expand Down
6 changes: 6 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict'

module.exports = {
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
}
14 changes: 13 additions & 1 deletion src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const DialQueueManager = require('./queueManager')
const getPeerInfo = require('../get-peer-info')
const { MAX_PARALLEL_DIALS, BLACK_LIST_TTL } = require('../constants')

module.exports = function (_switch) {
const dialQueueManager = new DialQueueManager(_switch)
Expand Down Expand Up @@ -39,6 +40,14 @@ module.exports = function (_switch) {
callback()
}

/**
* Clears the blacklist for a given peer
* @param {PeerInfo} peerInfo
*/
function clearBlacklist (peerInfo) {
dialQueueManager.clearBlacklist(peerInfo)
}

/**
* Adds the dial request to the queue for the given `peerInfo`
* @param {PeerInfo} peerInfo
Expand All @@ -63,6 +72,9 @@ module.exports = function (_switch) {
return {
dial,
dialFSM,
abort
abort,
clearBlacklist,
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,
MAX_PARALLEL_DIALS: isNaN(_switch._options.maxParallelDials) ? MAX_PARALLEL_DIALS : _switch._options.maxParallelDials
}
}
63 changes: 53 additions & 10 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const ConnectionFSM = require('../connection')
const { DIAL_ABORTED } = require('../errors')
const { DIAL_ABORTED, ERR_BLACKLISTED } = require('../errors')
const Connection = require('interface-connection').Connection
const nextTick = require('async/nextTick')
const once = require('once')
Expand Down Expand Up @@ -63,12 +63,15 @@ class Queue {
* @constructor
* @param {string} peerId
* @param {Switch} _switch
* @param {function} onStopped Called when the queue stops
*/
constructor (peerId, _switch) {
constructor (peerId, _switch, onStopped) {
this.id = peerId
this.switch = _switch
this._queue = []
this.blackListed = null
this.isRunning = false
this.onStopped = onStopped
}
get length () {
return this._queue.length
Expand All @@ -80,39 +83,78 @@ class Queue {
* @param {string} protocol
* @param {boolean} useFSM If callback should use a ConnectionFSM instead
* @param {function(Error, Connection)} callback
* @returns {boolean} whether or not the queue has been started
*/
add (protocol, useFSM, callback) {
if (!this.isDialAllowed()) {
nextTick(callback, ERR_BLACKLISTED())
return false
}
this._queue.push({ protocol, useFSM, callback })
if (!this.isRunning) {
log('starting dial queue to %s', this.id)
this.start()
return this.start()
}

/**
* Determines whether or not dialing is currently allowed
* @returns {boolean}
*/
isDialAllowed () {
if (this.blackListed) {
// If the blacklist ttl has passed, reset it
if (Date.now() - this.blackListed > this.switch.dialer.BLACK_LIST_TTL) {
this.blackListed = null
return true
}
// Dial is not allowed
return false
}
return true
}

/**
* Starts the queue
* Starts the queue. If the queue was started `true` will be returned.
* If the queue was already running `false` is returned.
* @returns {boolean}
*/
start () {
this.isRunning = true
this._run()
if (!this.isRunning) {
log('starting dial queue to %s', this.id)
this.isRunning = true
this._run()
return true
}
return false
}

/**
* Stops the queue
*/
stop () {
this.isRunning = false
if (this.isRunning) {
log('stopping dial queue to %s', this.id)
this.isRunning = false
this.onStopped()
}
}

/**
* Stops the queue and errors the callback for each dial request
*/
abort () {
this.stop()
while (this.length > 0) {
let dial = this._queue.shift()
dial.callback(DIAL_ABORTED())
}
this.stop()
}

/**
* Marks the queue as blacklisted. The queue will be immediately aborted.
*/
blacklist () {
log('blacklisting queue for %s', this.id)
this.blackListed = Date.now()
this.abort()
}

/**
Expand Down Expand Up @@ -189,6 +231,7 @@ class Queue {
// depending on the error.
connectionFSM.once('error', (err) => {
queuedDial.callback(err)
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
this.blacklist()
})

connectionFSM.once('close', () => {
Expand Down
64 changes: 57 additions & 7 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const once = require('once')
const Queue = require('./queue')

const { DIAL_ABORTED } = require('../errors')
const noop = () => {}

class DialQueueManager {
Expand All @@ -11,8 +11,10 @@ class DialQueueManager {
* @param {Switch} _switch
*/
constructor (_switch) {
this._queue = {}
this._queue = []
this._queues = {}
this.switch = _switch
this.dials = 0
}

/**
Expand All @@ -22,7 +24,14 @@ class DialQueueManager {
* This causes the entire DialerQueue to be drained
*/
abort () {
const queues = Object.values(this._queue)
// Abort items in the general queue
while (this._queue.length > 0) {
let dial = this._queue.shift()
dial.callback(DIAL_ABORTED())
}

// Abort the individual peer queues
const queues = Object.values(this._queues)
queues.forEach(dialQueue => {
dialQueue.abort()
})
Expand All @@ -32,12 +41,53 @@ class DialQueueManager {
* Adds the `dialRequest` to the queue and ensures the queue is running
*
* @param {DialRequest} dialRequest
* @returns {void}
*/
add ({ peerInfo, protocol, useFSM, callback }) {
callback = callback ? once(callback) : noop

let dialQueue = this.getQueue(peerInfo)
dialQueue.add(protocol, useFSM, callback)
// If the target queue is currently running, just add the dial
// directly to it. This acts as a crude priority lane for multiple
// calls to a peer.
const targetQueue = this.getQueue(peerInfo)
if (targetQueue.isRunning) {
targetQueue.add(protocol, useFSM, callback)
return
}

this._queue.push({ peerInfo, protocol, useFSM, callback })
this.run()
}

/**
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (this.dials < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.length > 0) {
let { peerInfo, protocol, useFSM, callback } = this._queue.shift()
let dialQueue = this.getQueue(peerInfo)
if (dialQueue.add(protocol, useFSM, callback)) {
this.dials++
}
}
}

/**
* Will remove the `peerInfo` from the dial blacklist
* @param {PeerInfo} peerInfo
*/
clearBlacklist (peerInfo) {
this.getQueue(peerInfo).blackListed = null
}

/**
* A handler for when dialing queues stop. This will trigger
* `run()` in order to keep the queue processing.
* @private
*/
_onQueueStopped () {
this.dials--
this.run()
}

/**
Expand All @@ -48,8 +98,8 @@ class DialQueueManager {
getQueue (peerInfo) {
const id = peerInfo.id.toB58String()

this._queue[id] = this._queue[id] || new Queue(id, this.switch)
return this._queue[id]
this._queues[id] = this._queues[id] || new Queue(id, this.switch, this._onQueueStopped.bind(this))
return this._queues[id]
}
}

Expand Down
1 change: 1 addition & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const errCode = require('err-code')
module.exports = {
CONNECTION_FAILED: (err) => errCode(err, 'CONNECTION_FAILED'),
DIAL_ABORTED: () => errCode('Dial was aborted', 'DIAL_ABORTED'),
ERR_BLACKLISTED: () => errCode('Dial is currently blacklisted for this peer', 'ERR_BLACKLISTED'),
DIAL_SELF: () => errCode('A node cannot dial itself', 'DIAL_SELF'),
INVALID_STATE_TRANSITION: (err) => errCode(err, 'INVALID_STATE_TRANSITION'),
NO_TRANSPORTS_REGISTERED: () => errCode('No transports registered, dial not possible', 'NO_TRANSPORTS_REGISTERED'),
Expand Down
6 changes: 3 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ class Switch extends EventEmitter {
})

// higher level (public) API
const dialer = getDialer(this)
this.dial = dialer.dial
this.dialFSM = dialer.dialFSM
this.dialer = getDialer(this)
this.dial = this.dialer.dial
this.dialFSM = this.dialer.dialFSM
}

/**
Expand Down
5 changes: 4 additions & 1 deletion test/circuit-relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const getPorts = require('portfinder').getPorts
const utils = require('./utils')
const createInfos = utils.createInfos
const Swarm = require('../src')
const switchOptions = {
blacklistTTL: 0 // nullifies blacklisting
}

describe(`circuit`, function () {
let swarmA // TCP and WS
Expand All @@ -36,7 +39,7 @@ describe(`circuit`, function () {
peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001')
peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws')

swarmA = new Swarm(peerA, new PeerBook())
swarmA = new Swarm(peerA, new PeerBook(), switchOptions)
swarmB = new Swarm(peerB, new PeerBook())
swarmC = new Swarm(peerC, new PeerBook())

Expand Down
19 changes: 19 additions & 0 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ describe('dialFSM', () => {
protocol = '/error/1.0.0'
switchC.handle(protocol, () => { })

switchA.dialer.clearBlacklist(switchC._peerInfo)
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
connFSM.once('error', (err) => {
Expand All @@ -113,6 +114,24 @@ describe('dialFSM', () => {
})
})

it('should error when the peer is blacklisted', (done) => {
protocol = '/error/1.0.0'
switchC.handle(protocol, () => { })

switchA.dialer.clearBlacklist(switchC._peerInfo)
switchA.dialFSM(switchC._peerInfo, protocol, (err, connFSM) => {
expect(err).to.not.exist()
connFSM.once('error', () => {
// dial with the blacklist
switchA.dialFSM(switchC._peerInfo, protocol, (err) => {
expect(err).to.exist()
expect(err.code).to.eql('ERR_BLACKLISTED')
done()
})
})
})
})

it('should emit a `closed` event when closed', (done) => {
protocol = '/closed/1.0.0'
switchB.handle(protocol, () => { })
Expand Down