Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
feat: Unix domain sockets (#208)
Browse files Browse the repository at this point in the history
This PR adds the functionality to handle UNIX domain sockets.

* This works with plain `/unix` multiaddresses but is unable to handle p2p encapsulated addresses (`/unix/.../p2p/...`) due to ambiguity. Because of this it causes errors when attempting to use `/unix` addresses in js-libp2p because it appends the `/p2p` address.
* On Windows this uses named pipes instead.

Despite the fact that it does not work with `/unix` addresses in js-libp2p directly, this still remains useful and is needed in other projects like [js-libp2p-daemon](https://github.com/libp2p/js-libp2p-daemon).

Refs #132.

Co-authored-by: saul <saul@organicdesign.nz>
  • Loading branch information
Saul and saul authored Sep 14, 2022
1 parent 962e399 commit 223f79b
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// p2p multi-address code
export const CODE_P2P = 421
export const CODE_CIRCUIT = 290
export const CODE_UNIX = 400

// Time to wait for a connection to close gracefully before destroying it manually
export const CLOSE_TIMEOUT = 2000
Expand Down
15 changes: 10 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
import { createListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from '@libp2p/interfaces/errors'
import { CODE_CIRCUIT, CODE_P2P } from './constants.js'
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Socket } from 'net'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Connection } from '@libp2p/interface-connection'

Expand Down Expand Up @@ -75,19 +75,20 @@ export class TCP implements Transport {

return await new Promise<Socket>((resolve, reject) => {
const start = Date.now()
const cOpts = multiaddrToNetConfig(ma)
const cOpts = multiaddrToNetConfig(ma) as (IpcSocketConnectOpts & TcpSocketConnectOpts)
const cOptsStr = cOpts.path ?? `${cOpts.host ?? ''}:${cOpts.port}`

log('dialing %j', cOpts)
const rawSocket = net.connect(cOpts)

const onError = (err: Error) => {
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
err.message = `connection error ${cOptsStr}: ${err.message}`

done(err)
}

const onTimeout = () => {
log('connection timeout %s:%s', cOpts.host, cOpts.port)
log('connection timeout %s', cOptsStr)

const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
Expand Down Expand Up @@ -155,6 +156,10 @@ export class TCP implements Transport {
return false
}

if (ma.protoCodes().includes(CODE_UNIX)) {
return true
}

return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P))
})
}
Expand Down
24 changes: 12 additions & 12 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,19 @@ export function createListener (context: Context) {
}

if (typeof address === 'string') {
throw new Error('Incorrect server address type')
}

try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
addrs = [listeningAddr]
} else {
try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
}

return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
Expand Down
22 changes: 12 additions & 10 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { logger } from '@libp2p/logger'
import toIterable from 'stream-to-it'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
import { multiaddrToNetConfig } from './utils.js'
import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -52,13 +53,14 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
remoteAddr = toMultiaddr(socket.remoteAddress, socket.remotePort)
}

const { host, port } = remoteAddr.toOptions()
const lOpts = multiaddrToNetConfig(remoteAddr)
const lOptsStr = lOpts.path ?? `${lOpts.host ?? ''}:${lOpts.port ?? ''}`
const { sink, source } = toIterable.duplex(socket)

// by default there is no timeout
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s:%s socket read timeout', host, port)
log('%s socket read timeout', lOptsStr)

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -72,7 +74,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
})

socket.once('close', () => {
log('%s:%s socket closed', host, port)
log('%s socket read timeout', lOptsStr)

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand Down Expand Up @@ -119,36 +121,36 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti

async close () {
if (socket.destroyed) {
log('%s:%s socket was already destroyed when trying to close', host, port)
log('%s socket was already destroyed when trying to close', lOptsStr)
return
}

log('%s:%s closing socket', host, port)
log('%s closing socket', lOptsStr)
await new Promise<void>((resolve, reject) => {
const start = Date.now()

// Attempt to end the socket. If it takes longer to close than the
// timeout, destroy it manually.
const timeout = setTimeout(() => {
if (socket.destroyed) {
log('%s:%s is already destroyed', host, port)
log('%s is already destroyed', lOptsStr)
resolve()
} else {
log('%s:%s socket close timeout after %dms, destroying it manually', host, port, Date.now() - start)
log('%s socket close timeout after %dms, destroying it manually', lOptsStr, Date.now() - start)

// will trigger 'error' and 'close' events that resolves promise
socket.destroy(errCode(new Error('Socket close timeout'), 'ERR_SOCKET_CLOSE_TIMEOUT'))
}
}, closeTimeout).unref()

socket.once('close', () => {
log('%s:%s socket closed', host, port)
log('%s socket closed', lOptsStr)
// socket completely closed
clearTimeout(timeout)
resolve()
})
socket.once('error', (err: Error) => {
log('%s:%s socket error', host, port, err)
log('%s socket error', lOptsStr, err)

// error closing socket
if (maConn.timeline.close == null) {
Expand All @@ -171,7 +173,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
if (socket.writableLength > 0) {
// there are outgoing bytes waiting to be sent
socket.once('drain', () => {
log('%s:%s socket drained', host, port)
log('%s socket drained', lOptsStr)

// all bytes have been sent we can destroy the socket (maybe) before the timeout
socket.destroy()
Expand Down
12 changes: 9 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { Multiaddr } from '@multiformats/multiaddr'
import type { ListenOptions, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import os from 'os'
import path from 'path'

const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }

export function multiaddrToNetConfig (addr: Multiaddr) {
export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) {
const listenPath = addr.getPath()

// unix socket listening
if (listenPath != null) {
// TCP should not return unix socket else need to refactor listener which accepts connection options object
throw new Error('Unix Sockets are not supported by the TCP transport')
if (os.platform() === 'win32') {
// Use named pipes on Windows systems.
return { path: path.join('\\\\.\\pipe\\', listenPath) }
} else {
return { path: listenPath }
}
}

// tcp listening
Expand Down
12 changes: 5 additions & 7 deletions test/listen-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ describe('listen', () => {
}
})

// TCP doesn't support unix paths
it.skip('listen on path', async () => {
const mh = new Multiaddr(`/unix${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
it('listen on path', async () => {
const mh = new Multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)

listener = tcp.createListener({
upgrader
Expand Down Expand Up @@ -207,9 +206,8 @@ describe('dial', () => {
await listener.close()
})

// TCP doesn't support unix paths
it.skip('dial on path', async () => {
const ma = new Multiaddr(`/unix${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
it('dial on path', async () => {
const ma = new Multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)

const listener = tcp.createListener({
upgrader
Expand All @@ -226,7 +224,7 @@ describe('dial', () => {
async (source) => await all(source)
)

expect(values).to.deep.equal(['hey'])
expect(values[0].subarray()).to.equalBytes(uint8ArrayFromString('hey'))
await conn.close()
await listener.close()
})
Expand Down

0 comments on commit 223f79b

Please sign in to comment.