From 373ac8c4736e8bdd67ed40873a46f1a0acbe3572 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 27 Feb 2018 18:53:29 -0600 Subject: [PATCH 1/4] feat: handle stream muxer errors improve circuit relay error msgs --- src/dial.js | 23 +++++++++++++++++------ test/circuit-relay.node.js | 2 +- test/node.js | 1 + test/stats.node.js | 2 +- test/stream-muxers.node.js | 22 ++++++++++++++++++++-- 5 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/dial.js b/src/dial.js index 07c050a..9a153f6 100644 --- a/src/dial.js +++ b/src/dial.js @@ -77,7 +77,10 @@ function dial (swtch) { // 3. update the peerInfo that is already stored in the conn } - openConnInMuxedConn(muxer, (conn) => { + openConnInMuxedConn(muxer, (err, conn) => { + if (err) { + return callback(err) + } protocolHandshake(conn, protocol, callback) }) } @@ -89,18 +92,20 @@ function dial (swtch) { const tKeys = swtch.availableTransports(pi) + const circuitEnabled = !!swtch.transports[Circuit.tag] let circuitTried = false nextTransport(tKeys.shift()) function nextTransport (key) { let transport = key if (!transport) { - if (circuitTried) { - return cb(new Error(`Circuit already tried!`)) + if (!circuitEnabled) { + const msg = `Circuit not enabled and all transports failed to dial peer ${pi.id.toB58String()}!` + return cb(new Error(msg)) } - if (!swtch.transports[Circuit.tag]) { - return cb(new Error(`Circuit not enabled!`)) + if (circuitTried) { + return cb(new Error(`No available transports to dial peer ${pi.id.toB58String()}!`)) } log(`Falling back to dialing over circuit`) @@ -208,7 +213,13 @@ function dial (swtch) { } function openConnInMuxedConn (muxer, cb) { - cb(muxer.newStream()) + muxer.newStream((err, conn) => { + if (err) { + return cb(err) + } + + cb(null, conn) + }) } function protocolHandshake (conn, protocol, cb) { diff --git a/test/circuit-relay.node.js b/test/circuit-relay.node.js index 9c87146..078742e 100644 --- a/test/circuit-relay.node.js +++ b/test/circuit-relay.node.js @@ -88,7 +88,7 @@ describe(`circuit`, function () { swarmA.dial(swarmC._peerInfo, (err, conn) => { expect(err).to.exist() - expect(err).to.match(/Circuit already tried!/) + expect(err).to.match(/No available transports to dial peer/) expect(conn).to.not.exist() expect(dialSpyA.callCount).to.be.eql(1) done() diff --git a/test/node.js b/test/node.js index 82f05ed..d4fabd5 100644 --- a/test/node.js +++ b/test/node.js @@ -7,3 +7,4 @@ require('./swarm-no-muxing.node') require('./swarm-muxing.node') require('./circuit-relay.node') require('./limit-dialer.node') +require('./stats.node') diff --git a/test/stats.node.js b/test/stats.node.js index 6a82dc0..8ba74b8 100644 --- a/test/stats.node.js +++ b/test/stats.node.js @@ -10,7 +10,7 @@ const each = require('async/each') const map = require('async/map') const series = require('async/series') const TCP = require('libp2p-tcp') -const multiplex = require('libp2p-multiplex') +const multiplex = require('libp2p-mplex') const pull = require('pull-stream') const secio = require('libp2p-secio') const PeerBook = require('peer-book') diff --git a/test/stream-muxers.node.js b/test/stream-muxers.node.js index b410e60..4ade2ed 100644 --- a/test/stream-muxers.node.js +++ b/test/stream-muxers.node.js @@ -19,10 +19,10 @@ const tryEcho = utils.tryEcho const Switch = require('../src') -describe('Stream Multiplexing', () => { +describe.only('Stream Multiplexing', () => { [ multiplex, - spdy + spdy // TODO: do we still support this? ].forEach((sm) => describe(sm.multicodec, () => { let switchA let switchB @@ -30,6 +30,7 @@ describe('Stream Multiplexing', () => { before((done) => createInfos(3, (err, peerInfos) => { expect(err).to.not.exist() + function maGen (port) { return `/ip4/127.0.0.1/tcp/${port}` } const peerA = peerInfos[0] @@ -149,5 +150,22 @@ describe('Stream Multiplexing', () => { }, 500) }) }) + + it('should fail graciously on dead stream mux', function (done) { + if (sm.multicodec === '/spdy/3.1.0') { + this.skip() + } + + const id = switchA._peerInfo.id.toB58String() + const muxer = switchB.muxedConns[id].muxer + muxer.multiplex.destroyed = true // simulate a destroyed stream + + switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { + expect(err).to.exist() + expect(err).to.match(/Error: Multiplexer is destroyed/) + muxer.multiplex.destroyed = false // re-enable stream to avoid hangs + done() + }) + }) })) }) From 935ee6dcd6edb4fd7037677363b79708d43e87b2 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 27 Feb 2018 19:10:19 -0600 Subject: [PATCH 2/4] fix: remove runaway `.only` --- test/stream-muxers.node.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/stream-muxers.node.js b/test/stream-muxers.node.js index 4ade2ed..d687d8f 100644 --- a/test/stream-muxers.node.js +++ b/test/stream-muxers.node.js @@ -19,7 +19,7 @@ const tryEcho = utils.tryEcho const Switch = require('../src') -describe.only('Stream Multiplexing', () => { +describe('Stream Multiplexing', () => { [ multiplex, spdy // TODO: do we still support this? From c771dbf9bc7206a789802bfeed6e24eae6eea612 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 27 Feb 2018 19:11:10 -0600 Subject: [PATCH 3/4] lint --- src/dial.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dial.js b/src/dial.js index 9a153f6..83b6593 100644 --- a/src/dial.js +++ b/src/dial.js @@ -92,7 +92,7 @@ function dial (swtch) { const tKeys = swtch.availableTransports(pi) - const circuitEnabled = !!swtch.transports[Circuit.tag] + const circuitEnabled = Boolean(swtch.transports[Circuit.tag]) let circuitTried = false nextTransport(tKeys.shift()) From 869d0c52003812a4457da5fa5aedea1bd4b96b38 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 7 Mar 2018 23:41:06 -0600 Subject: [PATCH 4/4] fix: reenable spdy and rework test --- src/dial.js | 8 +------- test/stream-muxers.node.js | 16 +++++++--------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/dial.js b/src/dial.js index 83b6593..b25e745 100644 --- a/src/dial.js +++ b/src/dial.js @@ -213,13 +213,7 @@ function dial (swtch) { } function openConnInMuxedConn (muxer, cb) { - muxer.newStream((err, conn) => { - if (err) { - return cb(err) - } - - cb(null, conn) - }) + return muxer.newStream(cb) } function protocolHandshake (conn, protocol, cb) { diff --git a/test/stream-muxers.node.js b/test/stream-muxers.node.js index d687d8f..b28327f 100644 --- a/test/stream-muxers.node.js +++ b/test/stream-muxers.node.js @@ -22,7 +22,7 @@ const Switch = require('../src') describe('Stream Multiplexing', () => { [ multiplex, - spdy // TODO: do we still support this? + spdy ].forEach((sm) => describe(sm.multicodec, () => { let switchA let switchB @@ -151,19 +151,17 @@ describe('Stream Multiplexing', () => { }) }) - it('should fail graciously on dead stream mux', function (done) { - if (sm.multicodec === '/spdy/3.1.0') { - this.skip() - } - + it('error propagates correctly from multiplexer', function (done) { const id = switchA._peerInfo.id.toB58String() const muxer = switchB.muxedConns[id].muxer - muxer.multiplex.destroyed = true // simulate a destroyed stream + muxer.newStream = (cb) => { + cb(new Error()) + } switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { expect(err).to.exist() - expect(err).to.match(/Error: Multiplexer is destroyed/) - muxer.multiplex.destroyed = false // re-enable stream to avoid hangs + expect(err).to.be.an.instanceof(Error) + expect(conn).to.not.exist() done() }) })