Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
fix: add connection type param to hangUp()
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 8, 2019
1 parent b3a6a74 commit 56f7eac
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 44 deletions.
31 changes: 24 additions & 7 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,23 @@ const BaseConnection = require('./base')
const observeConnection = require('../observe-connection')
const Errors = require('../errors')

/**
* ConnectionType indicates whether the connection is incoming or outgoing
* @readonly
* @enum {string}
*/
const ConnectionType = {
Incoming: 'Incoming',
Outgoing: 'Outgoing'
}

/**
* @typedef {Object} ConnectionOptions
* @property {Switch} _switch Our switch instance
* @property {PeerInfo} peerInfo The PeerInfo of the peer to dial
* @property {Muxer} muxer Optional - A muxed connection
* @property {Connection} conn Optional - The base connection
* @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out.
* @property {ConnectionType} type Optional - identify the connection as Incoming or Outgoing. Defaults to Outgoing.
*/

/**
Expand All @@ -30,7 +40,7 @@ class ConnectionFSM extends BaseConnection {
* @param {ConnectionOptions} param0
* @constructor
*/
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
constructor ({ _switch, peerInfo, muxer, conn, type = ConnectionType.Outgoing }) {
super({
_switch,
name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
Expand All @@ -42,6 +52,8 @@ class ConnectionFSM extends BaseConnection {
this.conn = conn // The base connection
this.muxer = muxer // The upgraded/muxed connection

this._type = type

let startState = 'DISCONNECTED'
if (this.muxer) {
startState = 'MUXED'
Expand Down Expand Up @@ -93,8 +105,7 @@ class ConnectionFSM extends BaseConnection {
disconnect: 'DISCONNECTING'
},
DISCONNECTING: { // Shutting down the connection
done: 'DISCONNECTED',
disconnect: 'DISCONNECTING'
done: 'DISCONNECTED'
},
ABORTED: { }, // A severe event occurred
ERRORED: { // An error occurred, but future dials may be allowed
Expand Down Expand Up @@ -276,12 +287,12 @@ class ConnectionFSM extends BaseConnection {
if (this.conn) {
this.conn.source(true, () => {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
this.switch.emit('peer-mux-closed', this.theirPeerInfo, { type: this._type })
delete this.conn
})
} else {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
this.switch.emit('peer-mux-closed', this.theirPeerInfo, { type: this._type })
}
}

Expand Down Expand Up @@ -375,7 +386,7 @@ class ConnectionFSM extends BaseConnection {
this.switch.protocolMuxer(null)(conn)
})

this.switch.emit('peer-mux-established', this.theirPeerInfo)
this.switch.emit('peer-mux-established', this.theirPeerInfo, { type: this._type })

this._didUpgrade(null)
})
Expand Down Expand Up @@ -447,8 +458,14 @@ class ConnectionFSM extends BaseConnection {
this.emit('error', Errors.INVALID_STATE_TRANSITION(err))
this.log(err)
}

get type () {
return this._type
}
}

ConnectionFSM.ConnectionType = ConnectionType

module.exports = withIs(ConnectionFSM, {
className: 'ConnectionFSM',
symbolName: 'libp2p-switch/ConnectionFSM'
Expand Down
12 changes: 6 additions & 6 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ class ConnectionManager {
* Gets a connection associated with the given peer
* @private
* @param {string} peerId The peers id
* @param {ConnectionType} type Optional Incoming / Outgoing
* @returns {ConnectionFSM|null} The found connection or null
*/
getOne (peerId) {
getOne (peerId, type = null) {
if (this.connections[peerId]) {
// TODO: Maybe select the best?
return this.connections[peerId][0]
return this.connections[peerId].find(conn => !type || conn.type === type)
}
return null
}
Expand Down Expand Up @@ -171,18 +172,17 @@ class ConnectionManager {
return log('identify not successful')
}
const b58Str = peerInfo.id.toB58String()

const connection = new ConnectionFSM({
_switch: this.switch,
peerInfo,
muxer: muxedConn,
conn: conn,
type: 'inc'
type: ConnectionFSM.ConnectionType.Incoming
})
this.switch.connection.add(connection)

if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
// with incoming conn and through identify, going to pick one
// of the available multiaddrs from the other peer as the one
// I'm connected to as we really can't be sure at the moment
// TODO add this consideration to the connection abstraction!
Expand All @@ -198,7 +198,7 @@ class ConnectionManager {
connection.close()
})

this.switch.emit('peer-mux-established', peerInfo)
this.switch.emit('peer-mux-established', peerInfo, { type: ConnectionFSM.ConnectionType.Incoming })
})
})
}
Expand Down
3 changes: 1 addition & 2 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ function dial (_switch, returnFSM) {
const b58Id = peerInfo.id.toB58String()

log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`)

let connection = _switch.connection.getOne(b58Id)
let connection = _switch.connection.getOne(b58Id, ConnectionFSM.ConnectionType.Outgoing)

if (!ConnectionFSM.isConnectionFSM(connection)) {
connection = new ConnectionFSM({
Expand Down
18 changes: 15 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,29 @@ class Switch extends EventEmitter {
}

/**
* If a muxed Connection exists for the given peer, it will be closed
* and its reference on the Switch will be removed.
* If a muxed Connection exists for the given peer (optionally of the given
* connection type: Incoming / Outgoing), it will be closed and its reference
* on the Switch will be removed.
*
* @param {PeerInfo|Multiaddr|PeerId} peer
* @param {ConnectionType} connectionType Optional
* @param {function()} callback
* @returns {void}
*/
hangUp (peer, callback) {
hangUp (peer, connectionType, callback) {
if (typeof connectionType === 'function') {
callback = connectionType
connectionType = null
}

const peerInfo = getPeerInfo(peer, this._peerBook)
const key = peerInfo.id.toB58String()
const conns = [...this.connection.getAllById(key)]
each(conns, (conn, cb) => {
if (connectionType && conn.type !== connectionType) {
return setImmediate(cb)
}

conn.once('close', cb)
conn.close()
}, callback)
Expand Down Expand Up @@ -262,3 +273,4 @@ class Switch extends EventEmitter {

module.exports = Switch
module.exports.errors = Errors
module.exports.ConnectionType = require('./connection').ConnectionType
8 changes: 4 additions & 4 deletions src/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ module.exports = (swtch) => {
outgoing: observe('out')
})

swtch.on('peer-mux-established', (peerInfo) => {
observer.emit('peer:connected', peerInfo.id.toB58String())
swtch.on('peer-mux-established', (peerInfo, meta) => {
observer.emit('peer:connected', peerInfo.id.toB58String(), meta)
})

swtch.on('peer-mux-closed', (peerInfo) => {
observer.emit('peer:closed', peerInfo.id.toB58String())
swtch.on('peer-mux-closed', (peerInfo, meta) => {
observer.emit('peer:closed', peerInfo.id.toB58String(), meta)
})

return observer
Expand Down
94 changes: 94 additions & 0 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,100 @@ describe('dialFSM', () => {
})
})

it('parallel dials to one another can disconnect only outgoing on hangup', function (done) {
this.timeout(10e3)

switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })

// 2 close checks (outgoing) and 1 hangup check
expect(3).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')

expect(switchA.connection.getAll()).to.have.length(1)
expect(switchB.connection.getAll()).to.have.length(1)

// Hangup remaining connections
switchA.hangUp(switchB._peerInfo, done)
})

switchA.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Outgoing)
})
switchB.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Incoming)
})

const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => {
// Both should have 1 incoming / 1 outgoing connection
expect(switchA.connection.getAll()).to.have.length(2)
expect(switchB.connection.getAll()).to.have.length(2)

// Hangup Outbound only and verify the connections are closed
switchA.hangUp(switchB._peerInfo, Switch.ConnectionType.Outgoing, (err) => {
expect(err).to.not.exist().mark()
})
})

// Hold the dial from A, until switch B is done dialing to ensure
// we have both incoming and outgoing connections
conn._state.on('DIALING:enter', (cb) => {
switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => {
cb()
})
})
})

it('parallel dials to one another can disconnect only incoming on hangup', function (done) {
this.timeout(10e3)

switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })

// 2 close checks (incoming) and 1 hangup check
expect(3).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')

expect(switchA.connection.getAll()).to.have.length(1)
expect(switchB.connection.getAll()).to.have.length(1)

// Hangup remaining connections
switchA.hangUp(switchB._peerInfo, done)
})

switchA.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Incoming)
})
switchB.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Outgoing)
})

const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => {
// Both should have 1 incoming / 1 outgoing connection
expect(switchA.connection.getAll()).to.have.length(2)
expect(switchB.connection.getAll()).to.have.length(2)

// Hangup Inbound only and verify the connections are closed
switchA.hangUp(switchB._peerInfo, Switch.ConnectionType.Incoming, (err) => {
expect(err).to.not.exist().mark()
})
})

// Hold the dial from A, until switch B is done dialing to ensure
// we have both incoming and outgoing connections
conn._state.on('DIALING:enter', (cb) => {
switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => {
cb()
})
})
})

it('parallel dials to one another should disconnect on stop', (done) => {
switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
Expand Down
Loading

0 comments on commit 56f7eac

Please sign in to comment.