-
Notifications
You must be signed in to change notification settings - Fork 37
[WIP] feat: handle stream muxer errors #245
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,7 +77,10 @@ function dial (swtch) { | |
// 3. update the peerInfo that is already stored in the conn | ||
} | ||
|
||
openConnInMuxedConn(muxer, (conn) => { | ||
openConnInMuxedConn(muxer, (err, conn) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
protocolHandshake(conn, protocol, callback) | ||
}) | ||
} | ||
|
@@ -89,18 +92,20 @@ function dial (swtch) { | |
|
||
const tKeys = swtch.availableTransports(pi) | ||
|
||
const circuitEnabled = Boolean(swtch.transports[Circuit.tag]) | ||
let circuitTried = false | ||
nextTransport(tKeys.shift()) | ||
|
||
function nextTransport (key) { | ||
let transport = key | ||
if (!transport) { | ||
if (circuitTried) { | ||
return cb(new Error(`Circuit already tried!`)) | ||
if (!circuitEnabled) { | ||
const msg = `Circuit not enabled and all transports failed to dial peer ${pi.id.toB58String()}!` | ||
return cb(new Error(msg)) | ||
} | ||
|
||
if (!swtch.transports[Circuit.tag]) { | ||
return cb(new Error(`Circuit not enabled!`)) | ||
if (circuitTried) { | ||
return cb(new Error(`No available transports to dial peer ${pi.id.toB58String()}!`)) | ||
} | ||
|
||
log(`Falling back to dialing over circuit`) | ||
|
@@ -208,7 +213,13 @@ function dial (swtch) { | |
} | ||
|
||
function openConnInMuxedConn (muxer, cb) { | ||
cb(muxer.newStream()) | ||
muxer.newStream((err, conn) => { | ||
if (err) { | ||
return cb(err) | ||
} | ||
|
||
cb(null, conn) | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just newStream(cb)? |
||
} | ||
|
||
function protocolHandshake (conn, protocol, cb) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,14 +22,15 @@ const Switch = require('../src') | |
describe('Stream Multiplexing', () => { | ||
[ | ||
multiplex, | ||
spdy | ||
spdy // TODO: do we still support this? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Of course we still support. The whole design of libp2p is to support multiple building blocks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. spdy doesn't return any errors when the stream abruptly terminates. We should make the behaviour consistent then. What should happen - error on abrupt termination or silent skip? |
||
].forEach((sm) => describe(sm.multicodec, () => { | ||
let switchA | ||
let switchB | ||
let switchC | ||
|
||
before((done) => createInfos(3, (err, peerInfos) => { | ||
expect(err).to.not.exist() | ||
|
||
function maGen (port) { return `/ip4/127.0.0.1/tcp/${port}` } | ||
|
||
const peerA = peerInfos[0] | ||
|
@@ -149,5 +150,22 @@ describe('Stream Multiplexing', () => { | |
}, 500) | ||
}) | ||
}) | ||
|
||
it('should fail graciously on dead stream mux', function (done) { | ||
if (sm.multicodec === '/spdy/3.1.0') { | ||
this.skip() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No skipping for SPDY There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Skipping because the behaviour is not consistent across muxers - spdy doesn't return any errors on abrupt termination. |
||
|
||
const id = switchA._peerInfo.id.toB58String() | ||
const muxer = switchB.muxedConns[id].muxer | ||
muxer.multiplex.destroyed = true // simulate a destroyed stream | ||
|
||
switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { | ||
expect(err).to.exist() | ||
expect(err).to.match(/Error: Multiplexer is destroyed/) | ||
muxer.multiplex.destroyed = false // re-enable stream to avoid hangs | ||
done() | ||
}) | ||
}) | ||
})) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This asking to change the Stream Muxer interface. https://github.com/libp2p/interface-stream-muxer needs to be updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't really change
newStream
interface, it already takes a callback as it is. I reverted this to return a connection as before. This PR just tries to propagate the error if it's returned by the multiplexer - before it would just be ignored.