From 0cc778ef0e80cbf406094521b9db2776e9e1c896 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 13 Dec 2019 15:19:59 +0100 Subject: [PATCH 1/3] fix: upgrader should not need muxers --- src/upgrader.js | 50 +++++++++++++++++++++++++++------ test/upgrading/upgrader.spec.js | 20 +++++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/src/upgrader.js b/src/upgrader.js index 146735b90f..81a604867f 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -64,7 +64,7 @@ class Upgrader { async upgradeInbound (maConn) { let encryptedConn let remotePeer - let muxedConnection + let upgradedConn let Muxer let cryptoProtocol let setPeer @@ -94,7 +94,11 @@ class Upgrader { } = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos)) // Multiplex the connection - ;({ stream: muxedConnection, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers)) + if (this.muxers.size) { + ({ stream: upgradedConn, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers)) + } else { + upgradedConn = encryptedConn + } } catch (err) { log.error('Failed to upgrade inbound connection', err) await maConn.close(err) @@ -112,7 +116,7 @@ class Upgrader { cryptoProtocol, direction: 'inbound', maConn, - muxedConnection, + upgradedConn, Muxer, remotePeer }) @@ -134,7 +138,7 @@ class Upgrader { let encryptedConn let remotePeer - let muxedConnection + let upgradedConn let cryptoProtocol let Muxer let setPeer @@ -164,7 +168,11 @@ class Upgrader { } = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos)) // Multiplex the connection - ;({ stream: muxedConnection, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers)) + if (this.muxers.size) { + ({ stream: upgradedConn, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers)) + } else { + upgradedConn = encryptedConn + } } catch (err) { log.error('Failed to upgrade outbound connection', err) await maConn.close(err) @@ -182,7 +190,7 @@ class Upgrader { cryptoProtocol, direction: 'outbound', maConn, - muxedConnection, + upgradedConn, Muxer, remotePeer }) @@ -195,7 +203,7 @@ class Upgrader { * @param {string} cryptoProtocol The crypto protocol that was negotiated * @param {string} direction One of ['inbound', 'outbound'] * @param {MultiaddrConnection} maConn The transport layer connection - * @param {*} muxedConnection A duplex connection returned from multiplexer selection + * @param {*} upgradedConn A duplex connection returned from multiplexer selection * @param {Muxer} Muxer The muxer to be used for muxing * @param {PeerId} remotePeer The peer the connection is with * @returns {Connection} @@ -204,10 +212,34 @@ class Upgrader { cryptoProtocol, direction, maConn, - muxedConnection, + upgradedConn, Muxer, remotePeer }) { + if (!Muxer) { + // Create the connection + maConn.timeline.upgraded = Date.now() + + const connection = new Connection({ + localAddr: maConn.localAddr, + remoteAddr: maConn.remoteAddr, + localPeer: this.localPeer, + remotePeer: remotePeer, + stat: { + direction, + timeline: maConn.timeline, + encryption: cryptoProtocol + }, + newStream: () => { throw new Error('connection is not multiplexed') }, + getStreams: () => { throw new Error('connection is not multiplexed') }, + close: err => maConn.close(err) + }) + + this.onConnection(connection) + + return connection + } + // Create the muxer const muxer = new Muxer({ // Run anytime a remote stream is created @@ -244,7 +276,7 @@ class Upgrader { } // Pipe all data through the muxer - pipe(muxedConnection, muxer, muxedConnection) + pipe(upgradedConn, muxer, upgradedConn) maConn.timeline.upgraded = Date.now() const _timeline = maConn.timeline diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 35d870069b..0851c2caef 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -116,6 +116,26 @@ describe('Upgrader', () => { expect(result).to.eql([hello]) }) + it('should upgrade with only crypto', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + // No available muxers + const muxers = new Map() + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const cryptos = new Map([[Crypto.protocol, Crypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + expect(connections).to.have.length(2) + }) + it('should use a private connection protector when provided', async () => { const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) From 7eb4309cda3c9e2d909baac8f0ac0401292b85cd Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 13 Dec 2019 16:35:56 +0100 Subject: [PATCH 2/3] chore: address review --- src/upgrader.js | 31 ++++++++++++++++++------------- test/upgrading/upgrader.spec.js | 2 ++ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/upgrader.js b/src/upgrader.js index 81a604867f..08c2cc4761 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -216,6 +216,18 @@ class Upgrader { Muxer, remotePeer }) { + const _timeline = maConn.timeline + maConn.timeline = new Proxy(_timeline, { + set: (...args) => { + if (args[1] === 'close' && args[2] && !_timeline.close) { + connection.stat.status = 'closed' + this.onConnectionEnd(connection) + } + + return Reflect.set(...args) + } + }) + if (!Muxer) { // Create the connection maConn.timeline.upgraded = Date.now() @@ -230,8 +242,12 @@ class Upgrader { timeline: maConn.timeline, encryption: cryptoProtocol }, - newStream: () => { throw new Error('connection is not multiplexed') }, - getStreams: () => { throw new Error('connection is not multiplexed') }, + newStream: () => { + throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED') + }, + getStreams: () => { + throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED') + }, close: err => maConn.close(err) }) @@ -279,17 +295,6 @@ class Upgrader { pipe(upgradedConn, muxer, upgradedConn) maConn.timeline.upgraded = Date.now() - const _timeline = maConn.timeline - maConn.timeline = new Proxy(_timeline, { - set: (...args) => { - if (args[1] === 'close' && args[2] && !_timeline.close) { - connection.stat.status = 'closed' - this.onConnectionEnd(connection) - } - - return Reflect.set(...args) - } - }) // Create the connection const connection = new Connection({ diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 0851c2caef..6ace08b760 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -134,6 +134,8 @@ describe('Upgrader', () => { ]) expect(connections).to.have.length(2) + + await expect(connections[0].newStream('/echo/1.0.0')).to.be.rejected() }) it('should use a private connection protector when provided', async () => { From def7e544624a9bbd16869eda64ceca4d50e7dc22 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Sun, 15 Dec 2019 20:54:42 +0000 Subject: [PATCH 3/3] chore: apply suggestions from code review Co-Authored-By: Jacob Heun --- src/upgrader.js | 119 +++++++++++++------------------- test/upgrading/upgrader.spec.js | 7 ++ 2 files changed, 56 insertions(+), 70 deletions(-) diff --git a/src/upgrader.js b/src/upgrader.js index 08c2cc4761..6e178a4c70 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -203,7 +203,7 @@ class Upgrader { * @param {string} cryptoProtocol The crypto protocol that was negotiated * @param {string} direction One of ['inbound', 'outbound'] * @param {MultiaddrConnection} maConn The transport layer connection - * @param {*} upgradedConn A duplex connection returned from multiplexer selection + * @param {*} upgradedConn A duplex connection returned from multiplexer and/or crypto selection * @param {Muxer} Muxer The muxer to be used for muxing * @param {PeerId} remotePeer The peer the connection is with * @returns {Connection} @@ -216,86 +216,65 @@ class Upgrader { Muxer, remotePeer }) { - const _timeline = maConn.timeline - maConn.timeline = new Proxy(_timeline, { - set: (...args) => { - if (args[1] === 'close' && args[2] && !_timeline.close) { - connection.stat.status = 'closed' - this.onConnectionEnd(connection) - } - - return Reflect.set(...args) - } - }) - - if (!Muxer) { - // Create the connection - maConn.timeline.upgraded = Date.now() - - const connection = new Connection({ - localAddr: maConn.localAddr, - remoteAddr: maConn.remoteAddr, - localPeer: this.localPeer, - remotePeer: remotePeer, - stat: { - direction, - timeline: maConn.timeline, - encryption: cryptoProtocol - }, - newStream: () => { - throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED') + let muxer, newStream + + if (Muxer) { + // Create the muxer + muxer = new Muxer({ + // Run anytime a remote stream is created + onStream: async muxedStream => { + const mss = new Multistream.Listener(muxedStream) + try { + const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) + log('%s: incoming stream opened on %s', direction, protocol) + if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) + connection.addStream(stream, protocol) + this._onStream({ connection, stream, protocol }) + } catch (err) { + log.error(err) + } }, - getStreams: () => { - throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED') - }, - close: err => maConn.close(err) + // Run anytime a stream closes + onStreamEnd: muxedStream => { + connection.removeStream(muxedStream.id) + } }) - this.onConnection(connection) - - return connection - } - - // Create the muxer - const muxer = new Muxer({ - // Run anytime a remote stream is created - onStream: async muxedStream => { - const mss = new Multistream.Listener(muxedStream) + newStream = async protocols => { + log('%s: starting new stream on %s', direction, protocols) + const muxedStream = muxer.newStream() + const mss = new Multistream.Dialer(muxedStream) try { - const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) - log('%s: incoming stream opened on %s', direction, protocol) + const { stream, protocol } = await mss.select(protocols) if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) - connection.addStream(stream, protocol) - this._onStream({ connection, stream, protocol }) + return { stream: { ...muxedStream, ...stream }, protocol } } catch (err) { - log.error(err) + log.error('could not create new stream', err) + throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) } - }, - // Run anytime a stream closes - onStreamEnd: muxedStream => { - connection.removeStream(muxedStream.id) } - }) - const newStream = async protocols => { - log('%s: starting new stream on %s', direction, protocols) - const muxedStream = muxer.newStream() - const mss = new Multistream.Dialer(muxedStream) - try { - const { stream, protocol } = await mss.select(protocols) - if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) - return { stream: { ...muxedStream, ...stream }, protocol } - } catch (err) { - log.error('could not create new stream', err) - throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) - } + // Pipe all data through the muxer + pipe(upgradedConn, muxer, upgradedConn) } - // Pipe all data through the muxer - pipe(upgradedConn, muxer, upgradedConn) + const _timeline = maConn.timeline + maConn.timeline = new Proxy(_timeline, { + set: (...args) => { + if (args[1] === 'close' && args[2] && !_timeline.close) { + connection.stat.status = 'closed' + this.onConnectionEnd(connection) + } + return Reflect.set(...args) + } + }) maConn.timeline.upgraded = Date.now() + const errConnectionNotMultiplexed = () => { + throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED') + } + // Create the connection const connection = new Connection({ localAddr: maConn.localAddr, @@ -305,11 +284,11 @@ class Upgrader { stat: { direction, timeline: maConn.timeline, - multiplexer: Muxer.multicodec, + multiplexer: Muxer && Muxer.multicodec, encryption: cryptoProtocol }, - newStream, - getStreams: () => muxer.streams, + newStream: newStream || errConnectionNotMultiplexed, + getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed, close: err => maConn.close(err) }) diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 6ace08b760..ef0cb84c9a 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -136,6 +136,13 @@ describe('Upgrader', () => { expect(connections).to.have.length(2) await expect(connections[0].newStream('/echo/1.0.0')).to.be.rejected() + + // Verify the MultiaddrConnection close method is called + sinon.spy(inbound, 'close') + sinon.spy(outbound, 'close') + await Promise.all(connections.map(conn => conn.close())) + expect(inbound.close.callCount).to.equal(1) + expect(outbound.close.callCount).to.equal(1) }) it('should use a private connection protector when provided', async () => {