Skip to content
This repository has been archived by the owner on Feb 26, 2021. It is now read-only.

refactor: async #77

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
'use strict'

const parallel = require('async/parallel')
const rendezvous = require('libp2p-websocket-star-rendezvous')

let _r = []
let f = true // first run. used so metric gets only enabled once otherwise it crashes

function boot (done) {
async function boot () {
const base = (v) => Object.assign({
host: '0.0.0.0',
cryptoChallenge: false,
strictMultiaddr: false,
refreshPeerListIntervalMS: 1000
}, v)

parallel([['r1', {port: 15001, metrics: f}], ['r2', {port: 15002}], ['r3', {port: 15003, host: '::'}], ['r4', {port: 15004, cryptoChallenge: true}]].map((v) => async () => {
const r = await rendezvous.start(base(v.pop()))
console.log('%s: %s', v.pop(), r.info.uri)
_r.push(r)
}), done)
const rendezousList = [
['r1', { port: 15001, metrics: f }],
['r2', { port: 15002 }],
['r3', { port: 15003, host: '::' }],
['r4', { port: 15004, cryptoChallenge: true }]
]

_r = await Promise.all(rendezousList.map((v) => rendezvous.start(base(v.pop()))))

if (f) f = false
}

function stop (done) {
parallel(_r.map((r) => () => r.stop()), done)
async function stop () {
await Promise.all(_r.map((r) => r.stop()))
_r = []
}

Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
include:
- stage: check
script:
- npx aegir commitlint --travis
- npx aegir build --bundlesize
- npx aegir dep-check -- -i wrtc -i electron-webrtc
- npm run lint

Expand Down
30 changes: 19 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,42 @@
],
"license": "MIT",
"dependencies": {
"abortable-iterator": "^2.1.0",
"async": "^2.6.2",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"err-code": "^2.0.0",
"interface-connection": "~0.3.3",
"libp2p-crypto": "~0.16.1",
"mafmt": "^6.0.7",
"multiaddr": "^6.1.0",
"nanoid": "^2.0.1",
"libp2p-crypto": "~0.16.2",
"libp2p-utils": "^0.1.0",
"mafmt": "^7.0.0",
"multiaddr": "^7.1.0",
"nanoid": "^2.1.1",
"once": "^1.4.0",
"peer-id": "~0.12.2",
"peer-info": "~0.15.1",
"pull-stream": "^3.6.13",
"peer-id": "~0.13.3",
"peer-info": "~0.17.0",
"pull-stream": "^3.6.14",
"safe-buffer": "^5.2.0",
"socket.io-client": "^2.2.0",
"socket.io-client": "^2.3.0",
"socket.io-pull-stream": "~0.1.5",
"uuid": "^3.3.2"
"uuid": "^3.3.3"
},
"files": [
"src",
"dist"
],
"devDependencies": {
"aegir": "^20.0.0",
"aegir": "^20.3.1",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"it-pipe": "^1.0.1",
"libp2p-websocket-star-rendezvous": "~0.4.1",
"lodash": "^4.17.11"
"p-map": "^3.0.0",
"streaming-iterables": "^4.1.0"
},
"pre-push": [
"lint"
],
"repository": {
"type": "git",
"url": "git+https://github.com/libp2p/js-libp2p-websocket-star.git"
Expand Down
24 changes: 12 additions & 12 deletions server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,33 @@
],
"license": "MIT",
"dependencies": {
"@hapi/hapi": "^18.4.0",
"@hapi/inert": "^5.2.2",
"data-queue": "0.0.3",
"debug": "^4.1.1",
"@hapi/hapi": "^18.3.1",
"@hapi/inert": "^5.2.1",
"libp2p-crypto": "~0.17.0",
"mafmt": "^6.0.7",
"mafmt": "^7.0.0",
"menoetius": "~0.0.2",
"merge-recursive": "0.0.3",
"minimist": "^1.2.0",
"multiaddr": "^6.1.0",
"multiaddr": "^7.1.0",
"once": "^1.4.0",
"peer-id": "~0.13.1",
"peer-info": "~0.16.0",
"peer-id": "~0.13.3",
"peer-info": "~0.17.0",
"prom-client": "^11.5.3",
"socket.io": "^2.0.4",
"socket.io-client": "^2.0.4",
"socket.io-pull-stream": "^0.1.1",
"uuid": "^3.1.0"
"socket.io": "^2.3.0",
"socket.io-client": "^2.3.0",
"socket.io-pull-stream": "^0.1.5",
"uuid": "^3.3.3"
},
"directories": {
"test": "test"
},
"devDependencies": {
"aegir": "^19.0.5",
"aegir": "^20.3.1",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"lodash": "^4.17.11"
"lodash": "^4.17.15"
},
"repository": {
"type": "git",
Expand Down
2 changes: 1 addition & 1 deletion server/src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ function Protocol (log) {
}
this.handleSocket = (socket) => {
socket.r = {}
for (let request in this.requests) {
for (const request in this.requests) {
if (Object.prototype.hasOwnProperty.call(this.requests, request)) {
const r = this.requests[request]
socket.on(request, function () {
Expand Down
8 changes: 4 additions & 4 deletions server/test/rendezvous.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ describe('signalling server client', () => {
let c3
let c4

let c1mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo1')
let c2mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo2')
let c3mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo3')
let c4mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo4')
const c1mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo1')
const c2mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo2')
const c3mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo3')
const c4mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p-websocket-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSoooo4')

before(async () => {
const options = {
Expand Down
8 changes: 8 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

// p2p multi-address code
exports.CODE_P2P = 421
exports.CODE_CIRCUIT = 290

// Time to wait for a connection to close gracefully before destroying it manually
exports.CLOSE_TIMEOUT = 2000
121 changes: 67 additions & 54 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,49 +1,60 @@
'use strict'

const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:websocket-star')

const withIs = require('class-is')
const { EventEmitter } = require('events')
const errCode = require('err-code')

const multiaddr = require('multiaddr')
const EE = require('events').EventEmitter
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const Connection = require('interface-connection').Connection
const setImmediate = require('async/setImmediate')
const utils = require('./utils')
const Listener = require('./listener')
const cleanUrlSIO = utils.cleanUrlSIO
const mafmt = require('mafmt')
const withIs = require('class-is')

const { cleanUrlSIO } = require('./utils')
const Listener = require('./listener')
const toConnection = require('./socket-to-conn')
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')

/**
* @class WebsocketStar
*/
class WebsocketStar {
/**
* WebsocketStar Transport
* @class
* @param {Object} options - Options for the listener
* @param {PeerId} options.id - Id for the crypto challenge
* @constructor
* @param {Object} options options
* @param {Upgrader} options.upgrader connection upgrader
* @param {PeerId} options.id id for the crypto challenge
* @param {boolean} options.allowJoinWithDisabledChallenge
*/
constructor (options) {
options = options || {}
constructor ({ upgrader, id, allowJoinWithDisabledChallenge }) {
assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.')
this._upgrader = upgrader
this.id = id
this.flag = allowJoinWithDisabledChallenge // let's just refer to it as "flag"

this.id = options.id
this.flag = options.allowJoinWithDisabledChallenge // let's just refer to it as "flag"
this.listenersRefs = {}

this.discovery = new EE()
// Discovery
this.discovery = new EventEmitter()
this.discovery.tag = 'websocketStar'
this.discovery.start = (callback) => {
setImmediate(callback)
this.discovery._isStarted = false
this.discovery.start = () => {
this.discovery._isStarted = true
}
this.discovery.stop = (callback) => {
setImmediate(callback)
this.discovery.stop = () => {
this.discovery._isStarted = false
}

this.listeners_list = {}
this._peerDiscovered = this._peerDiscovered.bind(this)
}

/**
* Sets the id after transport creation (aka the lazy way)
* @param {PeerId} id
* @returns {undefined}
* @returns {void}
*/
lazySetId (id) {
if (!id) return
Expand All @@ -52,30 +63,29 @@ class WebsocketStar {
}

/**
* Dials a peer
* @async
* @param {Multiaddr} ma - Multiaddr to dial to
* @param {Object} options
* @param {function} callback
* @returns {Connection}
* @param {Object} [options]
* @param {AbortSignal} [options.signal] Used to abort dial requests
* @returns {Connection} An upgraded connection
*/
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
async dial (ma, options = {}) {
log('dialing %s', ma)

const url = cleanUrlSIO(ma)
const listener = this.listenersRefs[url]

let url
try {
url = cleanUrlSIO(ma)
} catch (err) {
return callback(err) // early
}
const listener = this.listeners_list[url]
if (!listener) {
callback(new Error('No listener for this server'))
return new Connection()
throw errCode(new Error('No listener for this server'), 'ERR_NO_LISTENER_AVAILABLE')
}
return listener.dial(ma, options, callback)

const socket = await listener.dial(ma, options)
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
log('new outbound connection %s', maConn.remoteAddr)

const conn = await this._upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}

/**
Expand All @@ -84,7 +94,7 @@ class WebsocketStar {
* @param {function} handler
* @returns {Listener}
*/
createListener (options, handler) {
createListener (options = {}, handler) {
if (typeof options === 'function') {
handler = options
options = {}
Expand All @@ -93,7 +103,7 @@ class WebsocketStar {
const listener = new Listener({
id: this.id,
handler,
listeners: this.listeners_list,
listeners: this.listenersRefs,
flag: this.flag
})

Expand All @@ -103,28 +113,31 @@ class WebsocketStar {
}

/**
* Filters multiaddrs
* @param {Multiaddr[]} multiaddrs
* @returns {boolean}
*/
* Takes a list of `Multiaddr`s and returns only valid Websockets addresses
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]} Valid Websockets multiaddrs
*/
filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

return multiaddrs.filter((ma) => {
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
return false
}

return multiaddrs.filter((ma) => mafmt.WebSocketStar.matches(ma))
return mafmt.WebSocketStar.matches(ma.decapsulateCode(CODE_P2P))
})
}

/**
* @private
* Used to fire peer events on the discovery part
* @param {Multiaddr} maStr
* @fires Discovery#peer
* @returns {undefined}
* @private
*/
_peerDiscovered (maStr) {
log('Peer Discovered:', maStr)
const peerIdStr = maStr.split('/ipfs/').pop()
const peerIdStr = maStr.split('/p2p/').pop()
const peerId = PeerId.createFromB58String(peerIdStr)
const peerInfo = new PeerInfo(peerId)

Expand Down
Loading