Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
fix: improve connection closing and error handling (#285)
Browse files Browse the repository at this point in the history
* fix: improve connection closing and error handling

* test: improve identify test

*  chore: update deps

* fix: only emit from connections if there is a listener

* test: add more connection tests

* chore: update libp2p-mplex
  • Loading branch information
jacobheun authored Nov 15, 2018
1 parent fe84058 commit e757cf6
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 60 deletions.
28 changes: 14 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,40 @@
"npm": ">=3.0.0"
},
"devDependencies": {
"aegir": "^15.1.0",
"chai": "^4.1.2",
"aegir": "^17.0.1",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"dirty-chai": "^2.0.1",
"libp2p-mplex": "~0.8.2",
"libp2p-mplex": "~0.8.4",
"libp2p-pnet": "~0.1.0",
"libp2p-secio": "~0.10.0",
"libp2p-spdy": "~0.12.1",
"libp2p-tcp": "~0.12.1",
"libp2p-webrtc-star": "~0.15.4",
"libp2p-secio": "~0.10.1",
"libp2p-spdy": "~0.13.0",
"libp2p-tcp": "~0.13.0",
"libp2p-webrtc-star": "~0.15.5",
"libp2p-websockets": "~0.12.0",
"peer-book": "~0.8.0",
"portfinder": "^1.0.17",
"sinon": "^6.2.0",
"portfinder": "^1.0.19",
"sinon": "^7.1.1",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
"async": "^2.6.1",
"big.js": "^5.1.2",
"big.js": "^5.2.2",
"class-is": "^1.1.0",
"debug": "^3.1.0",
"debug": "^4.1.0",
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"hashlru": "^2.2.1",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
"libp2p-circuit": "~0.2.1",
"libp2p-circuit": "~0.3.0",
"libp2p-identify": "~0.7.2",
"lodash.includes": "^4.3.0",
"moving-average": "^1.0.0",
"multiaddr": "^5.0.0",
"multiaddr": "^5.0.2",
"multistream-select": "~0.14.3",
"once": "^1.4.0",
"peer-id": "~0.11.0",
"peer-id": "~0.12.0",
"peer-info": "~0.14.1",
"pull-stream": "^3.6.9"
},
Expand Down
26 changes: 24 additions & 2 deletions src/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,29 @@ class BaseConnection extends EventEmitter {
this.switch = _switch
this.ourPeerInfo = this.switch._peerInfo
this.log = debug(`libp2p:conn:${name}`)
this.log.error = debug(`libp2p:conn:${name}:error`)
}

/**
* Puts the state into its disconnecting flow
*
* @param {Error} err Will be emitted if provided
* @returns {void}
*/
close (err) {
this.log(`closing connection to ${this.theirB58Id}`)
if (err && this._events.error) {
this.emit('error', err)
}
this._state('disconnect')
}

emit (eventName, ...args) {
if (eventName === 'error' && !this._events.error) {
this.log.error(...args)
} else {
super.emit(eventName, ...args)
}
}

/**
Expand Down Expand Up @@ -86,8 +109,7 @@ class BaseConnection extends EventEmitter {

this.conn = this.switch.protector.protect(this.conn, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
return this.close(err)
}

this.log(`successfully privatized conn to ${this.theirB58Id}`)
Expand Down
3 changes: 1 addition & 2 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ class IncomingConnectionFSM extends BaseConnection {
this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => {
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
return this.close(err)
}
this.conn.getPeerInfo((_, peerInfo) => {
this.theirPeerInfo = peerInfo
Expand Down
31 changes: 7 additions & 24 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,6 @@ class ConnectionFSM extends BaseConnection {
this._state.on('error', (err) => this._onStateError(err))
}

/**
* Puts the state into its disconnecting flow
*
* @returns {void}
*/
close () {
this.log(`closing connection to ${this.theirB58Id}`)
this._state('disconnect')
}

/**
* Puts the state into dialing mode
*
Expand Down Expand Up @@ -200,8 +190,7 @@ class ConnectionFSM extends BaseConnection {
this.log(`dialing ${this.theirB58Id}`)

if (!this.switch.hasTransports()) {
this.emit('error', Errors.NO_TRANSPORTS_REGISTERED())
return this._state('disconnect')
return this.close(Errors.NO_TRANSPORTS_REGISTERED())
}

const tKeys = this.switch.availableTransports(this.theirPeerInfo)
Expand All @@ -213,17 +202,15 @@ class ConnectionFSM extends BaseConnection {
let transport = key
if (!transport) {
if (!circuitEnabled) {
this.emit('error', Errors.CONNECTION_FAILED(
return this.close(Errors.CONNECTION_FAILED(
new Error(`Circuit not enabled and all transports failed to dial peer ${this.theirB58Id}!`)
))
return this._state('disconnect')
}

if (circuitTried) {
this.emit('error', Errors.CONNECTION_FAILED(
return this.close(Errors.CONNECTION_FAILED(
new Error(`No available transports to dial peer ${this.theirB58Id}!`)
))
return this._state('disconnect')
}

this.log(`Falling back to dialing over circuit`)
Expand Down Expand Up @@ -300,24 +287,21 @@ class ConnectionFSM extends BaseConnection {
const msDialer = new multistream.Dialer()
msDialer.handle(this.conn, (err) => {
if (err) {
this.emit('error', Errors.maybeUnexpectedEnd(err))
return this._state('disconnect')
return this.close(Errors.maybeUnexpectedEnd(err))
}

this.log('selecting crypto %s to %s', this.switch.crypto.tag, this.theirB58Id)

msDialer.select(this.switch.crypto.tag, (err, _conn) => {
if (err) {
this._state('disconnect')
return this.emit('error', Errors.maybeUnexpectedEnd(err))
return this.close(Errors.maybeUnexpectedEnd(err))
}

const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)

this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, conn, this.theirPeerInfo.id, (err) => {
if (err) {
this._state('disconnect')
return this.emit('error', err)
return this.close(err)
}

this.conn.setPeerInfo(this.theirPeerInfo)
Expand Down Expand Up @@ -371,8 +355,7 @@ class ConnectionFSM extends BaseConnection {
this.switch.muxedConns[this.theirB58Id] = this

this.muxer.once('close', () => {
delete this.muxer
this._state('disconnect')
this.close()
})

// For incoming streams, in case identify is on
Expand Down
48 changes: 48 additions & 0 deletions test/connection.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,21 @@ describe('ConnectionFSM', () => {
connection.dial()
})

it('should be able to close with an error and not throw', (done) => {
const connection = new ConnectionFSM({
_switch: dialerSwitch,
peerInfo: listenerSwitch._peerInfo
})

connection.once('connected', (conn) => {
expect(conn).to.be.an.instanceof(Connection)
expect(() => connection.close(new Error('shutting down'))).to.not.throw()
done()
})

connection.dial()
})

it('should emit warning on dial failed attempt', (done) => {
const connection = new ConnectionFSM({
_switch: dialerSwitch,
Expand Down Expand Up @@ -362,6 +377,10 @@ describe('ConnectionFSM', () => {
})
})

afterEach(() => {
sinon.restore()
})

it('should be able to protect a basic connection', (done) => {
const connection = new ConnectionFSM({
_switch: dialerSwitch,
Expand All @@ -381,6 +400,35 @@ describe('ConnectionFSM', () => {
connection.dial()
})

it('should close on failed protection', (done) => {
const connection = new ConnectionFSM({
_switch: dialerSwitch,
peerInfo: listenerSwitch._peerInfo
})

const error = new Error('invalid key')
const stub = sinon.stub(dialerSwitch.protector, 'protect').callsFake((_, cb) => {
cb(error)
})

expect(3).check(done)

connection.once('close', () => {
expect(stub.callCount).to.eql(1).mark()
})

connection.once('error', (err) => {
expect(err).to.eql(error).mark()
})

connection.once('connected', (conn) => {
expect(conn).to.be.an.instanceof(Connection).mark()
connection.protect()
})

connection.dial()
})

it('should be able to encrypt a protected connection', (done) => {
const connection = new ConnectionFSM({
_switch: dialerSwitch,
Expand Down
33 changes: 15 additions & 18 deletions test/identify.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
'use strict'

const chai = require('chai')
const dirtyChai = require('dirty-chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-checkmark'))
const expect = chai.expect
chai.use(dirtyChai)
const parallel = require('async/parallel')
const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-mplex')
Expand All @@ -13,6 +13,7 @@ const secio = require('libp2p-secio')
const PeerBook = require('peer-book')
const identify = require('libp2p-identify')
const lp = require('pull-length-prefixed')
const sinon = require('sinon')

const utils = require('./utils')
const createInfos = utils.createInfos
Expand Down Expand Up @@ -61,8 +62,7 @@ describe('Identify', () => {
], done)
}))

after(function (done) {
this.timeout(3 * 1000)
after((done) => {
parallel([
(cb) => switchA.stop(cb),
(cb) => switchB.stop(cb),
Expand All @@ -71,6 +71,7 @@ describe('Identify', () => {
})

afterEach(function (done) {
sinon.restore()
// Hangup everything
parallel([
(cb) => switchA.hangUp(switchB._peerInfo, cb),
Expand Down Expand Up @@ -99,8 +100,8 @@ describe('Identify', () => {
})
})

it('should require crypto and identify to have the same peerId', (done) => {
identify.listener = (conn) => {
it('should close connection when identify fails', (done) => {
const stub = sinon.stub(identify, 'listener').callsFake((conn) => {
conn.getObservedAddrs((err, observedAddrs) => {
if (err) { return }
observedAddrs = observedAddrs[0]
Expand All @@ -122,20 +123,16 @@ describe('Identify', () => {
conn
)
})
}
})

expect(2).check(done)

switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
expect(err).to.not.exist()
pull(
pull.values([Buffer.from('data that cant be had')]),
conn,
pull.collect((err, values) => {
expect(err).to.exist()
expect(values).to.have.length(0)
done()
})
)
const connFSM = switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err) => {
expect(err).to.not.exist().mark()
})
connFSM.once('close', () => {
expect(stub.called).to.eql(true).mark()
})
})
})

0 comments on commit e757cf6

Please sign in to comment.