diff --git a/src/upgrader.js b/src/upgrader.js index 146735b90f..6e178a4c70 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -64,7 +64,7 @@ class Upgrader { async upgradeInbound (maConn) { let encryptedConn let remotePeer - let muxedConnection + let upgradedConn let Muxer let cryptoProtocol let setPeer @@ -94,7 +94,11 @@ class Upgrader { } = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos)) // Multiplex the connection - ;({ stream: muxedConnection, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers)) + if (this.muxers.size) { + ({ stream: upgradedConn, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers)) + } else { + upgradedConn = encryptedConn + } } catch (err) { log.error('Failed to upgrade inbound connection', err) await maConn.close(err) @@ -112,7 +116,7 @@ class Upgrader { cryptoProtocol, direction: 'inbound', maConn, - muxedConnection, + upgradedConn, Muxer, remotePeer }) @@ -134,7 +138,7 @@ class Upgrader { let encryptedConn let remotePeer - let muxedConnection + let upgradedConn let cryptoProtocol let Muxer let setPeer @@ -164,7 +168,11 @@ class Upgrader { } = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos)) // Multiplex the connection - ;({ stream: muxedConnection, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers)) + if (this.muxers.size) { + ({ stream: upgradedConn, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers)) + } else { + upgradedConn = encryptedConn + } } catch (err) { log.error('Failed to upgrade outbound connection', err) await maConn.close(err) @@ -182,7 +190,7 @@ class Upgrader { cryptoProtocol, direction: 'outbound', maConn, - muxedConnection, + upgradedConn, Muxer, remotePeer }) @@ -195,7 +203,7 @@ class Upgrader { * @param {string} cryptoProtocol The crypto protocol that was negotiated * @param {string} direction One of ['inbound', 'outbound'] * @param {MultiaddrConnection} maConn The transport layer connection - * @param {*} muxedConnection A duplex connection returned from multiplexer selection + * @param {*} upgradedConn A duplex connection returned from multiplexer and/or crypto selection * @param {Muxer} Muxer The muxer to be used for muxing * @param {PeerId} remotePeer The peer the connection is with * @returns {Connection} @@ -204,49 +212,52 @@ class Upgrader { cryptoProtocol, direction, maConn, - muxedConnection, + upgradedConn, Muxer, remotePeer }) { - // Create the muxer - const muxer = new Muxer({ - // Run anytime a remote stream is created - onStream: async muxedStream => { - const mss = new Multistream.Listener(muxedStream) + let muxer, newStream + + if (Muxer) { + // Create the muxer + muxer = new Muxer({ + // Run anytime a remote stream is created + onStream: async muxedStream => { + const mss = new Multistream.Listener(muxedStream) + try { + const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) + log('%s: incoming stream opened on %s', direction, protocol) + if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) + connection.addStream(stream, protocol) + this._onStream({ connection, stream, protocol }) + } catch (err) { + log.error(err) + } + }, + // Run anytime a stream closes + onStreamEnd: muxedStream => { + connection.removeStream(muxedStream.id) + } + }) + + newStream = async protocols => { + log('%s: starting new stream on %s', direction, protocols) + const muxedStream = muxer.newStream() + const mss = new Multistream.Dialer(muxedStream) try { - const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) - log('%s: incoming stream opened on %s', direction, protocol) + const { stream, protocol } = await mss.select(protocols) if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) - connection.addStream(stream, protocol) - this._onStream({ connection, stream, protocol }) + return { stream: { ...muxedStream, ...stream }, protocol } } catch (err) { - log.error(err) + log.error('could not create new stream', err) + throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) } - }, - // Run anytime a stream closes - onStreamEnd: muxedStream => { - connection.removeStream(muxedStream.id) } - }) - const newStream = async protocols => { - log('%s: starting new stream on %s', direction, protocols) - const muxedStream = muxer.newStream() - const mss = new Multistream.Dialer(muxedStream) - try { - const { stream, protocol } = await mss.select(protocols) - if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) - return { stream: { ...muxedStream, ...stream }, protocol } - } catch (err) { - log.error('could not create new stream', err) - throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) - } + // Pipe all data through the muxer + pipe(upgradedConn, muxer, upgradedConn) } - // Pipe all data through the muxer - pipe(muxedConnection, muxer, muxedConnection) - - maConn.timeline.upgraded = Date.now() const _timeline = maConn.timeline maConn.timeline = new Proxy(_timeline, { set: (...args) => { @@ -258,6 +269,11 @@ class Upgrader { return Reflect.set(...args) } }) + maConn.timeline.upgraded = Date.now() + + const errConnectionNotMultiplexed = () => { + throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED') + } // Create the connection const connection = new Connection({ @@ -268,11 +284,11 @@ class Upgrader { stat: { direction, timeline: maConn.timeline, - multiplexer: Muxer.multicodec, + multiplexer: Muxer && Muxer.multicodec, encryption: cryptoProtocol }, - newStream, - getStreams: () => muxer.streams, + newStream: newStream || errConnectionNotMultiplexed, + getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed, close: err => maConn.close(err) }) diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 35d870069b..ef0cb84c9a 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -116,6 +116,35 @@ describe('Upgrader', () => { expect(result).to.eql([hello]) }) + it('should upgrade with only crypto', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + // No available muxers + const muxers = new Map() + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const cryptos = new Map([[Crypto.protocol, Crypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + expect(connections).to.have.length(2) + + await expect(connections[0].newStream('/echo/1.0.0')).to.be.rejected() + + // Verify the MultiaddrConnection close method is called + sinon.spy(inbound, 'close') + sinon.spy(outbound, 'close') + await Promise.all(connections.map(conn => conn.close())) + expect(inbound.close.callCount).to.equal(1) + expect(outbound.close.callCount).to.equal(1) + }) + it('should use a private connection protector when provided', async () => { const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })