From d0a3d594062233f7fcd23ef2ce9ab4e2ef688197 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 8 Apr 2017 05:28:00 -0700 Subject: [PATCH] refactor: split transport dialer and circuit logic --- src/circuit/constants.js | 29 +++ src/circuit/dialer.js | 238 +++++++++++++++++++++++ src/circuit/onion-dialer.js | 149 +++++++++++++++ src/{ => circuit}/relay.js | 16 +- src/circuit/utils.js | 41 ++++ src/codes.js | 22 --- src/dialer.js | 371 ++---------------------------------- src/index.js | 2 +- src/listener.js | 7 +- test/index.spec.js | 30 ++- 10 files changed, 499 insertions(+), 406 deletions(-) create mode 100644 src/circuit/constants.js create mode 100644 src/circuit/dialer.js create mode 100644 src/circuit/onion-dialer.js rename src/{ => circuit}/relay.js (89%) create mode 100644 src/circuit/utils.js delete mode 100644 src/codes.js diff --git a/src/circuit/constants.js b/src/circuit/constants.js new file mode 100644 index 0000000..2af03d4 --- /dev/null +++ b/src/circuit/constants.js @@ -0,0 +1,29 @@ +'use strict' + +module.exports = { + PRIOIRY: 100, + DIALER: { + ONION: 'onion', + TELESCOPE: 'telescope' + }, + RESPONSE: { + SUCCESS: 100, + HOP: { + SRC_ADDR_TOO_LONG: 220, + DST_ADDR_TOO_LONG: 221, + SRC_MULTIADDR_INVALID: 250, + DST_MULTIADDR_INVALID: 251, + NO_CONN_TO_DST: 260, + CANT_DIAL_DST: 261, + CANT_OPEN_DST_STREAM: 262, + CANT_SPEAK_RELAY: 270, + CANT_CONNECT_TO_SELF: 280 + }, + STOP: { + SRC_ADDR_TOO_LONG: 320, + DST_ADDR_TOO_LONG: 321, + SRC_MULTIADDR_INVALID: 350, + DST_MULTIADDR_INVALID: 351 + } + } +} diff --git a/src/circuit/dialer.js b/src/circuit/dialer.js new file mode 100644 index 0000000..3cb4530 --- /dev/null +++ b/src/circuit/dialer.js @@ -0,0 +1,238 @@ +'use strict' + +const pull = require('pull-stream') +const handshake = require('pull-handshake') +const Connection = require('interface-connection').Connection +const mafmt = require('mafmt') +const isFunction = require('lodash.isfunction') +const multiaddr = require('multiaddr') +const lp = require('pull-length-prefixed') +const constants = require('./constants') +const once = require('once') +const utils = require('./utils') + +const debug = require('debug') +const log = debug('libp2p:circuit:dialer') +log.err = debug('libp2p:circuit:error:dialer') + +const multicodec = require('../multicodec') + +class Dialer { + /** + * Creates an instance of Dialer. + * @param {Swarm} swarm - the swarm + * @param {any} options - config options + * + * @memberOf Dialer + */ + constructor (swarm, options) { + this.swarm = swarm + this.relayPeers = new Map() + this.options = options + // this.handler = handler // this handler is passed to the listener + + // get all the relay addresses for this swarm + const relays = this.filter(swarm._peerInfo.multiaddrs.toArray()) + + // if no explicit relays, add a default relay addr + if (relays.length === 0) { + this.swarm + ._peerInfo + .multiaddrs + .add(`/p2p-circuit/ipfs/${this.swarm._peerInfo.id.toB58String()}`) + } + + this.dialSwarmRelays(relays) + + this.swarm.on('peer-mux-established', this.dialRelay.bind(this)) + this.swarm.on('peer-mux-closed', (peerInfo) => { + this.relayPeers.delete(peerInfo.id.toB58String()) + }) + } + + /** + * Dial the relays in the Addresses.Swarm config + * + * @param {Array} relays + * @return {void} + */ + dialSwarmRelays (relays) { + // if we have relay addresses in swarm config, then dial those relays + this.swarm.on('listening', () => { + relays.forEach((relay) => { + let relaySegments = relay + .toString() + .split('/p2p-circuit') + .filter(segment => segment.length) + + relaySegments.forEach((relaySegment) => { + this.dialRelay(utils.peerInfoFromMa(relaySegment)) + }) + }) + }) + } + + /** + * Dial a peer over a relay + * + * @param {multiaddr} ma - the multiaddr of the peer to dial + * @param {Object} options - dial options + * @param {Function} cb - a callback called once dialed + * @returns {Connection} - the connection + * + * @memberOf Dialer + */ + dial (ma, options, cb) { + throw new Error('abstract class, method not implemented') + } + + /** + * Dial the destination peer over a relay + * + * @param {multiaddr} dstMa + * @param {Connection} relay + * @param {Function} cb + * @return {Function|void} + * @private + */ + dialPeer (dstMa, relay, cb) { + if (isFunction(relay)) { + cb = relay + relay = null + } + + if (!cb) { + cb = () => {} + } + + dstMa = multiaddr(dstMa) + // if no relay provided, dial on all available relays until one succeeds + if (!relay) { + const relays = Array.from(this.relayPeers.values()) + let next = (nextRelay) => { + if (!nextRelay) { + let err = `no relay peers were found` + log.err(err) + return cb(err) + } + + this.dialPeer(dstMa, nextRelay, (err, conn) => { + if (err) { + log.err(err) + return next(relays.shift()) + } + cb(null, new Connection(conn)) + }) + } + next(relays.shift()) + } else { + this.negotiateRelay(relay, dstMa, (err, conn) => { + if (err) { + log.err(`An error has occurred negotiating the relay connection`, err) + return cb(err) + } + + return cb(null, conn) + }) + } + } + + /** + * Negotiate the relay connection + * + * @param {Connection} conn - a connection to the relay + * @param {multiaddr} dstMa - the multiaddr of the peer to relay the connection for + * @param {Function} cb - a callback with that return the negotiated relay connection + * @returns {void} + * + * @memberOf Dialer + */ + negotiateRelay (conn, dstMa, cb) { + dstMa = multiaddr(dstMa) + + let stream = handshake({timeout: 1000 * 60}, cb) + let shake = stream.handshake + + log(`negotiating relay for peer ${dstMa.getPeerId()}`) + const values = [new Buffer(dstMa.toString())] + + pull( + pull.values(values), + lp.encode(), + pull.collect((err, encoded) => { + if (err) { + return cb(err) + } + + shake.write(encoded[0]) + shake.read(3, (err, data) => { + if (err) { + log.err(err) + return cb(err) + } + + if (Number(data.toString()) !== constants.RESPONSE.SUCCESS) { + cb(new Error(`Got ${data.toString()} error code trying to dial over relay`)) + } + + cb(null, shake.rest()) + }) + }) + ) + + pull(stream, conn, stream) + } + + /** + * Dial a relay peer by its PeerInfo + * + * @param {PeerInfo} peer - the PeerInfo of the relay peer + * @param {Function} cb - a callback with the connection to the relay peer + * @returns {Function|void} + * + * @memberOf Dialer + */ + dialRelay (peer, cb) { + cb = once(cb || (() => {})) + + const b58Id = utils.getB58String(peer) + const relay = this.relayPeers.get(b58Id) + if (relay) { + cb(null, relay) + } + + const relayConn = new Connection() + relayConn.setPeerInfo(peer) + // attempt to dia the relay so that we have a connection + this.swarm.dial(peer, multicodec.hop, once((err, conn) => { + if (err) { + log.err(err) + return cb(err) + } + + relayConn.setInnerConn(conn) + this.relayPeers.set(b58Id, relayConn) + cb(null, relayConn) + })) + } + + /** + * Filter check for all multiaddresses + * that this transport can dial on + * + * @param {any} multiaddrs + * @returns {Array} + * + * @memberOf Dialer + */ + filter (multiaddrs) { + if (!Array.isArray(multiaddrs)) { + multiaddrs = [multiaddrs] + } + return multiaddrs.filter((ma) => { + return mafmt.Circuit.matches(ma) + }) + } +} + +module.exports = Dialer diff --git a/src/circuit/onion-dialer.js b/src/circuit/onion-dialer.js new file mode 100644 index 0000000..e7d954f --- /dev/null +++ b/src/circuit/onion-dialer.js @@ -0,0 +1,149 @@ +'use strict' + +const Dialer = require('./dialer') +const utils = require('./utils') +const isFunction = require('lodash.isfunction') +const multiaddr = require('multiaddr') +const Connection = require('interface-connection').Connection +const once = require('once') +const multicodec = require('../multicodec') + +const debug = require('debug') +const log = debug('libp2p:circuit:oniondialer') +log.err = debug('libp2p:circuit:error:oniondialer') + +class OnionDialer extends Dialer { + /** + * Dial a peer over a relay + * + * @param {multiaddr} ma - the multiaddr of the peer to dial + * @param {Object} options - dial options + * @param {Function} cb - a callback called once dialed + * @returns {Connection} - the connection + * + * @memberOf Dialer + */ + dial (ma, options, cb) { + if (isFunction(options)) { + cb = options + options = {} + } + + if (!cb) { + cb = () => {} + } + + let maddrs = multiaddr(ma).toString().split('/p2p-circuit').filter((a) => a.length) + if (maddrs.length > 0) { + const id = multiaddr(maddrs[maddrs.length - 1]).getPeerId() + if (this.swarm._peerInfo.id.toB58String() === id) { + let err = `Cant dial to self!` + log.err(err) + return cb(err) + } + } + + let dstConn = new Connection() + this._onionDial(maddrs, (err, conn) => { + if (err) { + log.err(err) + return cb(err) + } + dstConn.setInnerConn(conn) + cb(null, dstConn) + }) + + return dstConn + } + + /** + * Dial a peer using onion dial - dial all relays in the ma + * in sequence and circuit dest over the all the pipes + * + * @param {multiaddr} maddrs + * @param {Connection} relay + * @param {Function} cb + * @return {void} + * @private + */ + _onionDial (maddrs, relay, cb) { + if (isFunction(relay)) { + cb = relay + relay = null + } + + let dialAddr = maddrs.shift() + const dial = (dstAddr, relayConn) => { + if (maddrs.length) { + this.dialPeer(dstAddr, relayConn, (err, conn) => { + if (err) { + log.err(err) + return cb(err) + } + + dstAddr = multiaddr(dstAddr) + this.relayPeers.set(dstAddr.getPeerId(), conn) + this._upgradeToHop(utils.peerInfoFromMa(dstAddr), conn, (err, upgraded) => { + if (err) { + log.err(err) + return cb(err) + } + + this._onionDial(maddrs, upgraded, cb) + }) + }) + } else { + this.dialPeer(dstAddr, relayConn, (err, conn) => { + if (err) { + return cb(err) + } + cb(null, conn) + }) + } + } + + if (maddrs.length > 0) { + const relayPeer = utils.peerInfoFromMa(dialAddr) + this.dialRelay(relayPeer, once((err, relayConn) => { + if (err) { + log.err(err) + return cb(err) + } + + dial(maddrs.shift(), relayConn) + })) + } else { + dial(dialAddr, relay) + } + } + + /** + * Upgrade a raw connection to a relayed link (hop) + * + * @param {PeerInfo} pi + * @param {Connection} conn + * @param {Function} cb + * + * @return {Function|void} + * @private + */ + _upgradeToHop (pi, conn, cb) { + const proxyConn = new Connection() + const handler = this.swarm.connHandler(pi, multicodec.hop, proxyConn) + handler.handleNew(conn, (err, upgradedConn) => { + if (err) { + log.err(err) + return cb(err) + } + + if (!upgradedConn) { + proxyConn.setInnerConn(conn) + upgradedConn = proxyConn + } + + cb(null, upgradedConn) + }) + } +} + +module.exports = OnionDialer diff --git a/src/relay.js b/src/circuit/relay.js similarity index 89% rename from src/relay.js rename to src/circuit/relay.js index fd5b29b..2009dd5 100644 --- a/src/relay.js +++ b/src/circuit/relay.js @@ -8,10 +8,10 @@ const PeerInfo = require('peer-info') const PeerId = require('peer-id') const EE = require('events').EventEmitter const multiaddr = require('multiaddr') -const codes = require('./codes') +const constants = require('constants') const once = require('once') -const multicodec = require('./multicodec') +const multicodec = require('./../multicodec') const log = debug('libp2p:circuit:relay') log.err = debug('libp2p:circuit:error:relay') @@ -59,8 +59,8 @@ class Relay extends EE { let addr = multiaddr(msg.toString()) // read the src multiaddr if (addr.getPeerId() === this.libp2p.peerInfo.id.toB58String()) { - this.emit('error', String(codes.HOP.CANT_CONNECT_TO_SELF)) - shake.write(String(codes.HOP.CANT_CONNECT_TO_SELF)) + this.emit('error', String(constants.RESPONSE.HOP.CANT_CONNECT_TO_SELF)) + shake.write(String(constants.RESPONSE.HOP.CANT_CONNECT_TO_SELF)) return } @@ -156,6 +156,14 @@ class Relay extends EE { }) } + /** + * Dial the dest peer and create a circuit + * + * @param {Multiaddr} ma + * @param {Function} callback + * @returns {Function|void} + * @private + */ _dialPeer (ma, callback) { const peerInfo = new PeerInfo(PeerId.createFromB58String(ma.getPeerId())) peerInfo.multiaddrs.add(ma) diff --git a/src/circuit/utils.js b/src/circuit/utils.js new file mode 100644 index 0000000..f70371d --- /dev/null +++ b/src/circuit/utils.js @@ -0,0 +1,41 @@ +'use strict' + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') + +/** + * Get b58 string from multiaddr or peerinfo + * + * @param {Multiaddr|PeerInfo} peer + * @return {*} + */ +exports.getB58String = function getB58String (peer) { + let b58Id + if (peer instanceof multiaddr || (typeof peer === 'string' || peer instanceof String)) { + const relayMa = multiaddr(peer) + b58Id = relayMa.getPeerId() + } else { + b58Id = peer.id.toB58String() + } + + return b58Id +} + +/** + * Helper to make a peer info from a multiaddrs + * + * @param {Multiaddr} ma + * @return {PeerInfo} + * @private + */ +exports.peerInfoFromMa = function peerInfoFromMa (ma) { + ma = multiaddr(ma) + const peer = new PeerInfo(PeerId.createFromB58String(ma.getPeerId())) + const addr = ma.decapsulate('ipfs') + if (addr.toString().length > 1 && addr.toString() !== '/') { + peer.multiaddrs.add(addr) + } + + return peer +} diff --git a/src/codes.js b/src/codes.js deleted file mode 100644 index 4290323..0000000 --- a/src/codes.js +++ /dev/null @@ -1,22 +0,0 @@ -'use strict' - -module.exports = { - SUCCESS: 100, - HOP: { - SRC_ADDR_TOO_LONG: 220, - DST_ADDR_TOO_LONG: 221, - SRC_MULTIADDR_INVALID: 250, - DST_MULTIADDR_INVALID: 251, - NO_CONN_TO_DST: 260, - CANT_DIAL_DST: 261, - CANT_OPEN_DST_STREAM: 262, - CANT_SPEAK_RELAY: 270, - CANT_CONNECT_TO_SELF: 280 - }, - STOP: { - SRC_ADDR_TOO_LONG: 320, - DST_ADDR_TOO_LONG: 321, - SRC_MULTIADDR_INVALID: 350, - DST_MULTIADDR_INVALID: 351 - } -} diff --git a/src/dialer.js b/src/dialer.js index 0ad1ff4..1ef9be5 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -1,22 +1,12 @@ 'use strict' -const pull = require('pull-stream') -const handshake = require('pull-handshake') -const Connection = require('interface-connection').Connection -const mafmt = require('mafmt') -const PeerInfo = require('peer-info') -const PeerId = require('peer-id') -const isFunction = require('lodash.isfunction') -const multiaddr = require('multiaddr') -const lp = require('pull-length-prefixed') const debug = require('debug') -const codes = require('./codes') -const once = require('once') +const constants = require('./circuit/constants') -const log = debug('libp2p:circuit:dialer') -log.err = debug('libp2p:circuit:error:dialer') +const OnionDialer = require('./circuit/onion-dialer') -const multicodec = require('./multicodec') +const log = debug('libp2p:circuit:transportdialer') +log.err = debug('libp2p:circuit:error:transportdialer') const createListener = require('./listener') @@ -25,42 +15,21 @@ class Dialer { * Creates an instance of Dialer. * @param {Swarm} swarm - the swarm * @param {any} options - config options - * @param {Function} handler - the handler that is passed to the listener on transport instantiation * * @memberOf Dialer */ - constructor (swarm, options, handler) { - if (isFunction(options)) { - handler = options - options = {} - } - - // if (!handler) { - // handler = () => {} - // } + constructor (swarm, options) { + options = options || {} this.swarm = swarm - this.relayPeers = new Map() - this.options = options - // this.handler = handler // this handler is passed to the listener + this.dialer = null - // get all the relay addresses for this swarm - const relays = this.filter(swarm._peerInfo.multiaddrs.toArray()) - - // if no explicit relays, add a default relay addr - if (relays.length === 0) { - this.swarm - ._peerInfo - .multiaddrs - .add(`/p2p-circuit/ipfs/${this.swarm._peerInfo.id.toB58String()}`) + // TODO: this really should be injected + if (options.dialer === constants.DIALER.TELESCOPE) { + // TODO: init telescope dialer + } else { + this.dialer = new OnionDialer(swarm, options) } - - this.dialSwarmRelays(relays) - - this.swarm.on('peer-mux-established', this.dialRelay.bind(this)) - this.swarm.on('peer-mux-closed', (peerInfo) => { - this.relayPeers.delete(peerInfo.id.toB58String()) - }) } get priority () { @@ -71,28 +40,6 @@ class Dialer { throw new Error('Priority is read only!') } - /** - * Dial the relays in the Addresses.Swarm config - * - * @param {Array} relays - * @return {void} - */ - dialSwarmRelays (relays) { - // if we have relay addresses in swarm config, then dial those relays - this.swarm.on('listening', () => { - relays.forEach((relay) => { - let relaySegments = relay - .toString() - .split('/p2p-circuit') - .filter(segment => segment.length) - - relaySegments.forEach((relaySegment) => { - this.dialRelay(this._peerInfoFromMa(relaySegment)) - }) - }) - }) - } - /** * Dial a peer over a relay * @@ -104,176 +51,7 @@ class Dialer { * @memberOf Dialer */ dial (ma, options, cb) { - if (isFunction(options)) { - cb = options - options = {} - } - - if (!cb) { - cb = () => {} - } - - let maddrs = multiaddr(ma).toString().split('/p2p-circuit').filter((a) => a.length) - if (maddrs.length > 0) { - const id = multiaddr(maddrs[maddrs.length - 1]).getPeerId() - if (this.swarm._peerInfo.id.toB58String() === id) { - let err = `Cant dial to self!` - log.err(err) - return cb(err) - } - } - - let dstConn = new Connection() - this._onionDial(maddrs, (err, conn) => { - if (err) { - log.err(err) - return cb(err) - } - dstConn.setInnerConn(conn) - cb(null, dstConn) - }) - - return dstConn - } - - /** - * Dial a peer using onion dial - dial all relays in the ma - * in sequence and circuit dest over the all the pipes - * - * @param {multiaddr} maddrs - * @param {Connection} relay - * @param {Function} cb - * @return {void} - * @private - */ - _onionDial (maddrs, relay, cb) { - if (isFunction(relay)) { - cb = relay - relay = null - } - - let dialAddr = maddrs.shift() - const dial = (dstAddr, relayConn) => { - if (maddrs.length) { - this._dialPeer(dstAddr, relayConn, (err, conn) => { - if (err) { - log.err(err) - return cb(err) - } - - dstAddr = multiaddr(dstAddr) - this.relayPeers.set(dstAddr.getPeerId(), conn) - this._upgradeToHop(this._peerInfoFromMa(dstAddr), conn, (err, upgraded) => { - if (err) { - log.err(err) - return cb(err) - } - - this._onionDial(maddrs, upgraded, cb) - }) - }) - } else { - this._dialPeer(dstAddr, relayConn, (err, conn) => { - if (err) { - return cb(err) - } - cb(null, conn) - }) - } - } - - if (maddrs.length > 0) { - const relayPeer = this._peerInfoFromMa(dialAddr) - this.dialRelay(relayPeer, once((err, relayConn) => { - if (err) { - log.err(err) - return cb(err) - } - - dial(maddrs.shift(), relayConn) - })) - } else { - dial(dialAddr, relay) - } - } - - /** - * Upgrade a raw connection to a relayed link (hop) - * - * @param {PeerInfo} pi - * @param {Connection} conn - * @param {Function} cb - * - * @return {Function|void} - * @private - */ - _upgradeToHop (pi, conn, cb) { - const proxyConn = new Connection() - const handler = this.swarm.connHandler(pi, multicodec.hop, proxyConn) - handler.handleNew(conn, (err, upgradedConn) => { - if (err) { - log.err(err) - return cb(err) - } - - if (!upgradedConn) { - proxyConn.setInnerConn(conn) - upgradedConn = proxyConn - } - - cb(null, upgradedConn) - }) - } - - /** - * Dial the destination peer over a relay - * - * @param {multiaddr} dstMa - * @param {Connection} relay - * @param {Function} cb - * @return {Function|void} - * @private - */ - _dialPeer (dstMa, relay, cb) { - if (isFunction(relay)) { - cb = relay - relay = null - } - - if (!cb) { - cb = () => {} - } - - dstMa = multiaddr(dstMa) - // if no relay provided, dial on all available relays until one succeeds - if (!relay) { - const relays = Array.from(this.relayPeers.values()) - let next = (nextRelay) => { - if (!nextRelay) { - let err = `no relay peers were found` - log.err(err) - return cb(err) - } - - this._dialPeer(dstMa, nextRelay, (err, conn) => { - if (err) { - log.err(err) - return next(relays.shift()) - } - cb(null, new Connection(conn)) - }) - } - next(relays.shift()) - } else { - this._negotiateRelay(relay, dstMa, (err, conn) => { - if (err) { - log.err(`An error has occurred negotiating the relay connection`, err) - return cb(err) - } - - return cb(null, conn) - }) - } + return this.dialer.dial(ma, options, cb) } /** @@ -292,122 +70,6 @@ class Dialer { return createListener(this.swarm, options, handler) } - /** - * Negotiate the relay connection - * - * @param {Connection} conn - a connection to the relay - * @param {multiaddr} dstMa - the multiaddr of the peer to relay the connection for - * @param {Function} cb - a callback with that return the negotiated relay connection - * @returns {void} - * - * @memberOf Dialer - */ - _negotiateRelay (conn, dstMa, cb) { - dstMa = multiaddr(dstMa) - - let stream = handshake({timeout: 1000 * 60}, cb) - let shake = stream.handshake - - log(`negotiating relay for peer ${dstMa.getPeerId()}`) - const values = [new Buffer(dstMa.toString())] - - pull( - pull.values(values), - lp.encode(), - pull.collect((err, encoded) => { - if (err) { - return cb(err) - } - - shake.write(encoded[0]) - shake.read(3, (err, data) => { - if (err) { - log.err(err) - return cb(err) - } - - if (Number(data.toString()) !== codes.SUCCESS) { - cb(new Error(`Got ${data.toString()} error code trying to dial over relay`)) - } - - cb(null, shake.rest()) - }) - }) - ) - - pull(stream, conn, stream) - } - - /** - * Get b58 string from multiaddr or peerinfo - * - * @param {Multiaddr|PeerInfo} peer - * @return {*} - * @private - */ - _getB58String (peer) { - let b58Id - if (peer instanceof multiaddr || (typeof peer === 'string' || peer instanceof String)) { - const relayMa = multiaddr(peer) - b58Id = relayMa.getPeerId() - } else { - b58Id = peer.id.toB58String() - } - - return b58Id - } - - /** - * Helper to make a peer info from a multiaddrs - * - * @param {Multiaddr} ma - * @return {PeerInfo} - * @private - */ - _peerInfoFromMa (ma) { - ma = multiaddr(ma) - const peer = new PeerInfo(PeerId.createFromB58String(ma.getPeerId())) - const addr = ma.decapsulate('ipfs') - if (addr.toString().length > 1 && addr.toString() !== '/') { - peer.multiaddrs.add(addr) - } - - return peer - } - - /** - * Dial a relay peer by its PeerInfo - * - * @param {PeerInfo} peer - the PeerInfo of the relay peer - * @param {Function} cb - a callback with the connection to the relay peer - * @returns {Function|void} - * - * @memberOf Dialer - */ - dialRelay (peer, cb) { - cb = once(cb || (() => {})) - - const b58Id = this._getB58String(peer) - const relay = this.relayPeers.get(b58Id) - if (relay) { - cb(null, relay) - } - - const relayConn = new Connection() - relayConn.setPeerInfo(peer) - // attempt to dia the relay so that we have a connection - this.swarm.dial(peer, multicodec.hop, once((err, conn) => { - if (err) { - log.err(err) - return cb(err) - } - - relayConn.setInnerConn(conn) - this.relayPeers.set(b58Id, relayConn) - cb(null, relayConn) - })) - } - /** * Filter check for all multiaddresses * that this transport can dial on @@ -418,12 +80,7 @@ class Dialer { * @memberOf Dialer */ filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } - return multiaddrs.filter((ma) => { - return mafmt.Circuit.matches(ma) - }) + return this.dialer.filter(multiaddrs) } } diff --git a/src/index.js b/src/index.js index c8f738e..92a3667 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,7 @@ 'use strict' module.exports = { - Relay: require('./relay'), + Relay: require('./circuit/relay'), Dialer: require('./dialer'), multicodec: require('./multicodec'), tag: 'Circuit' diff --git a/src/listener.js b/src/listener.js index 4e73b0e..dc3be10 100644 --- a/src/listener.js +++ b/src/listener.js @@ -8,7 +8,7 @@ const multiaddr = require('multiaddr') const handshake = require('pull-handshake') const Connection = require('interface-connection').Connection const mafmt = require('mafmt') -const codes = require('./codes') +const constants = require('./circuit/constants') const debug = require('debug') @@ -43,8 +43,9 @@ module.exports = (swarm, options, handler) => { return cb(err) } - peerInfo.multiaddrs.add(multiaddr(msg.toString())) // add the addr we got along with the relay request - shake.write(String(codes.SUCCESS)) + // add the addr we got along with the relay request + peerInfo.multiaddrs.add(multiaddr(msg.toString())) + shake.write(String(constants.RESPONSE.SUCCESS)) let newConn = new Connection(shake.rest(), conn) newConn.setPeerInfo(peerInfo) diff --git a/test/index.spec.js b/test/index.spec.js index bdb9e48..784712b 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -78,9 +78,9 @@ describe('test relay', function () { // its own waterfall([ (cb) => nodeA.swarm.transports['Circuit'] - .dialRelay(relayNode.peerInfo, cb), + .dialer.dialRelay(relayNode.peerInfo, cb), (conn, cb) => nodeB.swarm.transports['Circuit'] - .dialRelay(relayNode.peerInfo, cb) + .dialer.dialRelay(relayNode.peerInfo, cb) ], () => setTimeout(done, 1000)) // WS needs some time to initialize }) }) @@ -92,15 +92,7 @@ describe('test relay', function () { utils.stopNodes(testNodes, () => done()) }) - it('should dial to a node over a relay and write a value', function (done) { - utils.dialAndReverse(testNodes.nodeA, testNodes.nodeB, ['hello'], (err, result) => { - if (err) return done(err) - expect(result[0]).to.equal('hello'.split('').reverse('').join('')) - done() - }) - }) - - it('should dial to a node over a relay and write several values', function (done) { + it('should dial to a node over a relay and write values', function (done) { utils.dialAndReverse( testNodes.nodeA, testNodes.nodeB, @@ -207,13 +199,13 @@ describe('test relay', function () { // its own waterfall([ (cb) => nodeA.swarm.transports['Circuit'] - .dialRelay(relayNode1.peerInfo, cb), + .dialer.dialRelay(relayNode1.peerInfo, cb), (conn, cb) => nodeA.swarm.transports['Circuit'] - .dialRelay(relayNode2.peerInfo, cb), + .dialer.dialRelay(relayNode2.peerInfo, cb), (conn, cb) => nodeB.swarm.transports['Circuit'] - .dialRelay(relayNode1.peerInfo, cb), + .dialer.dialRelay(relayNode1.peerInfo, cb), (conn, cb) => nodeB.swarm.transports['Circuit'] - .dialRelay(relayNode2.peerInfo, cb) + .dialer.dialRelay(relayNode2.peerInfo, cb) ], () => setTimeout(done, 1000)) // WS needs some time to initialize }) }) @@ -337,13 +329,13 @@ describe('test relay', function () { // its own waterfall([ (cb) => nodeA.swarm.transports['Circuit'] - .dialRelay(relayNode1.peerInfo, cb), + .dialer.dialRelay(relayNode1.peerInfo, cb), (conn, cb) => nodeA.swarm.transports['Circuit'] - .dialRelay(relayNode2.peerInfo, cb), + .dialer.dialRelay(relayNode2.peerInfo, cb), (conn, cb) => nodeB.swarm.transports['Circuit'] - .dialRelay(relayNode1.peerInfo, cb), + .dialer.dialRelay(relayNode1.peerInfo, cb), (conn, cb) => nodeB.swarm.transports['Circuit'] - .dialRelay(relayNode2.peerInfo, cb), + .dialer.dialRelay(relayNode2.peerInfo, cb), (conn, cb) => relayNode1.dial(relayNode2.peerInfo, cb), (conn, cb) => relayNode2.dial(relayNode1.peerInfo, cb) ], () => setTimeout(done, 1000)) // WS needs some time to initialize