Skip to content

Commit

Permalink
feat: add support for unix multiaddr listen (#10)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: The --sock param/flag has been replaced by --listen, which now expects a multiaddr string.

Example: `jsp2pd --sock=/tmp/p2p.sock` would now be `jsp2pd --listen=/unix/tmp/p2p.sock`

* feat: add support for unix multiaddr listen
* feat: add support for hostAddrs flag
* feat: add support for websockets
* feat: add announceAddrs support
* test: split up tests into files
* feat: use multiaddr instead of path for everything
* feat: update stream handler to use multiaddr bytes
* chore: fix lint
* chore: update multiaddr dep
* test: fix test runners
* fix: add a default host address
* fix: catch decapsulate errors when no ipfs present
* chore: fix feedback
  • Loading branch information
jacobheun authored Feb 26, 2019
1 parent c131161 commit 9106d68
Show file tree
Hide file tree
Showing 15 changed files with 916 additions and 600 deletions.
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
"test": "aegir test -t node -f test/**/*.js",
"test:node": "aegir test -t node -f test/**/*.js",
"test": "aegir test -t node -f test/*.js test/**/*.js",
"test:node": "aegir test -t node -f test/*.js test/**/*.js",
"release": "aegir release --no-build -t node",
"release-minor": "aegir release --no-build --type minor -t node",
"release-major": "aegir release --no-build --type major -t node"
Expand Down Expand Up @@ -54,7 +54,8 @@
"libp2p-mplex": "~0.8.4",
"libp2p-secio": "~0.11.1",
"libp2p-tcp": "~0.13.0",
"multiaddr": "^6.0.3",
"libp2p-websockets": "~0.12.2",
"multiaddr": "^6.0.5",
"peer-book": "~0.9.0",
"peer-id": "~0.12.0",
"peer-info": "~0.15.0",
Expand Down
16 changes: 13 additions & 3 deletions src/cli/bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ const log = console.log

const main = async (processArgs) => {
parser.yargs
.option('sock', {
desc: 'daemon control socket path',
.option('listen', {
desc: 'daemon control listen multiaddr',
type: 'string',
default: '/tmp/p2pd.sock'
default: '/unix/tmp/p2pd.sock'
})
.option('quiet', {
alias: 'q',
Expand All @@ -28,6 +28,16 @@ const main = async (processArgs) => {
type: 'string',
default: ''
})
.option('hostAddrs', {
desc: 'Comma separated list of multiaddrs the host should listen on',
type: 'string',
default: ''
})
.option('announceAddrs', {
desc: 'Comma separated list of multiaddrs the host should announce to the network',
type: 'string',
default: ''
})
.option('bootstrap', {
alias: 'b',
desc: 'Connects to bootstrap peers and bootstraps the dht if enabled',
Expand Down
17 changes: 9 additions & 8 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

const net = require('net')
const Socket = net.Socket
const path = require('path')
const { encode, decode } = require('length-prefixed-stream')
const { Request } = require('./protocol')
const LIMIT = 1 << 22 // 4MB

const { ends } = require('../src/util')
const { ends, multiaddrToNetConfig } = require('./util')

class Client {
constructor (socketPath) {
this.path = path.resolve(socketPath)
constructor (addr) {
this.multiaddr = addr
this.server = null
this.socket = new Socket({
readable: true,
Expand All @@ -27,7 +26,8 @@ class Client {
*/
attach () {
return new Promise((resolve, reject) => {
this.socket.connect(this.path, (err) => {
const options = multiaddrToNetConfig(this.multiaddr)
this.socket.connect(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand All @@ -37,11 +37,11 @@ class Client {
/**
* Starts a server listening at `socketPath`. New connections
* will be sent to the `connectionHandler`.
* @param {string} socketPath
* @param {Multiaddr} addr
* @param {function(Stream)} connectionHandler
* @returns {Promise}
*/
async startServer (socketPath, connectionHandler) {
async startServer (addr, connectionHandler) {
if (this.server) {
await this.stopServer()
}
Expand All @@ -50,7 +50,8 @@ class Client {
allowHalfOpen: true
}, connectionHandler)

this.server.listen(path.resolve(socketPath), (err) => {
const options = multiaddrToNetConfig(addr)
this.server.listen(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand Down
41 changes: 26 additions & 15 deletions src/daemon.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
'use strict'

const net = require('net')
const path = require('path')
const Libp2p = require('./libp2p')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const ma = require('multiaddr')
const CID = require('cids')
const { encode, decode } = require('length-prefixed-stream')
const { multiaddrToNetConfig } = require('./util')
const {
Request,
DHTRequest,
Expand All @@ -23,14 +23,14 @@ class Daemon {
/**
* @constructor
* @param {object} options
* @param {string} options.socketPath
* @param {string} options.multiaddr
* @param {Libp2p} options.libp2pNode
*/
constructor ({
socketPath,
multiaddr,
libp2pNode
}) {
this.socketPath = socketPath
this.multiaddr = ma(multiaddr)
this.libp2p = libp2pNode
this.server = net.createServer({
allowHalfOpen: true
Expand All @@ -53,7 +53,7 @@ class Daemon {
PeerId.createFromBytes(peer)
)
addrs.forEach((a) => {
peerInfo.multiaddrs.add(multiaddr(a))
peerInfo.multiaddrs.add(ma(a))
})

return this.libp2p.dial(peerInfo)
Expand Down Expand Up @@ -116,15 +116,16 @@ class Daemon {
registerStreamHandler (request) {
return new Promise((resolve, reject) => {
const protocols = request.streamHandler.proto
const socketPath = path.resolve(request.streamHandler.path)
const addr = ma(request.streamHandler.addr)
const addrString = addr.toString()

// If we have a handler, end it
if (this.streamHandlers[socketPath]) {
this.streamHandlers[socketPath].end()
delete this.streamHandlers[socketPath]
if (this.streamHandlers[addrString]) {
this.streamHandlers[addrString].end()
delete this.streamHandlers[addrString]
}

const socket = this.streamHandlers[socketPath] = new net.Socket({
const socket = this.streamHandlers[addrString] = new net.Socket({
readable: true,
writable: true,
allowHalfOpen: true
Expand Down Expand Up @@ -153,7 +154,8 @@ class Daemon {
})
})

socket.connect(socketPath, (err) => {
const options = multiaddrToNetConfig(addr)
socket.connect(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand Down Expand Up @@ -181,7 +183,8 @@ class Daemon {
async start () {
await this.libp2p.start()
return new Promise((resolve, reject) => {
this.server.listen(path.resolve(this.socketPath), (err) => {
const options = multiaddrToNetConfig(this.multiaddr)
this.server.listen(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand Down Expand Up @@ -368,7 +371,15 @@ class Daemon {
id: this.libp2p.peerInfo.id.toBytes(),
// temporary removal of "/ipfs/..." from multiaddrs
// this will be solved in: https://github.com/libp2p/js-libp2p/issues/323
addrs: this.libp2p.peerInfo.multiaddrs.toArray().map(m => m.decapsulate('ipfs').buffer)
addrs: this.libp2p.peerInfo.multiaddrs.toArray().map(m => {
let buffer
try {
buffer = m.decapsulate('ipfs').buffer
} catch (_) {
buffer = m.buffer
}
return buffer
})
}
}))
break
Expand Down Expand Up @@ -486,7 +497,7 @@ function ErrorResponse (message) {
const createDaemon = async (options) => {
const libp2pNode = await Libp2p.createLibp2p(options)
const daemon = new Daemon({
socketPath: options.sock,
multiaddr: options.listen,
libp2pNode: libp2pNode
})

Expand Down
33 changes: 29 additions & 4 deletions src/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const WS = require('libp2p-websockets')
const Bootstrap = require('libp2p-bootstrap')
const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
Expand Down Expand Up @@ -199,6 +200,10 @@ class DHT {
}

class DaemonLibp2p extends Libp2p {
constructor (libp2pOpts, { announceAddrs }) {
super(libp2pOpts)
this.announceAddrs = announceAddrs
}
get contentRouting () {
return this._contentRouting
}
Expand Down Expand Up @@ -227,6 +232,14 @@ class DaemonLibp2p extends Libp2p {
return new Promise((resolve, reject) => {
super.start((err) => {
if (err) return reject(err)

// replace with announce addrs until libp2p supports this directly
if (this.announceAddrs.length > 0) {
this.peerInfo.multiaddrs.clear()
this.announceAddrs.forEach(addr => {
this.peerInfo.multiaddrs.add(addr)
})
}
resolve()
})
})
Expand Down Expand Up @@ -295,27 +308,34 @@ class DaemonLibp2p extends Libp2p {
* @param {boolean} opts.connMgr
* @param {number} opts.connMgrLo
* @param {number} opts.connMgrHi
* @param {string} opts.sock
* @param {string} opts.id
* @param {string} opts.bootstrapPeers
* @param {string} opts.hostAddrs
* @returns {Libp2p}
*/
const createLibp2p = async ({
bootstrap,
bootstrapPeers,
hostAddrs,
announceAddrs,
dht,
dhtClient,
connMgr,
connMgrLo,
connMgrHi,
sock,
id
} = {}) => {
const peerInfo = await getPeerInfo(id)
const peerBook = new PeerBook()
const bootstrapList = bootstrapPeers ? bootstrapPeers.split(',').filter(s => s !== '') : null
const listenAddrs = hostAddrs ? hostAddrs.split(',').filter(s => s !== '') : ['/ip4/0.0.0.0/tcp/0']

peerInfo.multiaddrs.add(multiaddr('/ip4/0.0.0.0/tcp/0'))
announceAddrs = announceAddrs ? announceAddrs.split(',').filter(s => s !== '') : []
announceAddrs = announceAddrs.map(addr => multiaddr(addr))

listenAddrs.forEach(addr => {
peerInfo.multiaddrs.add(multiaddr(addr))
})

const libp2p = new DaemonLibp2p({
peerBook,
Expand All @@ -326,7 +346,8 @@ const createLibp2p = async ({
},
modules: {
transport: [
TCP
TCP,
WS
],
streamMuxer: [
MPLEX
Expand Down Expand Up @@ -362,6 +383,10 @@ const createLibp2p = async ({
pubsub: false
}
}
}, {
// using a secondary config until https://github.com/libp2p/js-libp2p/issues/202
// is completed
announceAddrs
})

return libp2p
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ message StreamOpenRequest {
}
message StreamHandlerRequest {
required string path = 1;
required bytes addr = 1;
repeated string proto = 2;
}
Expand Down
21 changes: 21 additions & 0 deletions src/util/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
'use strict'

const os = require('os')
const { resolve } = require('path')

exports.first = async iterator => {
for await (const value of iterator) return value
}
Expand All @@ -15,3 +18,21 @@ exports.ends = iterator => {
iterator.last = () => exports.last(iterator)
return iterator
}

/**
* Converts the multiaddr to a nodejs NET compliant option
* for .connect or .listen
* @param {Multiaddr} addr
* @returns {string|object} A nodejs NET compliant option
*/
exports.multiaddrToNetConfig = function multiaddrToNetConfig (addr) {
const listenPath = addr.getPath()
// unix socket listening
if (listenPath) {
return resolve(listenPath)
}
// tcp listening
return addr.nodeAddress()
}

exports.isWindows = os.platform() === 'win32'
Loading

0 comments on commit 9106d68

Please sign in to comment.