Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
feat: callbacks -> async / await (#44)
Browse files Browse the repository at this point in the history
* feat: callbacks -> async / await

BREAKING CHANGE: All places in the API that used callbacks are now replaced with async/await

* test: add tests for canceling dials

* feat: Adapter class
  • Loading branch information
dirkmc authored and jacobheun committed Apr 17, 2019
1 parent 6e99899 commit b30ee5f
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 158 deletions.
111 changes: 73 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ The primary goal of this module is to enable developers to pick and swap their t

Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite.

The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to uniform several transports through a shimmed interface.
The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to provide a uniform API for several transports through a shimmed interface.

The API is presented with both Node.js and Go primitives, however, there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.
The API is presented with both Node.js and Go primitives, however there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.

## Lead Maintainer

Expand Down Expand Up @@ -48,16 +48,32 @@ const YourTransport = require('../src')

describe('compliance', () => {
tests({
setup (cb) {
let t = new YourTransport()
setup () {
let transport = new YourTransport()

const addrs = [
multiaddr('valid-multiaddr-for-your-transport'),
multiaddr('valid-multiaddr2-for-your-transport')
]
cb(null, t, addrs)

const network = require('my-network-lib')
const connect = network.connect
const connector = {
delay (delayMs) {
// Add a delay in the connection mechanism for the transport
// (this is used by the dial tests)
network.connect = (...args) => setTimeout(() => connect(...args), 100)
},
restore () {
// Restore the connection mechanism to normal
network.connect = connect
}
}

return { transport, addrs, connector }
},
teardown (cb) {
cb()
teardown () {
// Clean up any resources created by setup()
}
})
})
Expand All @@ -69,88 +85,107 @@ describe('compliance', () => {
# API

A valid (read: that follows the interface defined) transport, must implement the following API.
A valid transport (one that follows the interface defined) must implement the following API:

**Table of contents:**

- type: `Transport`
- `new Transport([options])`
- `transport.dial(multiaddr, [options, callback])`
- `<Promise> transport.dial(multiaddr, [options])`
- `transport.createListener([options], handlerFunction)`
- type: `transport.Listener`
- event: 'listening'
- event: 'close'
- event: 'connection'
- event: 'error'
- `listener.listen(multiaddr, [callback])`
- `listener.getAddrs(callback)`
- `listener.close([options])`
- `<Promise> listener.listen(multiaddr)`
- `listener.getAddrs()`
- `<Promise> listener.close([options])`

### Creating a transport instance

- `JavaScript` - `var transport = new Transport([options])`

Creates a new Transport instance. `options` is a optional JavaScript object, might include the necessary parameters for the transport instance.
Creates a new Transport instance. `options` is an optional JavaScript object that should include the necessary parameters for the transport instance.

**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. One example is a libp2p-webrtc-star (or pretty much any other WebRTC flavour transport), where that, in order to dial, a peer needs to be part of some signalling network that is shared also with the listener.
**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. For example with libp2p-webrtc-star, in order to dial a peer, the peer must be part of some signaling network that is shared with the listener.

### Dial to another peer

- `JavaScript` - `var conn = transport.dial(multiaddr, [options, callback])`
- `JavaScript` - `const conn = await transport.dial(multiaddr, [options])`

This method dials a transport to the Peer listening on `multiaddr`.
This method uses a transport to dial a Peer listening on `multiaddr`.

`multiaddr` must be of the type [`multiaddr`](https://www.npmjs.com/multiaddr).

`stream` must implements the [interface-connection](https://github.com/libp2p/interface-connection) interface.
`[options]` the options that may be passed to the dial. Must support the `signal` option (see below)

`conn` must implement the [interface-connection](https://github.com/libp2p/interface-connection) interface.

The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`.

`[options]` is an optional argument, which can be used by some implementations
### Canceling a dial

`callback` should follow the `function (err)` signature.
Dials may be cancelled using an `AbortController`:

`err` is an `Error` instance to signal that the dial was unsuccessful, this error can be a 'timeout' or simply 'error'.
```Javascript
const AbortController = require('abort-controller')
const { AbortError } = require('interface-transport')
const controller = new AbortController()
try {
const conn = await mytransport.dial(ma, { signal: controller.signal })
// Do stuff with conn here ...
} catch (err) {
if(err.code === AbortError.code) {
// Dial was aborted, just bail out
return
}
throw err
}

// ----
// In some other part of the code:
controller.abort()
// ----
```

### Create a listener

- `JavaScript` - `var listener = transport.createListener([options], handlerFunction)`
- `JavaScript` - `const listener = transport.createListener([options], handlerFunction)`

This method creates a listener on the transport.

`options` is an optional object that contains the properties the listener must have, in order to properly listen on a given transport/socket.

`handlerFunction` is a function called each time a new connection is received. It must follow the following signature: `function (conn) {}`, where `conn` is a connection that follows the [`interface-connection`](https://github.com/diasdavid/interface-connection).

The listener object created, can emit the following events:
The listener object created may emit the following events:

- `listening` -
- `close` -
- `connection` -
- `error` -
- `listening` - when the listener is ready for incoming connections
- `close` - when the listener is closed
- `connection` - (`conn`) each time an incoming connection is received
- `error` - (`err`) each time there is an error on the connection

### Start a listener

- `JavaScript` - `listener.listen(multiaddr, [callback])`
- `JavaScript` - `await listener.listen(multiaddr)`

This method puts the listener in `listening` mode, waiting for incoming connections.

`multiaddr` is the address where the listener should bind to.

`callback` is a function called once the listener is ready.
`multiaddr` is the address that the listener should bind to.

### Get listener addrs

- `JavaScript` - `listener.getAddrs(callback)`
- `JavaScript` - `listener.getAddrs()`

This method retrieves the addresses in which this listener is listening. Useful for when listening on port 0 or any interface (0.0.0.0).
This method returns the addresses on which this listener is listening. Useful when listening on port 0 or any interface (0.0.0.0).

### Stop a listener

- `JavaScript` - `listener.close([options, callback])`

This method closes the listener so that no more connections can be open on this transport instance.
- `JavaScript` - `await listener.close([options])`

`options` is an optional object that might contain the following properties:
This method closes the listener so that no more connections can be opened on this transport instance.

- `timeout` - A timeout value (in ms) that fires and destroys all the connections on this transport if the transport is not able to close graciously. (e.g { timeout: 1000 })
`options` is an optional object that may contain the following properties:

`callback` is function that gets called when the listener is closed. It is optional.
- `timeout` - A timeout value (in ms) after which all connections on this transport will be destroyed if the transport is not able to close gracefully. (e.g { timeout: 1000 })
10 changes: 7 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@
"dirty-chai": "^2.0.1"
},
"dependencies": {
"abort-controller": "^3.0.0",
"async-iterator-to-pull-stream": "^1.3.0",
"chai": "^4.2.0",
"interface-connection": "~0.3.3",
"it-goodbye": "^2.0.0",
"it-pipe": "^1.0.0",
"multiaddr": "^5.0.2",
"pull-goodbye": "~0.0.2",
"pull-serializer": "~0.3.2",
"pull-stream": "^3.6.9"
"pull-stream": "^3.6.9",
"streaming-iterables": "^4.0.2"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
Expand Down
80 changes: 80 additions & 0 deletions src/adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'use strict'

const { Connection } = require('interface-connection')
const toPull = require('async-iterator-to-pull-stream')
const error = require('pull-stream/sources/error')
const drain = require('pull-stream/sinks/drain')
const noop = () => {}

function callbackify (fn) {
return async function (...args) {
let cb = args.pop()
if (typeof cb !== 'function') {
args.push(cb)
cb = noop
}
let res
try {
res = await fn(...args)
} catch (err) {
return cb(err)
}
cb(null, res)
}
}

// Legacy adapter to old transport & connection interface
class Adapter {
constructor (transport) {
this.transport = transport
}

dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

callback = callback || noop

const conn = new Connection()

this.transport.dial(ma, options)
.then(socket => {
conn.setInnerConn(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
conn.close = callbackify(socket.close.bind(socket))
callback(null, conn)
})
.catch(err => {
conn.setInnerConn({ sink: drain(), source: error(err) })
callback(err)
})

return conn
}

createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}

const server = this.transport.createListener(options, socket => {
const conn = new Connection(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
handler(conn)
})

const proxy = {
listen: callbackify(server.listen.bind(server)),
close: callbackify(server.close.bind(server)),
getAddrs: callbackify(server.getAddrs.bind(server)),
getObservedAddrs: callbackify(() => server.getObservedAddrs())
}

return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] })
}
}

module.exports = Adapter
Loading

0 comments on commit b30ee5f

Please sign in to comment.