From 828e685e79a918831a6dac0a868d0b209dd5874c Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 3 Apr 2019 11:35:04 +0200 Subject: [PATCH] fix: improve connection tracking (#318) * fix: centralize connection events and peer connects * fix: remove unneeded peerBook put --- src/connection/incoming.js | 3 --- src/connection/index.js | 13 +++-------- src/connection/manager.js | 44 +++++++++++++++++++++++++------------- src/dialer/queue.js | 11 ++++------ src/transport.js | 1 - test/dial-fsm.node.js | 8 +++---- 6 files changed, 40 insertions(+), 40 deletions(-) diff --git a/src/connection/incoming.js b/src/connection/incoming.js index 81a6af8..10ddfce 100644 --- a/src/connection/incoming.js +++ b/src/connection/incoming.js @@ -68,9 +68,6 @@ class IncomingConnectionFSM extends BaseConnection { this.emit('muxed', this.conn) }) this._state.on('DISCONNECTING', () => { - if (this.theirPeerInfo) { - this.theirPeerInfo.disconnect() - } this._state('done') }) } diff --git a/src/connection/index.js b/src/connection/index.js index 592f244..d437433 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -268,11 +268,6 @@ class ConnectionFSM extends BaseConnection { _onDisconnecting () { this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer)) - // Issue disconnects on both Peers - if (this.theirPeerInfo) { - this.theirPeerInfo.disconnect() - } - this.switch.connection.remove(this) delete this.switch.conns[this.theirB58Id] @@ -284,7 +279,6 @@ class ConnectionFSM extends BaseConnection { tasks.push((cb) => { this.muxer.end(() => { delete this.muxer - this.switch.emit('peer-mux-closed', this.theirPeerInfo) cb() }) }) @@ -325,13 +319,13 @@ class ConnectionFSM extends BaseConnection { return this.close(maybeUnexpectedEnd(err)) } - const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer) - - this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, conn, this.theirPeerInfo.id, (err) => { + const observedConn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer) + const encryptedConn = this.switch.crypto.encrypt(this.ourPeerInfo.id, observedConn, this.theirPeerInfo.id, (err) => { if (err) { return this.close(err) } + this.conn = encryptedConn this.conn.setPeerInfo(this.theirPeerInfo) this._state('done') }) @@ -392,7 +386,6 @@ class ConnectionFSM extends BaseConnection { this.switch.protocolMuxer(null)(conn) }) - this.switch.emit('peer-mux-established', this.theirPeerInfo) this._didUpgrade(null) // Run identify on the connection diff --git a/src/connection/manager.js b/src/connection/manager.js index 5874d76..1f17873 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -33,6 +33,7 @@ class ConnectionManager { // Only add it if it's not there if (!this.get(connection)) { this.connections[connection.theirB58Id].push(connection) + this.switch.emit('peer-mux-established', connection.theirPeerInfo) } } @@ -78,14 +79,26 @@ class ConnectionManager { * @returns {void} */ remove (connection) { - if (!this.connections[connection.theirB58Id]) return + // No record of the peer, disconnect it + if (!this.connections[connection.theirB58Id]) { + connection.theirPeerInfo.disconnect() + this.switch.emit('peer-mux-closed', connection.theirPeerInfo) + return + } for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) { if (this.connections[connection.theirB58Id][i] === connection) { this.connections[connection.theirB58Id].splice(i, 1) - return + break } } + + // The peer is fully disconnected + if (this.connections[connection.theirB58Id].length === 0) { + delete this.connections[connection.theirB58Id] + connection.theirPeerInfo.disconnect() + this.switch.emit('peer-mux-closed', connection.theirPeerInfo) + } } /** @@ -175,6 +188,7 @@ class ConnectionManager { return log('identify not successful') } const b58Str = peerInfo.id.toB58String() + peerInfo = this.switch._peerBook.put(peerInfo) const connection = new ConnectionFSM({ _switch: this.switch, @@ -185,24 +199,24 @@ class ConnectionManager { }) this.switch.connection.add(connection) - if (peerInfo.multiaddrs.size > 0) { - // with incomming conn and through identify, going to pick one - // of the available multiaddrs from the other peer as the one - // I'm connected to as we really can't be sure at the moment - // TODO add this consideration to the connection abstraction! - peerInfo.connect(peerInfo.multiaddrs.toArray()[0]) - } else { - // for the case of websockets in the browser, where peers have - // no addr, use just their IPFS id - peerInfo.connect(`/ipfs/${b58Str}`) + // Only update if it's not already connected + if (!peerInfo.isConnected()) { + if (peerInfo.multiaddrs.size > 0) { + // with incomming conn and through identify, going to pick one + // of the available multiaddrs from the other peer as the one + // I'm connected to as we really can't be sure at the moment + // TODO add this consideration to the connection abstraction! + peerInfo.connect(peerInfo.multiaddrs.toArray()[0]) + } else { + // for the case of websockets in the browser, where peers have + // no addr, use just their IPFS id + peerInfo.connect(`/ipfs/${b58Str}`) + } } - peerInfo = this.switch._peerBook.put(peerInfo) muxedConn.once('close', () => { connection.close() }) - - this.switch.emit('peer-mux-established', peerInfo) }) }) } diff --git a/src/dialer/queue.js b/src/dialer/queue.js index c5f97f9..a2d79c3 100644 --- a/src/dialer/queue.js +++ b/src/dialer/queue.js @@ -2,7 +2,6 @@ const ConnectionFSM = require('../connection') const { DIAL_ABORTED, ERR_BLACKLISTED } = require('../errors') -const Connection = require('interface-connection').Connection const nextTick = require('async/nextTick') const once = require('once') const debug = require('debug') @@ -45,10 +44,8 @@ function createConnectionWithProtocol ({ protocol, connection, callback }) { return callback(err) } - const proxyConnection = new Connection() - proxyConnection.setPeerInfo(connection.theirPeerInfo) - proxyConnection.setInnerConn(conn) - callback(null, proxyConnection) + conn.setPeerInfo(connection.theirPeerInfo) + callback(null, conn) }) } @@ -192,6 +189,8 @@ class Queue { conn: null }) + this.switch.connection.add(connectionFSM) + // Add control events and start the dialer connectionFSM.once('connected', () => connectionFSM.protect()) connectionFSM.once('private', () => connectionFSM.encrypt()) @@ -252,7 +251,6 @@ class Queue { // If we're not muxed yet, add listeners connectionFSM.once('muxed', () => { this.blackListCount = 0 // reset blacklisting on good connections - this.switch.connection.add(connectionFSM) queuedDial.connection = connectionFSM createConnectionWithProtocol(queuedDial) next() @@ -260,7 +258,6 @@ class Queue { connectionFSM.once('unmuxed', () => { this.blackListCount = 0 - this.switch.connection.add(connectionFSM) queuedDial.connection = connectionFSM createConnectionWithProtocol(queuedDial) next() diff --git a/src/transport.js b/src/transport.js index 14bb282..a22b7b5 100644 --- a/src/transport.js +++ b/src/transport.js @@ -106,7 +106,6 @@ class TransportManager { } peerInfo.connect(success.multiaddr) - this.switch._peerBook.put(peerInfo) callback(null, success.conn) }) } diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 2bfa364..fae69c5 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -240,8 +240,8 @@ describe('dialFSM', () => { // Expect 4 `peer-mux-established` events expect(4).checks(() => { - // Expect 4 `peer-mux-closed`, plus 1 hangup - expect(5).checks(() => { + // Expect 2 `peer-mux-closed`, plus 1 hangup + expect(3).checks(() => { switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') switchA.removeAllListeners('peer-mux-established') @@ -286,8 +286,8 @@ describe('dialFSM', () => { switchA.handle(protocol, (_, conn) => { pull(conn, conn) }) switchB.handle(protocol, (_, conn) => { pull(conn, conn) }) - // 4 close checks and 1 hangup check - expect(5).checks(() => { + // 2 close checks and 1 hangup check + expect(2).checks(() => { switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') // restart the node for subsequent tests