Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

fix: improve stopping logic #324

Merged
merged 5 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class ConnectionFSM extends BaseConnection {
UPGRADING: { // Attempting to upgrade the connection with muxers
stop: 'CONNECTED', // If we cannot mux, stop upgrading
done: 'MUXED',
error: 'ERRORED'
error: 'ERRORED',
disconnect: 'DISCONNECTING'
},
MUXED: {
disconnect: 'DISCONNECTING'
Expand Down
18 changes: 14 additions & 4 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ const {
module.exports = function (_switch) {
const dialQueueManager = new DialQueueManager(_switch)

_switch.state.on('STOPPING:enter', abort)
_switch.state.on('STARTED:enter', start)
_switch.state.on('STOPPING:enter', stop)

/**
* @param {DialRequest} dialRequest
Expand All @@ -34,14 +35,24 @@ module.exports = function (_switch) {
dialQueueManager.add({ peerInfo, protocol, useFSM, callback })
}

/**
* Starts the `DialQueueManager`
*
* @param {function} callback
*/
function start (callback) {
dialQueueManager.start()
callback()
}

/**
* Aborts all dials that are queued. This should
* only be used when the Switch is being stopped
*
* @param {function} callback
*/
function abort (callback) {
dialQueueManager.abort()
function stop (callback) {
dialQueueManager.stop()
callback()
}

Expand Down Expand Up @@ -77,7 +88,6 @@ module.exports = function (_switch) {
return {
dial,
dialFSM,
abort,
clearBlacklist,
BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts,
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,
Expand Down
13 changes: 12 additions & 1 deletion src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class DialQueueManager {
this._queues = {}
this.switch = _switch
this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR)
this.start()
}

/**
Expand Down Expand Up @@ -66,13 +67,21 @@ class DialQueueManager {
this._cleanInterval.reschedule(QUARTER_HOUR)
}

/**
* Allows the `DialQueueManager` to execute dials
*/
start () {
this.isRunning = true
}

/**
* Iterates over all items in the DialerQueue
* and executes there callback with an error.
*
* This causes the entire DialerQueue to be drained
*/
abort () {
stop () {
this.isRunning = false
// Clear the general queue
this._queue.clear()
// Clear the cold call queue
Expand Down Expand Up @@ -140,6 +149,8 @@ class DialQueueManager {
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (!this.isRunning) return

if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) {
let nextQueue = { done: true }
// Check the queue first and fall back to the cold call queue
Expand Down
8 changes: 6 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ class Switch extends EventEmitter {
}, (err) => {
if (err) {
log.error(err)
return this.emit('error', err)
this.emit('error', err)
return this.state('stop')
}
this.state('done')
})
Expand All @@ -250,7 +251,10 @@ class Switch extends EventEmitter {
(cb) => {
each(this.transports, (transport, cb) => {
each(transport.listeners, (listener, cb) => {
listener.close(cb)
listener.close((err) => {
if (err) log.error(err)
cb()
})
}, cb)
}, cb)
},
Expand Down
190 changes: 96 additions & 94 deletions test/circuit-relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,124 +24,126 @@ const switchOptions = {
}

describe(`circuit`, function () {
let swarmA // TCP and WS
let swarmB // WS
let swarmC // no transports
let dialSpyA
describe('basic', () => {
let swarmA // TCP and WS
let swarmB // WS
let swarmC // no transports
let dialSpyA

before((done) => createInfos(3, (err, infos) => {
expect(err).to.not.exist()
before((done) => createInfos(3, (err, infos) => {
expect(err).to.not.exist()

const peerA = infos[0]
const peerB = infos[1]
const peerC = infos[2]
const peerA = infos[0]
const peerB = infos[1]
const peerC = infos[2]

peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001')
peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws')
peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001')
peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws')

swarmA = new Swarm(peerA, new PeerBook(), switchOptions)
swarmB = new Swarm(peerB, new PeerBook())
swarmC = new Swarm(peerC, new PeerBook())
swarmA = new Swarm(peerA, new PeerBook(), switchOptions)
swarmB = new Swarm(peerB, new PeerBook())
swarmC = new Swarm(peerC, new PeerBook())

swarmA.transport.add('tcp', new TCP())
swarmA.transport.add('ws', new WS())
swarmB.transport.add('ws', new WS())
swarmA.transport.add('tcp', new TCP())
swarmA.transport.add('ws', new WS())
swarmB.transport.add('ws', new WS())

dialSpyA = sinon.spy(swarmA.transport, 'dial')
dialSpyA = sinon.spy(swarmA.transport, 'dial')

done()
}))
done()
}))

after((done) => {
parallel([
(cb) => swarmA.stop(cb),
(cb) => swarmB.stop(cb)
], done)
})
after((done) => {
parallel([
(cb) => swarmA.stop(cb),
(cb) => swarmB.stop(cb)
], done)
})

it('circuit not enabled and all transports failed', (done) => {
swarmA.dial(swarmC._peerInfo, (err, conn) => {
expect(err).to.exist()
expect(err).to.match(/Circuit not enabled and all transports failed to dial peer/)
expect(conn).to.not.exist()
done()
it('circuit not enabled and all transports failed', (done) => {
swarmA.dial(swarmC._peerInfo, (err, conn) => {
expect(err).to.exist()
expect(err).to.match(/Circuit not enabled and all transports failed to dial peer/)
expect(conn).to.not.exist()
done()
})
})
})

it('.enableCircuitRelay', () => {
swarmA.connection.enableCircuitRelay({ enabled: true })
expect(Object.keys(swarmA.transports).length).to.equal(3)
it('.enableCircuitRelay', () => {
swarmA.connection.enableCircuitRelay({ enabled: true })
expect(Object.keys(swarmA.transports).length).to.equal(3)

swarmB.connection.enableCircuitRelay({ enabled: true })
expect(Object.keys(swarmB.transports).length).to.equal(2)
})
swarmB.connection.enableCircuitRelay({ enabled: true })
expect(Object.keys(swarmB.transports).length).to.equal(2)
})

it('listed on the transports map', () => {
expect(swarmA.transports.Circuit).to.exist()
expect(swarmB.transports.Circuit).to.exist()
})
it('listed on the transports map', () => {
expect(swarmA.transports.Circuit).to.exist()
expect(swarmB.transports.Circuit).to.exist()
})

it('add /p2p-circuit addrs on start', (done) => {
parallel([
(cb) => swarmA.start(cb),
(cb) => swarmB.start(cb)
], (err) => {
expect(err).to.not.exist()
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
.includes(`/p2p-circuit`)).length).to.be.at.least(3)
// ensure swarmA has had 0.0.0.0 replaced in the addresses
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
.includes(`/0.0.0.0`)).length).to.equal(0)
expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
.includes(`/p2p-circuit`)).length).to.be.at.least(2)
done()
it('add /p2p-circuit addrs on start', (done) => {
parallel([
(cb) => swarmA.start(cb),
(cb) => swarmB.start(cb)
], (err) => {
expect(err).to.not.exist()
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
.includes(`/p2p-circuit`)).length).to.be.at.least(3)
// ensure swarmA has had 0.0.0.0 replaced in the addresses
expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
.includes(`/0.0.0.0`)).length).to.equal(0)
expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString()
.includes(`/p2p-circuit`)).length).to.be.at.least(2)
done()
})
})
})

it('dial circuit only once', (done) => {
swarmA._peerInfo.multiaddrs.clear()
swarmA._peerInfo.multiaddrs
.add(`/dns4/wrtc-star.discovery.libp2p.io/tcp/443/wss/p2p-webrtc-star`)
it('dial circuit only once', (done) => {
swarmA._peerInfo.multiaddrs.clear()
swarmA._peerInfo.multiaddrs
.add(`/dns4/wrtc-star.discovery.libp2p.io/tcp/443/wss/p2p-webrtc-star`)

swarmA.dial(swarmC._peerInfo, (err, conn) => {
expect(err).to.exist()
expect(err).to.match(/No available transports to dial peer/)
expect(conn).to.not.exist()
expect(dialSpyA.callCount).to.be.eql(1)
done()
swarmA.dial(swarmC._peerInfo, (err, conn) => {
expect(err).to.exist()
expect(err).to.match(/No available transports to dial peer/)
expect(conn).to.not.exist()
expect(dialSpyA.callCount).to.be.eql(1)
done()
})
})
})

it('dial circuit last', (done) => {
const peerC = swarmC._peerInfo
peerC.multiaddrs.clear()
peerC.multiaddrs.add(`/p2p-circuit/ipfs/ABCD`)
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9998/ipfs/ABCD`)
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9999/ws/ipfs/ABCD`)

swarmA.dial(peerC, (err, conn) => {
expect(err).to.exist()
expect(conn).to.not.exist()
expect(dialSpyA.lastCall.args[0]).to.be.eql('Circuit')
done()
it('dial circuit last', (done) => {
const peerC = swarmC._peerInfo
peerC.multiaddrs.clear()
peerC.multiaddrs.add(`/p2p-circuit/ipfs/ABCD`)
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9998/ipfs/ABCD`)
peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9999/ws/ipfs/ABCD`)

swarmA.dial(peerC, (err, conn) => {
expect(err).to.exist()
expect(conn).to.not.exist()
expect(dialSpyA.lastCall.args[0]).to.be.eql('Circuit')
done()
})
})
})

it('should not try circuit if no transports enabled', (done) => {
swarmC.dial(swarmA._peerInfo, (err, conn) => {
expect(err).to.exist()
expect(conn).to.not.exist()
it('should not try circuit if no transports enabled', (done) => {
swarmC.dial(swarmA._peerInfo, (err, conn) => {
expect(err).to.exist()
expect(conn).to.not.exist()

expect(err).to.match(/No transports registered, dial not possible/)
done()
expect(err).to.match(/No transports registered, dial not possible/)
done()
})
})
})

it('should not dial circuit if other transport succeed', (done) => {
swarmA.dial(swarmB._peerInfo, (err) => {
expect(err).not.to.exist()
expect(dialSpyA.lastCall.args[0]).to.not.be.eql('Circuit')
done()
it('should not dial circuit if other transport succeed', (done) => {
swarmA.dial(swarmB._peerInfo, (err) => {
expect(err).not.to.exist()
expect(dialSpyA.lastCall.args[0]).to.not.be.eql('Circuit')
done()
})
})
})

Expand Down
19 changes: 14 additions & 5 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(require('chai-checkmark'))
chai.use(dirtyChai)
const sinon = require('sinon')
const PeerBook = require('peer-book')
const parallel = require('async/parallel')
const WS = require('libp2p-websockets')
Expand Down Expand Up @@ -348,16 +349,24 @@ describe('dialFSM', () => {

switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err, connFSM) => {
expect(err).to.not.exist()
connFSM._state.on('UPGRADING:enter', (cb) => {
expect(2).checks(done)
// 2 conn aborts, 1 close, and 1 stop
expect(4).checks(done)

connFSM.once('close', (err) => {
expect(err).to.not.exist().mark()
})

sinon.stub(connFSM, '_onUpgrading').callsFake(() => {
switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => {
expect(err).to.exist().mark()
expect(err.code).to.eql('DIAL_ABORTED').mark()
})
switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => {
expect(err).to.exist().mark()
expect(err.code).to.eql('DIAL_ABORTED').mark()
})

switchA.stop(cb)
switchA.stop((err) => {
expect(err).to.not.exist().mark()
})
})
})
})
Expand Down