Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
feat: add types (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain authored Jun 10, 2021
1 parent 4789cf1 commit 3249e02
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 90 deletions.
7 changes: 7 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = {
build: {
config: {
platform: 'node'
}
}
}
3 changes: 2 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: npm install
- run: npx aegir lint
- run: npm run lint
- run: npx aegir dep-check -- -i wrtc -i electron-webrtc
- run: npm run build
test-node:
needs: check
runs-on: ${{ matrix.os }}
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ package-lock.json
coverage
.nyc_output
docs

dist
13 changes: 8 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
"main": "src/index.js",
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"release": "aegir release -t node --no-build",
"release-minor": "aegir release -t node --type minor --no-build",
"release-major": "aegir-release -t node --type major --no-build",
"release": "aegir release -t node",
"release-minor": "aegir release -t node --type minor",
"release-major": "aegir-release -t node --type major",
"coverage": "nyc --reporter=text --reporter=lcov npm run test:node"
},
"pre-push": [
Expand All @@ -37,10 +38,12 @@
"engines": {
"node": ">=14.0.0"
},
"types": "dist/src/index.d.ts",
"devDependencies": {
"aegir": "^33.0.0",
"@types/debug": "^4.1.5",
"aegir": "^33.2.0",
"it-pipe": "^1.1.0",
"libp2p-interfaces": "^0.9.0",
"libp2p-interfaces": "^0.11.0",
"sinon": "^10.0.1",
"streaming-iterables": "^5.0.2"
},
Expand Down
29 changes: 19 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const net = require('net')
const mafmt = require('mafmt')
// Missing Type
// @ts-ignore
const withIs = require('class-is')
const errCode = require('err-code')
const log = require('debug')('libp2p:tcp')
Expand All @@ -14,6 +16,9 @@ const { CODE_CIRCUIT, CODE_P2P } = require('./constants')
/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('libp2p-interfaces/src/transport/types').Upgrader} Upgrader
* @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener
* @typedef {import('net').Socket} Socket
*/

class TCP {
Expand All @@ -33,8 +38,8 @@ class TCP {
* @async
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal - Used to abort dial requests
* @returns {Connection} An upgraded Connection
* @param {AbortSignal} [options.signal] - Used to abort dial requests
* @returns {Promise<Connection>} An upgraded Connection
*/
async dial (ma, options) {
options = options || {}
Expand All @@ -50,7 +55,7 @@ class TCP {
* @private
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal - Used to abort dial requests
* @param {AbortSignal} [options.signal] - Used to abort dial requests
* @returns {Promise<Socket>} Resolves a TCP Socket
*/
_connect (ma, options = {}) {
Expand All @@ -65,13 +70,13 @@ class TCP {
log('dialing %j', cOpts)
const rawSocket = net.connect(cOpts)

const onError = err => {
const onError = /** @param {Error} err */ err => {
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
done(err)
}

const onTimeout = () => {
log('connnection timeout %s:%s', cOpts.host, cOpts.port)
log('connection timeout %s:%s', cOpts.host, cOpts.port)
const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
rawSocket.emit('error', err)
Expand All @@ -88,7 +93,7 @@ class TCP {
done(new AbortError())
}

const done = err => {
const done = /** @param {Error} [err] */ err => {
rawSocket.removeListener('error', onError)
rawSocket.removeListener('timeout', onTimeout)
rawSocket.removeListener('connect', onConnect)
Expand All @@ -110,17 +115,21 @@ class TCP {
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
*
* @param {*} [options]
* @param {function(Connection)} handler
* @param {* | function(Connection):void} options
* @param {function(Connection):void} [handler]
* @returns {Listener} A TCP listener
*/
createListener (options, handler) {
let listenerHandler

if (typeof options === 'function') {
handler = options
listenerHandler = options
options = {}
} else {
listenerHandler = handler
}
options = options || {}
return createListener({ handler, upgrader: this._upgrader }, options)
return createListener({ handler: listenerHandler, upgrader: this._upgrader }, options)
}

/**
Expand Down
149 changes: 91 additions & 58 deletions src/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,30 @@
const net = require('net')
const EventEmitter = require('events')
const debug = require('debug')
const log = debug('libp2p:tcp:listener')
log.error = debug('libp2p:tcp:listener:error')

const log = Object.assign(
debug('libp2p:tcp:listener'),
{ error: debug('libp2p:tcp:listener:error') })
const toConnection = require('./socket-to-conn')
const { CODE_P2P } = require('./constants')
const {
getMultiaddrs,
multiaddrToNetConfig
} = require('./utils')

/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('libp2p-interfaces/src/transport/types').Upgrader} Upgrader
* @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection
* @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener
* @typedef {import('net').Server & {__connections: MultiaddrConnection[]}} Server
*/

/**
* Attempts to close the given maConn. If a failure occurs, it will be logged.
*
* @private
* @param {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} maConn
* @param {MultiaddrConnection} maConn
*/
async function attemptClose (maConn) {
try {
Expand All @@ -27,13 +36,80 @@ async function attemptClose (maConn) {
}
}

/**
* Create listener
*
* @param {object} context
* @param {function(Connection):void} context.handler
* @param {Upgrader} context.upgrader
* @param {*} options
* @returns {Listener}
*/
module.exports = ({ handler, upgrader }, options) => {
const listener = new EventEmitter()
/** @type {Server} */
// eslint-disable-next-line prefer-const
let server

/** @type {string | null} */
let peerId

/** @type {Multiaddr} */
let listeningAddr

const listener = Object.assign(new EventEmitter(), {
getAddrs: () => {
/** @type {Multiaddr[]} */
let addrs = []
/** @type {import('net').AddressInfo} */
// @ts-ignore
const address = server.address()

if (!address) {
throw new Error('Listener is not ready yet')
}

// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}

return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma)
},
listen: async (/** @type {Multiaddr} */ ma) => {
listeningAddr = ma
peerId = ma.getPeerId()

if (peerId) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}

return new Promise((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, (/** @type {any} */ err) => {
if (err) return reject(err)
log('Listening on %s', server.address())
resolve(undefined)
})
})
},
close: async () => {
if (!server.listening) return

return new Promise((resolve, reject) => {
server.__connections.forEach(maConn => attemptClose(maConn))
server.close(err => err ? reject(err) : resolve(undefined))
})
}
})

const server = net.createServer(async socket => {
server = Object.assign(net.createServer(async socket => {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => log('socket error', err))

/** @type {MultiaddrConnection} */
let maConn
let conn
try {
Expand All @@ -42,6 +118,7 @@ module.exports = ({ handler, upgrader }, options) => {
conn = await upgrader.upgradeInbound(maConn)
} catch (err) {
log.error('inbound connection failed', err)
// @ts-ignore
return attemptClose(maConn)
}

Expand All @@ -51,73 +128,29 @@ module.exports = ({ handler, upgrader }, options) => {

if (handler) handler(conn)
listener.emit('connection', conn)
})
}),
// Keep track of open connections to destroy in case of timeout
{ __connections: [] })

server
.on('listening', () => listener.emit('listening'))
.on('error', err => listener.emit('error', err))
.on('close', () => listener.emit('close'))

// Keep track of open connections to destroy in case of timeout
server.__connections = []

listener.close = () => {
if (!server.listening) return

return new Promise((resolve, reject) => {
server.__connections.forEach(maConn => attemptClose(maConn))
server.close(err => err ? reject(err) : resolve())
})
}

let peerId, listeningAddr

listener.listen = ma => {
listeningAddr = ma
peerId = ma.getPeerId()

if (peerId) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}

return new Promise((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, err => {
if (err) return reject(err)
log('Listening on %s', server.address())
resolve()
})
})
}

listener.getAddrs = () => {
let addrs = []
const address = server.address()

if (!address) {
throw new Error('Listener is not ready yet')
}

// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}

return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma)
}

return listener
}

/**
* @param {Server} server
* @param {MultiaddrConnection} maConn
*/
function trackConn (server, maConn) {
server.__connections.push(maConn)

const untrackConn = () => {
server.__connections = server.__connections.filter(c => c !== maConn)
}

// @ts-ignore
maConn.conn.once('close', untrackConn)
}
Loading

0 comments on commit 3249e02

Please sign in to comment.