Skip to content

Commit

Permalink
fix: peer disconnect event and improve logging performance (libp2p#309)
Browse files Browse the repository at this point in the history
* fix: only emit disconnects from muxed conns

* fix: update disconnect logic

* chore: clean up logging to prevent unneeded string formatting

* chore: fix spelling
  • Loading branch information
jacobheun authored Mar 12, 2019
1 parent 33d49e9 commit f731cdc
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 34 deletions.
8 changes: 4 additions & 4 deletions src/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class BaseConnection extends EventEmitter {
*/
close (err) {
if (this._state._state === 'DISCONNECTING') return
this.log(`closing connection to ${this.theirB58Id}`)
this.log('closing connection to %s', this.theirB58Id)
if (err && this._events.error) {
this.emit('error', err)
}
Expand Down Expand Up @@ -80,7 +80,7 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onDisconnected () {
this.log(`disconnected from ${this.theirB58Id}`)
this.log('disconnected from %s', this.theirB58Id)
this.emit('close')
this.removeAllListeners()
}
Expand All @@ -92,7 +92,7 @@ class BaseConnection extends EventEmitter {
* @returns {void}
*/
_onPrivatized () {
this.log(`successfully privatized incoming connection`)
this.log('successfully privatized incoming connection')
this.emit('private', this.conn)
}

Expand All @@ -113,7 +113,7 @@ class BaseConnection extends EventEmitter {
return this.close(err)
}

this.log(`successfully privatized conn to ${this.theirB58Id}`)
this.log('successfully privatized conn to %s', this.theirB58Id)
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
Expand Down
6 changes: 3 additions & 3 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ class IncomingConnectionFSM extends BaseConnection {
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log(`successfully encrypted connection to ${this.theirB58Id || 'unknown peer'}`)
this.log('successfully encrypted connection to %s', this.theirB58Id || 'unknown peer')
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id || 'unknown peer'}`)
this.log('successfully muxed connection to %s', this.theirB58Id || 'unknown peer')
this.emit('muxed', this.conn)
})
this._state.on('DISCONNECTING', () => {
Expand All @@ -81,7 +81,7 @@ class IncomingConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onEncrypting () {
this.log(`encrypting connection via ${this.switch.crypto.tag}`)
this.log('encrypting connection via %s', this.switch.crypto.tag)

this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => {
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => {
Expand Down
37 changes: 18 additions & 19 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,17 @@ class ConnectionFSM extends BaseConnection {
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log(`successfully encrypted connection to ${this.theirB58Id}`)
this.log('successfully encrypted connection to %s', this.theirB58Id)
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id}`)
this.log('successfully muxed connection to %s', this.theirB58Id)
delete this.switch.conns[this.theirB58Id]
this.emit('muxed', this.muxer)
})
this._state.on('CONNECTED', () => {
this.log(`unmuxed connection opened to ${this.theirB58Id}`)
this.log('unmuxed connection opened to %s', this.theirB58Id)
this.emit('unmuxed', this.conn)
})
this._state.on('DISCONNECTING', () => this._onDisconnecting())
Expand Down Expand Up @@ -169,7 +169,7 @@ class ConnectionFSM extends BaseConnection {
return callback(err, null)
}

this.log(`created new stream to ${this.theirB58Id}`)
this.log('created new stream to %s', this.theirB58Id)
this._protocolHandshake(protocol, stream, callback)
})
}
Expand All @@ -194,7 +194,7 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDialing () {
this.log(`dialing ${this.theirB58Id}`)
this.log('dialing %s', this.theirB58Id)

if (!this.switch.hasTransports()) {
return this.close(NO_TRANSPORTS_REGISTERED())
Expand Down Expand Up @@ -226,7 +226,7 @@ class ConnectionFSM extends BaseConnection {
this.theirPeerInfo.multiaddrs.add(`/p2p-circuit/p2p/${this.theirB58Id}`)
}

this.log(`dialing transport ${transport}`)
this.log('dialing transport %s', transport)
this.switch.transport.dial(transport, this.theirPeerInfo, (errors, _conn) => {
if (errors) {
this.emit('error:connection_attempt_failed', errors)
Expand All @@ -250,7 +250,7 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDialed () {
this.log(`successfully dialed ${this.theirB58Id}`)
this.log('successfully dialed %s', this.theirB58Id)

this.emit('connected', this.conn)
}
Expand All @@ -261,33 +261,32 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onDisconnecting () {
this.log(`disconnecting from ${this.theirB58Id}`)
this.log('disconnecting from %s', this.theirB58Id)

// Issue disconnects on both Peers
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]

// Clean up stored connections
if (this.muxer) {
this.muxer.end()
delete this.muxer
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
}

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]
delete this.muxer

// If we have the base connection, abort it
if (this.conn) {
this.conn.source(true, () => {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
delete this.conn
})
} else {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
}
}

Expand Down Expand Up @@ -336,7 +335,7 @@ class ConnectionFSM extends BaseConnection {
*/
_onUpgrading () {
const muxers = Object.keys(this.switch.muxers)
this.log(`upgrading connection to ${this.theirB58Id}`)
this.log('upgrading connection to %s', this.theirB58Id)

if (muxers.length === 0) {
return this._state('stop')
Expand Down Expand Up @@ -376,7 +375,7 @@ class ConnectionFSM extends BaseConnection {

// For incoming streams, in case identify is on
this.muxer.on('stream', (conn) => {
this.log(`new stream created via muxer to ${this.theirB58Id}`)
this.log('new stream created via muxer to %s', this.theirB58Id)
conn.setPeerInfo(this.theirPeerInfo)
this.switch.protocolMuxer(null)(conn)
})
Expand Down Expand Up @@ -431,12 +430,12 @@ class ConnectionFSM extends BaseConnection {

msDialer.select(protocol, (err, _conn) => {
if (err) {
this.log(`could not perform protocol handshake: `, err)
this.log('could not perform protocol handshake:', err)
return callback(err, null)
}

const conn = observeConnection(null, protocol, _conn, this.switch.observer)
this.log(`successfully performed handshake of ${protocol} to ${this.theirB58Id}`)
this.log('successfully performed handshake of %s to %s', protocol, this.theirB58Id)
this.emit('connection', conn)
callback(null, conn)
})
Expand Down
8 changes: 4 additions & 4 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const Connection = require('interface-connection').Connection
const ConnectionFSM = require('./connection')
const getPeerInfo = require('./get-peer-info')
const once = require('once')
const setImmediate = require('async/setImmediate')
const nextTick = require('async/nextTick')

const debug = require('debug')
const log = debug('libp2p:switch:dial')
Expand All @@ -22,7 +22,7 @@ function maybePerformHandshake ({ protocol, proxyConnection, connection, callbac
})
}

callback()
nextTick(callback)
}

/**
Expand Down Expand Up @@ -53,7 +53,7 @@ function dial (_switch, returnFSM) {
const peerInfo = getPeerInfo(peer, _switch._peerBook)
const b58Id = peerInfo.id.toB58String()

log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`)
log('dialing to %s with protocol %s', b58Id, protocol || 'unknown')

let connection = _switch.connection.getOne(b58Id)

Expand Down Expand Up @@ -89,7 +89,7 @@ function dial (_switch, returnFSM) {
const proxyConnection = new Connection()
proxyConnection.setPeerInfo(peerInfo)

setImmediate(() => {
nextTick(() => {
// If we have a muxed connection, attempt the protocol handshake
if (connection.getState() === 'MUXED') {
maybePerformHandshake({
Expand Down
6 changes: 3 additions & 3 deletions src/limit-dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class DialQueue {
*/
_doWork (transport, addr, token, callback) {
callback = once(callback)
log(`${transport.constructor.name}:work:start`)
log('work:start')
this._dialWithTimeout(transport, addr, (err, conn) => {
if (err) {
log.error(`${transport.constructor.name}:work`, err)
return callback(err)
}

if (token.cancel) {
log(`${transport.constructor.name}:work:cancel`)
log('work:cancel')
// clean up already done dials
pull(empty(), conn)
// If we can close the connection, do it
Expand All @@ -62,7 +62,7 @@ class DialQueue {
// one is enough
token.cancel = true

log(`${transport.constructor.name}:work:success`)
log('work:success')

const proxyConn = new Connection()
proxyConn.setInnerConn(conn)
Expand Down
2 changes: 1 addition & 1 deletion src/protocol-muxer.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module.exports = function protocolMuxer (protocols, observer) {
}

const handler = (protocolName, _conn) => {
log(`registering handler with protocol ${protocolName}`)
log('registering handler with protocol %s', protocolName)
const protocol = protocols[protocolName]
if (protocol) {
const handlerFunc = protocol && protocol.handlerFunc
Expand Down

0 comments on commit f731cdc

Please sign in to comment.