From 26f47f67ded0dc40f33814f6a13bdb26580d52dd Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 2 Apr 2019 18:30:03 -0400 Subject: [PATCH 1/7] chore: callbacks -> async / await BREAKING CHANGE: All places in the API that used callbacks are now replaced with async/await --- package.json | 6 +-- src/dial-test.js | 74 ++++++++++----------------- src/listen-test.js | 124 +++++++++++++++++++-------------------------- 3 files changed, 82 insertions(+), 122 deletions(-) diff --git a/package.json b/package.json index cbc48b2..b600136 100644 --- a/package.json +++ b/package.json @@ -39,10 +39,10 @@ }, "dependencies": { "chai": "^4.2.0", + "it-goodbye": "^1.0.0", + "it-pipe": "^1.0.0", "multiaddr": "^5.0.2", - "pull-goodbye": "~0.0.2", - "pull-serializer": "~0.3.2", - "pull-stream": "^3.6.9" + "streaming-iterables": "^4.0.2" }, "contributors": [ "David Dias ", diff --git a/src/dial-test.js b/src/dial-test.js index 85c1a6c..66b0a35 100644 --- a/src/dial-test.js +++ b/src/dial-test.js @@ -5,9 +5,10 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') -const serializer = require('pull-serializer') + +const goodbye = require('it-goodbye') +const { collect } = require('streaming-iterables') +const pipe = require('it-pipe') module.exports = (common) => { describe('dial', () => { @@ -15,59 +16,38 @@ module.exports = (common) => { let transport let listener - before((done) => { - common.setup((err, _transport, _addrs) => { - if (err) return done(err) - transport = _transport - addrs = _addrs - done() - }) + before(async () => { + ({ transport, addrs } = await common.setup()) }) - after((done) => { - common.teardown(done) - }) + after(() => common.teardown && common.teardown()) - beforeEach((done) => { - listener = transport.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(addrs[0], done) + beforeEach(() => { + listener = transport.createListener((conn) => pipe(conn, conn)) + return listener.listen(addrs[0]) }) - afterEach((done) => { - listener.close(done) - }) + afterEach(() => listener.close()) + + it('simple', async () => { + const conn = await transport.dial(addrs[0]) + + const s = goodbye({ source: ['hey'], sink: collect }) - it('simple', (done) => { - const s = serializer(goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, values) => { - expect(err).to.not.exist() - expect( - values - ).to.be.eql( - ['hey'] - ) - done() - }) - })) + const result = await pipe(s, conn, s) - pull( - s, - transport.dial(addrs[0]), - s - ) + expect(result.length).to.equal(1) + expect(result[0].toString()).to.equal('hey') }) - it('to non existent listener', (done) => { - pull( - transport.dial(addrs[1]), - pull.onEnd((err) => { - expect(err).to.exist() - done() - }) - ) + it('to non existent listener', async () => { + try { + await transport.dial(addrs[1]) + } catch (_) { + // Success: expected an error to be throw + return + } + expect.fail('Did not throw error') }) }) } diff --git a/src/listen-test.js b/src/listen-test.js index 082361a..6874b01 100644 --- a/src/listen-test.js +++ b/src/listen-test.js @@ -7,118 +7,98 @@ const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) -const pull = require('pull-stream') +const pipe = require('it-pipe') module.exports = (common) => { describe('listen', () => { let addrs let transport - before((done) => { - common.setup((err, _transport, _addrs) => { - if (err) return done(err) - transport = _transport - addrs = _addrs - done() - }) + before(async () => { + ({ transport, addrs } = await common.setup()) }) - after((done) => { - common.teardown(done) - }) + after(() => common.teardown && common.teardown()) - it('simple', (done) => { + it('simple', async () => { const listener = transport.createListener((conn) => {}) - listener.listen(addrs[0], () => { - listener.close(done) - }) + await listener.listen(addrs[0]) + await listener.close() }) - it('close listener with connections, through timeout', (done) => { - const finish = plan(3, done) - const listener = transport.createListener((conn) => { - pull(conn, conn) + it('close listener with connections, through timeout', async () => { + let finish + let done = new Promise((resolve) => { + finish = resolve }) - listener.listen(addrs[0], () => { - const socket1 = transport.dial(addrs[0], () => { - listener.close(finish) - }) + const listener = transport.createListener((conn) => pipe(conn, conn)) - pull( - transport.dial(addrs[0]), - pull.onEnd(() => { - finish() - }) - ) - - pull( - pull.values([Buffer.from('Some data that is never handled')]), - socket1, - pull.onEnd(() => { - finish() - }) - ) + // Listen + await listener.listen(addrs[0]) + + // Create two connections to the listener + const socket1 = await transport.dial(addrs[0]) + await transport.dial(addrs[0]) + + pipe( + [Buffer.from('Some data that is never handled')], + socket1 + ).then(() => { + finish() }) + + // Closer the listener (will take a couple of seconds to time out) + await listener.close() + + // Pipe should have completed + await done }) describe('events', () => { - // eslint-disable-next-line - // TODO: figure out why it fails in the full test suite - it.skip('connection', (done) => { - const finish = plan(2, done) - + it('connection', (done) => { const listener = transport.createListener() - listener.on('connection', (conn) => { + listener.on('connection', async (conn) => { expect(conn).to.exist() - finish() - }) - - listener.listen(addrs[0], () => { - transport.dial(addrs[0], () => { - listener.close(finish) - }) - }) + listener.close() + done() + }); + + (async () => { + await listener.listen(addrs[0]) + await transport.dial(addrs[0]) + })() }) it('listening', (done) => { const listener = transport.createListener() - listener.on('listening', () => { - listener.close(done) + listener.on('listening', async () => { + await listener.close() + done() }) listener.listen(addrs[0]) }) - // eslint-disable-next-line - // TODO: how to get the listener to emit an error? - it.skip('error', (done) => { + it('error', (done) => { const listener = transport.createListener() listener.on('error', (err) => { expect(err).to.exist() - listener.close(done) + listener.close() + done() }) + listener.emit('error', new Error('my err')) }) it('close', (done) => { - const finish = plan(2, done) const listener = transport.createListener() - listener.on('close', finish) + listener.on('close', done); - listener.listen(addrs[0], () => { - listener.close(finish) - }) + (async () => { + await listener.listen(addrs[0]) + await listener.close() + })() }) }) }) } - -function plan (n, done) { - let i = 0 - return (err) => { - if (err) return done(err) - i++ - - if (i === n) done() - } -} From 355cfe5c0fd6ad6ecf9fac4a606c1b4a5327c056 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 12 Apr 2019 11:13:03 +0800 Subject: [PATCH 2/7] test: add tests for canceling dials --- package.json | 3 ++- src/dial-test.js | 41 +++++++++++++++++++++++++++++++++++++++-- src/errors.js | 16 ++++++++++++++++ src/index.js | 2 ++ 4 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 src/errors.js diff --git a/package.json b/package.json index b600136..0c6765e 100644 --- a/package.json +++ b/package.json @@ -38,8 +38,9 @@ "dirty-chai": "^2.0.1" }, "dependencies": { + "abort-controller": "^3.0.0", "chai": "^4.2.0", - "it-goodbye": "^1.0.0", + "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^5.0.2", "streaming-iterables": "^4.0.2" diff --git a/src/dial-test.js b/src/dial-test.js index 66b0a35..35d383b 100644 --- a/src/dial-test.js +++ b/src/dial-test.js @@ -9,15 +9,18 @@ chai.use(dirtyChai) const goodbye = require('it-goodbye') const { collect } = require('streaming-iterables') const pipe = require('it-pipe') +const AbortController = require('abort-controller') +const AbortError = require('./errors').AbortError module.exports = (common) => { describe('dial', () => { let addrs let transport + let connector let listener before(async () => { - ({ transport, addrs } = await common.setup()) + ({ addrs, transport, connector } = await common.setup()) }) after(() => common.teardown && common.teardown()) @@ -47,7 +50,41 @@ module.exports = (common) => { // Success: expected an error to be throw return } - expect.fail('Did not throw error') + expect.fail('Did not throw error attempting to connect to non-existent listener') + }) + + it('cancel before dialing', async () => { + const controller = new AbortController() + controller.abort() + const socket = transport.dial(addrs[0], { signal: controller.signal }) + + try { + await socket + } catch (err) { + expect(err.code).to.eql(AbortError.code) + return + } + expect.fail('Did not throw error with code ' + AbortError.code) + }) + + it('cancel while dialing', async () => { + // Add a delay to connect() so that we can cancel while the dial is in + // progress + connector.delay(100) + + const controller = new AbortController() + const socket = transport.dial(addrs[0], { signal: controller.signal }) + setTimeout(() => controller.abort(), 50) + + try { + await socket + } catch (err) { + expect(err.code).to.eql(AbortError.code) + return + } finally { + connector.restore() + } + expect.fail('Did not throw error with code ' + AbortError.code) }) }) } diff --git a/src/errors.js b/src/errors.js new file mode 100644 index 0000000..32289a5 --- /dev/null +++ b/src/errors.js @@ -0,0 +1,16 @@ +'use strict' + +class AbortError extends Error { + constructor () { + super('AbortError') + this.code = AbortError.code + } + + static get code () { + return 'ABORT_ERR' + } +} + +module.exports = { + AbortError +} diff --git a/src/index.js b/src/index.js index e8173e2..ce793f3 100644 --- a/src/index.js +++ b/src/index.js @@ -10,3 +10,5 @@ module.exports = (common) => { listen(common) }) } + +module.exports.AbortError = require('./errors').AbortError From d17dea180e9d9922a15630aa39c0e0ec1b1adc31 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 12 Apr 2019 11:33:34 +0800 Subject: [PATCH 3/7] feat: Adapter class --- package.json | 3 ++ src/adapter.js | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/index.js | 1 + 3 files changed, 84 insertions(+) create mode 100644 src/adapter.js diff --git a/package.json b/package.json index 0c6765e..723903c 100644 --- a/package.json +++ b/package.json @@ -39,10 +39,13 @@ }, "dependencies": { "abort-controller": "^3.0.0", + "async-iterator-to-pull-stream": "^1.3.0", "chai": "^4.2.0", + "interface-connection": "~0.3.3", "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^5.0.2", + "pull-stream": "^3.6.9", "streaming-iterables": "^4.0.2" }, "contributors": [ diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..e7c87db --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,80 @@ +'use strict' + +const { Connection } = require('interface-connection') +const toPull = require('async-iterator-to-pull-stream') +const error = require('pull-stream/sources/error') +const drain = require('pull-stream/sinks/drain') +const noop = () => {} + +function callbackify (fn) { + return async function (...args) { + let cb = args.pop() + if (typeof cb !== 'function') { + args.push(cb) + cb = noop + } + let res + try { + res = await fn(...args) + } catch (err) { + return cb(err) + } + cb(null, res) + } +} + +// Legacy adapter to old transport & connection interface +class Adapter { + constructor (transport) { + this.transport = transport + } + + dial (ma, options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + callback = callback || noop + + const conn = new Connection() + + this.transport.dial(ma, options) + .then(socket => { + conn.setInnerConn(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + conn.close = callbackify(socket.close.bind(socket)) + callback(null, conn) + }) + .catch(err => { + conn.setInnerConn({ sink: drain(), source: error(err) }) + callback(err) + }) + + return conn + } + + createListener (options, handler) { + if (typeof options === 'function') { + handler = options + options = {} + } + + const server = this.transport.createListener(options, socket => { + const conn = new Connection(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + handler(conn) + }) + + const proxy = { + listen: callbackify(server.listen.bind(server)), + close: callbackify(server.close.bind(server)), + getAddrs: callbackify(server.getAddrs.bind(server)), + getObservedAddrs: callbackify(() => server.getObservedAddrs()) + } + + return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) + } +} + +module.exports = Adapter diff --git a/src/index.js b/src/index.js index ce793f3..73f1373 100644 --- a/src/index.js +++ b/src/index.js @@ -12,3 +12,4 @@ module.exports = (common) => { } module.exports.AbortError = require('./errors').AbortError +module.exports.Adapter = require('./adapter') From 639e84da6d7011402ae9664f199133c283a8a1a0 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 16 Apr 2019 12:00:38 +0800 Subject: [PATCH 4/7] test: wait for listener.close() in tests --- src/listen-test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/listen-test.js b/src/listen-test.js index 6874b01..84d541d 100644 --- a/src/listen-test.js +++ b/src/listen-test.js @@ -61,7 +61,7 @@ module.exports = (common) => { listener.on('connection', async (conn) => { expect(conn).to.exist() - listener.close() + await listener.close() done() }); @@ -82,9 +82,9 @@ module.exports = (common) => { it('error', (done) => { const listener = transport.createListener() - listener.on('error', (err) => { + listener.on('error', async (err) => { expect(err).to.exist() - listener.close() + await listener.close() done() }) listener.emit('error', new Error('my err')) From baa62901dfd1fdf2a890485577ce4a72ec911843 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 16 Apr 2019 12:35:46 +0800 Subject: [PATCH 5/7] chore: update README for async/await --- README.md | 111 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 73 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index cb3fd3b..fb41b43 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,9 @@ The primary goal of this module is to enable developers to pick and swap their t Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite. -The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to uniform several transports through a shimmed interface. +The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to provide a uniform API for several transports through a shimmed interface. -The API is presented with both Node.js and Go primitives, however, there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks. +The API is presented with both Node.js and Go primitives, however there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks. ## Lead Maintainer @@ -48,16 +48,32 @@ const YourTransport = require('../src') describe('compliance', () => { tests({ - setup (cb) { - let t = new YourTransport() + setup () { + let transport = new YourTransport() + const addrs = [ multiaddr('valid-multiaddr-for-your-transport'), multiaddr('valid-multiaddr2-for-your-transport') ] - cb(null, t, addrs) + + const network = require('my-network-lib') + const connect = network.connect + const connector = { + delay (delayMs) { + // Add a delay in the connection mechanism for the transport + // (this is used by the dial tests) + network.connect = (...args) => setTimeout(() => connect(...args), 100) + }, + restore () { + // Restore the connection mechanism to normal + network.connect = connect + } + } + + return { transport, addrs, connector } }, - teardown (cb) { - cb() + teardown () { + // Clean up any resources created by setup() } }) }) @@ -69,50 +85,73 @@ describe('compliance', () => { # API -A valid (read: that follows the interface defined) transport, must implement the following API. +A valid transport (one that follows the interface defined) must implement the following API: **Table of contents:** - type: `Transport` - `new Transport([options])` - - `transport.dial(multiaddr, [options, callback])` + - ` transport.dial(multiaddr, [options])` - `transport.createListener([options], handlerFunction)` - type: `transport.Listener` - event: 'listening' - event: 'close' - event: 'connection' - event: 'error' - - `listener.listen(multiaddr, [callback])` - - `listener.getAddrs(callback)` - - `listener.close([options])` + - ` listener.listen(multiaddr)` + - `listener.getAddrs()` + - ` listener.close([options])` ### Creating a transport instance - `JavaScript` - `var transport = new Transport([options])` -Creates a new Transport instance. `options` is a optional JavaScript object, might include the necessary parameters for the transport instance. +Creates a new Transport instance. `options` is an optional JavaScript object that should include the necessary parameters for the transport instance. -**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. One example is a libp2p-webrtc-star (or pretty much any other WebRTC flavour transport), where that, in order to dial, a peer needs to be part of some signalling network that is shared also with the listener. +**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. For example with libp2p-webrtc-star, in order to dial a peer, the peer must be part of some signaling network that is shared with the listener. ### Dial to another peer -- `JavaScript` - `var conn = transport.dial(multiaddr, [options, callback])` +- `JavaScript` - `const conn = await transport.dial(multiaddr, [options])` -This method dials a transport to the Peer listening on `multiaddr`. +This method uses a transport to dial a Peer listening on `multiaddr`. `multiaddr` must be of the type [`multiaddr`](https://www.npmjs.com/multiaddr). -`stream` must implements the [interface-connection](https://github.com/libp2p/interface-connection) interface. +`conn` must implement the [interface-connection](https://github.com/libp2p/interface-connection) interface. + +`[options]` the options that may be passed to the dial. Must support the `signal` option (see below) + +The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`. -`[options]` is an optional argument, which can be used by some implementations +### Canceling a dial -`callback` should follow the `function (err)` signature. +Dials may be cancelled using an `AbortController`: -`err` is an `Error` instance to signal that the dial was unsuccessful, this error can be a 'timeout' or simply 'error'. +```Javascript +const AbortController = require('abort-controller') +const { AbortError } = require('interface-transport') +const controller = new AbortController() +try { + const conn = await mytransport.dial(ma, { signal: controller.signal }) + // Do stuff with conn here ... +} catch (err) { + if(err.code === AbortError.code) { + // Dial was aborted, just bail out + return + } + throw err +} + +// ---- +// In some other part of the code: + controller.abort() +// ---- +``` ### Create a listener -- `JavaScript` - `var listener = transport.createListener([options], handlerFunction)` +- `JavaScript` - `const listener = transport.createListener([options], handlerFunction)` This method creates a listener on the transport. @@ -120,37 +159,33 @@ This method creates a listener on the transport. `handlerFunction` is a function called each time a new connection is received. It must follow the following signature: `function (conn) {}`, where `conn` is a connection that follows the [`interface-connection`](https://github.com/diasdavid/interface-connection). -The listener object created, can emit the following events: +The listener object created may emit the following events: -- `listening` - -- `close` - -- `connection` - -- `error` - +- `listening` - when the listener is ready for incoming connections +- `close` - when the listener is closed +- `connection` - (`conn`) each time an incoming connection is received +- `error` - (`err`) each time there is an error on the connection ### Start a listener -- `JavaScript` - `listener.listen(multiaddr, [callback])` +- `JavaScript` - `await listener.listen(multiaddr)` This method puts the listener in `listening` mode, waiting for incoming connections. -`multiaddr` is the address where the listener should bind to. - -`callback` is a function called once the listener is ready. +`multiaddr` is the address that the listener should bind to. ### Get listener addrs -- `JavaScript` - `listener.getAddrs(callback)` +- `JavaScript` - `listener.getAddrs()` -This method retrieves the addresses in which this listener is listening. Useful for when listening on port 0 or any interface (0.0.0.0). +This method returns the addresses on which this listener is listening. Useful when listening on port 0 or any interface (0.0.0.0). ### Stop a listener -- `JavaScript` - `listener.close([options, callback])` - -This method closes the listener so that no more connections can be open on this transport instance. +- `JavaScript` - `await listener.close([options])` -`options` is an optional object that might contain the following properties: +This method closes the listener so that no more connections can be opened on this transport instance. - - `timeout` - A timeout value (in ms) that fires and destroys all the connections on this transport if the transport is not able to close graciously. (e.g { timeout: 1000 }) +`options` is an optional object that may contain the following properties: -`callback` is function that gets called when the listener is closed. It is optional. + - `timeout` - A timeout value (in ms) after which all connections on this transport will be destroyed if the transport is not able to close gracefully. (e.g { timeout: 1000 }) From 44501d5afbe20cf941e336aaf4e1a6a81b297e92 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 16 Apr 2019 18:14:10 +0800 Subject: [PATCH 6/7] chore: Reorder params in README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fb41b43..bbe4f05 100644 --- a/README.md +++ b/README.md @@ -118,10 +118,10 @@ This method uses a transport to dial a Peer listening on `multiaddr`. `multiaddr` must be of the type [`multiaddr`](https://www.npmjs.com/multiaddr). -`conn` must implement the [interface-connection](https://github.com/libp2p/interface-connection) interface. - `[options]` the options that may be passed to the dial. Must support the `signal` option (see below) +`conn` must implement the [interface-connection](https://github.com/libp2p/interface-connection) interface. + The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`. ### Canceling a dial From 2488a89b61c9eda7104aeedea072a15bf762b074 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 17 Apr 2019 21:18:35 +0800 Subject: [PATCH 7/7] chore: move semi-colons around --- src/listen-test.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/listen-test.js b/src/listen-test.js index 84d541d..8abdbca 100644 --- a/src/listen-test.js +++ b/src/listen-test.js @@ -63,9 +63,9 @@ module.exports = (common) => { expect(conn).to.exist() await listener.close() done() - }); + }) - (async () => { + ;(async () => { await listener.listen(addrs[0]) await transport.dial(addrs[0]) })() @@ -92,9 +92,9 @@ module.exports = (common) => { it('close', (done) => { const listener = transport.createListener() - listener.on('close', done); + listener.on('close', done) - (async () => { + ;(async () => { await listener.listen(addrs[0]) await listener.close() })()