Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

feat: support a priority queue for dials #325

Merged
merged 8 commits into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ works like dial, but calls back with a [Connection State Machine](#connection-st
Connection state machines emit a number of events that can be used to determine the current state of the connection
and to received the underlying connection that can be used to transfer data.

### `switch.dialer.connect(peer, options, callback)`

a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when an application only wishes to establish connections to new peers, such as during peer discovery when there is a low peer count. Currently, anything greater than the HIGH_PRIORITY (10) will be placed into the cold call queue, and anything less than or equal to the HIGH_PRIORITY will be added to the normal queue.

- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `options`: Optional
- `options.priority`: Number of the priority of the dial, defaults to 20.
- `options.useFSM`: Boolean of whether or not to callback with a [Connection State Machine](#connection-state-machine)
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)

##### Events
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
Expand Down Expand Up @@ -186,7 +196,17 @@ Emitted when the switch encounters an error.

### `switch.on('peer-mux-closed', (peer) => {})`

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection with.

### `switch.on('connection:start', (peer) => {})`
This will be triggered anytime a new connection is created.

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just started a connection with.

### `switch.on('connection:end', (peer) => {})`
This will be triggered anytime an existing connection, regardless of state, is removed from the switch's internal connection tracking.

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a connection with.

### `switch.on('start', () => {})`

Expand Down
1 change: 1 addition & 0 deletions src/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onDisconnected () {
this.switch.connection.remove(this)
this.log('disconnected from %s', this.theirB58Id)
this.emit('close')
this.removeAllListeners()
Expand Down
2 changes: 0 additions & 2 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,6 @@ class ConnectionFSM extends BaseConnection {
_onDisconnecting () {
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]

let tasks = []
Expand Down
16 changes: 13 additions & 3 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ class ConnectionManager {
// Only add it if it's not there
if (!this.get(connection)) {
this.connections[connection.theirB58Id].push(connection)
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
this.switch.emit('connection:start', connection.theirPeerInfo)
if (connection.getState() === 'MUXED') {
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
} else {
connection.once('muxed', () => this.switch.emit('peer-mux-established', connection.theirPeerInfo))
}
}
}

Expand Down Expand Up @@ -81,8 +86,10 @@ class ConnectionManager {
remove (connection) {
// No record of the peer, disconnect it
if (!this.connections[connection.theirB58Id]) {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
if (connection.theirPeerInfo) {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}
return
}

Expand All @@ -99,6 +106,9 @@ class ConnectionManager {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}

// A tracked connection was closed, let the world know
this.switch.emit('connection:end', connection.theirPeerInfo)
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ module.exports = {
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
QUARTER_HOUR: 15 * 60e3
QUARTER_HOUR: 15 * 60e3,
PRIORITY_HIGH: 10,
PRIORITY_LOW: 20
}
32 changes: 27 additions & 5 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ const {
BLACK_LIST_ATTEMPTS,
BLACK_LIST_TTL,
MAX_COLD_CALLS,
MAX_PARALLEL_DIALS
MAX_PARALLEL_DIALS,
PRIORITY_HIGH,
PRIORITY_LOW
} = require('../constants')

module.exports = function (_switch) {
Expand All @@ -19,7 +21,7 @@ module.exports = function (_switch) {
* @param {DialRequest} dialRequest
* @returns {void}
*/
function _dial ({ peerInfo, protocol, useFSM, callback }) {
function _dial ({ peerInfo, protocol, options, callback }) {
if (typeof protocol === 'function') {
callback = protocol
protocol = null
Expand All @@ -32,7 +34,7 @@ module.exports = function (_switch) {
}

// Add it to the queue, it will automatically get executed
dialQueueManager.add({ peerInfo, protocol, useFSM, callback })
dialQueueManager.add({ peerInfo, protocol, options, callback })
}

/**
Expand Down Expand Up @@ -64,14 +66,33 @@ module.exports = function (_switch) {
dialQueueManager.clearBlacklist(peerInfo)
}

/**
* Attempts to establish a connection to the given `peerInfo` at
* a lower priority than a standard dial.
* @param {PeerInfo} peerInfo
* @param {object} options
* @param {boolean} options.useFSM Whether or not to return a `ConnectionFSM`. Defaults to false.
* @param {number} options.priority Lowest priority goes first. Defaults to 20.
* @param {function(Error, Connection)} callback
*/
function connect (peerInfo, options, callback) {
if (typeof options === 'function') {
callback = options
options = null
}
options = { useFSM: false, priority: PRIORITY_LOW, ...options }
_dial({ peerInfo, protocol: null, options, callback })
}

/**
* Adds the dial request to the queue for the given `peerInfo`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docs for using higher priority here

* The request will be added with a high priority (10).
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, Connection)} callback
*/
function dial (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: false, callback })
_dial({ peerInfo, protocol, options: { useFSM: false, priority: PRIORITY_HIGH }, callback })
}

/**
Expand All @@ -82,10 +103,11 @@ module.exports = function (_switch) {
* @param {function(Error, ConnectionFSM)} callback
*/
function dialFSM (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, useFSM: true, callback })
_dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback })
}

return {
connect,
dial,
dialFSM,
clearBlacklist,
Expand Down
4 changes: 3 additions & 1 deletion src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ log.error = debug('libp2p:switch:dial:error')
* @typedef {Object} DialRequest
* @property {PeerInfo} peerInfo - The peer to dial to
* @property {string} [protocol] - The protocol to create a stream for
* @property {boolean} useFSM - If `callback` should return a ConnectionFSM
* @property {object} options
* @property {boolean} options.useFSM - If `callback` should return a ConnectionFSM
* @property {number} options.priority - The priority of the dial
* @property {function(Error, Connection|ConnectionFSM)} callback
*/

Expand Down
27 changes: 16 additions & 11 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const nextTick = require('async/nextTick')
const retimer = require('retimer')
const { QUARTER_HOUR } = require('../constants')
const { QUARTER_HOUR, PRIORITY_HIGH } = require('../constants')
const debug = require('debug')
const log = debug('libp2p:switch:dial:manager')
const noop = () => {}
Expand Down Expand Up @@ -103,17 +103,25 @@ class DialQueueManager {
* @param {DialRequest} dialRequest
* @returns {void}
*/
add ({ peerInfo, protocol, useFSM, callback }) {
add ({ peerInfo, protocol, options, callback }) {
callback = callback ? once(callback) : noop

// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) {
return nextTick(callback, DIAL_ABORTED())

// Cold Call
if (options.priority > PRIORITY_HIGH) {
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS) {
return nextTick(callback, DIAL_ABORTED())
}

if (this._queue.has(targetQueue.id)) {
return nextTick(callback, DIAL_ABORTED())
}
}

targetQueue.add(protocol, useFSM, callback)
targetQueue.add(protocol, options.useFSM, callback)

// If we're already connected to the peer, start the queue now
// While it might cause queues to go over the max parallel amount,
Expand All @@ -130,15 +138,12 @@ class DialQueueManager {

// Add the id to its respective queue set if the queue isn't running
if (!targetQueue.isRunning) {
if (protocol) {
if (options.priority <= PRIORITY_HIGH) {
this._queue.add(targetQueue.id)
this._coldCallQueue.delete(targetQueue.id)
// Only add it to the cold queue if it's not in the normal queue
} else if (!this._queue.has(targetQueue.id)) {
this._coldCallQueue.add(targetQueue.id)
// The peer is already in the normal queue, abort the cold call
} else {
return nextTick(callback, DIAL_ABORTED())
this._coldCallQueue.add(targetQueue.id)
}
}

Expand Down
33 changes: 28 additions & 5 deletions test/dialer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@ const PeerBook = require('peer-book')
const Queue = require('../src/dialer/queue')
const QueueManager = require('../src/dialer/queueManager')
const Switch = require('../src')
const { PRIORITY_HIGH, PRIORITY_LOW } = require('../src/constants')

const utils = require('./utils')
const createInfos = utils.createInfos

describe('dialer', () => {
let switchA
let switchB

before((done) => createInfos(1, (err, infos) => {
before((done) => createInfos(2, (err, infos) => {
expect(err).to.not.exist()

switchA = new Switch(infos[0], new PeerBook())
switchB = new Switch(infos[1], new PeerBook())

done()
}))
Expand All @@ -31,6 +34,26 @@ describe('dialer', () => {
sinon.restore()
})

describe('connect', () => {
afterEach(() => {
switchA.dialer.clearBlacklist(switchB._peerInfo)
})

it('should use default options', (done) => {
switchA.dialer.connect(switchB._peerInfo, (err) => {
expect(err).to.exist()
done()
})
})

it('should be able to use custom options', (done) => {
switchA.dialer.connect(switchB._peerInfo, { useFSM: true, priority: PRIORITY_HIGH }, (err) => {
expect(err).to.exist()
done()
})
})
})

describe('queue', () => {
it('should blacklist forever after 5 blacklists', () => {
const queue = new Queue('QM', switchA)
Expand Down Expand Up @@ -58,7 +81,7 @@ describe('dialer', () => {
id: { toB58String: () => 'QmA' }
},
protocol: null,
useFSM: true,
options: { useFSM: true, priority: PRIORITY_LOW },
callback: (err) => {
expect(err.code).to.eql('DIAL_ABORTED')
done()
Expand All @@ -75,7 +98,7 @@ describe('dialer', () => {
isConnected: () => null
},
protocol: '/echo/1.0.0',
useFSM: true,
options: { useFSM: true, priority: PRIORITY_HIGH },
callback: () => {}
}

Expand All @@ -99,7 +122,7 @@ describe('dialer', () => {
isConnected: () => null
},
protocol: null,
useFSM: true,
options: { useFSM: true, priority: PRIORITY_LOW },
callback: () => {}
}

Expand All @@ -120,7 +143,7 @@ describe('dialer', () => {
isConnected: () => null
},
protocol: null,
useFSM: true,
options: { useFSM: true, priority: PRIORITY_LOW },
callback: (err) => {
expect(runSpy.called).to.eql(false)
expect(hasSpy.called).to.eql(true)
Expand Down