Skip to content
This repository was archived by the owner on Aug 29, 2023. It is now read-only.

Commit bff54fa

Browse files
feat: close server on maxConnections (#218)
* Close TCP server on maxConnections * Add unit test * chore: update jsdoc * Apply PR feedback * Update test Co-authored-by: Cayman <caymannava@gmail.com>
1 parent 5767c69 commit bff54fa

File tree

4 files changed

+226
-20
lines changed

4 files changed

+226
-20
lines changed

src/index.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import net from 'net'
22
import * as mafmt from '@multiformats/mafmt'
33
import { logger } from '@libp2p/logger'
44
import { toMultiaddrConnection } from './socket-to-conn.js'
5-
import { TCPListener } from './listener.js'
5+
import { CloseServerOnMaxConnectionsOpts, TCPListener } from './listener.js'
66
import { multiaddrToNetConfig } from './utils.js'
77
import { AbortError, CodeError } from '@libp2p/interfaces/errors'
88
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
@@ -35,6 +35,12 @@ export interface TCPOptions {
3535
* https://nodejs.org/api/net.html#servermaxconnections
3636
*/
3737
maxConnections?: number
38+
39+
/**
40+
* Close server (stop listening for new connections) if connections exceed a limit.
41+
* Open server (start listening for new connections) if connections fall below a limit.
42+
*/
43+
closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
3844
}
3945

4046
/**
@@ -209,6 +215,7 @@ class TCP implements Transport {
209215
return new TCPListener({
210216
...options,
211217
maxConnections: this.opts.maxConnections,
218+
closeServerOnMaxConnections: this.opts.closeServerOnMaxConnections,
212219
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
213220
socketCloseTimeout: this.opts.socketCloseTimeout,
214221
metrics: this.components.metrics

src/listener.ts

+97-18
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
44
import { CODE_P2P } from './constants.js'
55
import {
66
getMultiaddrs,
7-
multiaddrToNetConfig
7+
multiaddrToNetConfig,
8+
NetConfig
89
} from './utils.js'
910
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events'
1011
import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection'
@@ -26,13 +27,22 @@ async function attemptClose (maConn: MultiaddrConnection) {
2627
}
2728
}
2829

30+
export interface CloseServerOnMaxConnectionsOpts {
31+
/** Server listens once connection count is less than `listenBelow` */
32+
listenBelow: number
33+
/** Close server once connection count is greater than or equal to `closeAbove` */
34+
closeAbove: number
35+
onListenError?: (err: Error) => void
36+
}
37+
2938
interface Context extends TCPCreateListenerOptions {
3039
handler?: (conn: Connection) => void
3140
upgrader: Upgrader
3241
socketInactivityTimeout?: number
3342
socketCloseTimeout?: number
3443
maxConnections?: number
3544
metrics?: Metrics
45+
closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
3646
}
3747

3848
const SERVER_STATUS_UP = 1
@@ -44,7 +54,12 @@ export interface TCPListenerMetrics {
4454
events: CounterGroup
4555
}
4656

47-
type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
57+
type Status = {started: false} | {
58+
started: true
59+
listeningAddr: Multiaddr
60+
peerId: string | null
61+
netConfig: NetConfig
62+
}
4863

4964
export class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
5065
private readonly server: net.Server
@@ -69,6 +84,13 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
6984
this.server.maxConnections = context.maxConnections
7085
}
7186

87+
if (context.closeServerOnMaxConnections != null) {
88+
// Sanity check options
89+
if (context.closeServerOnMaxConnections.closeAbove < context.closeServerOnMaxConnections.listenBelow) {
90+
throw Error('closeAbove must be >= listenBelow')
91+
}
92+
}
93+
7294
this.server
7395
.on('listening', () => {
7496
if (context.metrics != null) {
@@ -159,12 +181,33 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
159181

160182
socket.once('close', () => {
161183
this.connections.delete(maConn)
184+
185+
if (
186+
this.context.closeServerOnMaxConnections != null &&
187+
this.connections.size < this.context.closeServerOnMaxConnections.listenBelow
188+
) {
189+
// The most likely case of error is if the port taken by this application is binded by
190+
// another process during the time the server if closed. In that case there's not much
191+
// we can do. netListen() will be called again every time a connection is dropped, which
192+
// acts as an eventual retry mechanism. onListenError allows the consumer act on this.
193+
this.netListen().catch(e => {
194+
log.error('error attempting to listen server once connection count under limit', e)
195+
this.context.closeServerOnMaxConnections?.onListenError?.(e as Error)
196+
})
197+
}
162198
})
163199

164200
if (this.context.handler != null) {
165201
this.context.handler(conn)
166202
}
167203

204+
if (
205+
this.context.closeServerOnMaxConnections != null &&
206+
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
207+
) {
208+
this.netClose()
209+
}
210+
168211
this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
169212
})
170213
.catch(async err => {
@@ -220,34 +263,70 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
220263
}
221264

222265
async listen (ma: Multiaddr) {
266+
if (this.status.started) {
267+
throw Error('server is already listening')
268+
}
269+
223270
const peerId = ma.getPeerId()
224271
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma
225272

226-
this.status = { started: true, listeningAddr, peerId }
273+
this.status = {
274+
started: true,
275+
listeningAddr,
276+
peerId,
277+
netConfig: multiaddrToNetConfig(listeningAddr)
278+
}
227279

228-
return await new Promise<void>((resolve, reject) => {
229-
const options = multiaddrToNetConfig(listeningAddr)
230-
this.server.on('error', (err) => {
231-
reject(err)
232-
})
233-
this.server.listen(options, () => {
234-
log('Listening on %s', this.server.address())
235-
resolve()
236-
})
237-
})
280+
await this.netListen()
238281
}
239282

240283
async close () {
241-
if (!this.server.listening) {
242-
return
243-
}
244-
245284
await Promise.all(
246285
Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn))
247286
)
248287

288+
// netClose already checks if server.listening
289+
this.netClose()
290+
}
291+
292+
private async netListen (): Promise<void> {
293+
if (!this.status.started || this.server.listening) {
294+
return
295+
}
296+
297+
const netConfig = this.status.netConfig
298+
249299
await new Promise<void>((resolve, reject) => {
250-
this.server.close(err => (err != null) ? reject(err) : resolve())
300+
// NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error'
301+
this.server.once('error', reject)
302+
this.server.listen(netConfig, resolve)
251303
})
304+
305+
log('Listening on %s', this.server.address())
306+
}
307+
308+
private netClose (): void {
309+
if (!this.status.started || !this.server.listening) {
310+
return
311+
}
312+
313+
log('Closing server on %s', this.server.address())
314+
315+
// NodeJS implementation tracks listening status with `this._handle` property.
316+
// - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown
317+
// - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675
318+
// - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN
319+
//
320+
// NOTE: Both listen and close are technically not async actions, so it's not necessary to track
321+
// states 'pending-close' or 'pending-listen'
322+
323+
// From docs https://nodejs.org/api/net.html#serverclosecallback
324+
// Stops the server from accepting new connections and keeps existing connections.
325+
// 'close' event is emitted only emitted when all connections are ended.
326+
// The optional callback will be called once the 'close' event occurs.
327+
//
328+
// NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
329+
// to pass a callback to close.
330+
this.server.close()
252331
}
253332
}

src/utils.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import path from 'path'
66

77
const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }
88

9-
export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) {
9+
export type NetConfig = ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts)
10+
11+
export function multiaddrToNetConfig (addr: Multiaddr): NetConfig {
1012
const listenPath = addr.getPath()
1113

1214
// unix socket listening

test/max-connections-close.spec.ts

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import net from 'node:net'
2+
import { promisify } from 'util'
3+
import { expect } from 'aegir/chai'
4+
import { mockUpgrader } from '@libp2p/interface-mocks'
5+
import { multiaddr } from '@multiformats/multiaddr'
6+
import { tcp } from '../src/index.js'
7+
import type { TCPListener } from '../src/listener.js'
8+
9+
describe('close server on maxConnections', () => {
10+
const afterEachCallbacks: Array<() => Promise<any> | any> = []
11+
afterEach(async () => {
12+
await Promise.all(afterEachCallbacks.map(fn => fn()))
13+
afterEachCallbacks.length = 0
14+
})
15+
16+
it('reject dial of connection above closeAbove', async () => {
17+
const listenBelow = 2
18+
const closeAbove = 3
19+
const port = 9900
20+
21+
const seenRemoteConnections = new Set<string>()
22+
const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })()
23+
24+
const upgrader = mockUpgrader()
25+
const listener = trasnport.createListener({ upgrader }) as TCPListener
26+
// eslint-disable-next-line @typescript-eslint/promise-function-async
27+
afterEachCallbacks.push(() => listener.close())
28+
await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))
29+
30+
listener.addEventListener('connection', (conn) => {
31+
seenRemoteConnections.add(conn.detail.remoteAddr.toString())
32+
})
33+
34+
function createSocket (): net.Socket {
35+
const socket = net.connect({ port })
36+
37+
// eslint-disable-next-line @typescript-eslint/promise-function-async
38+
afterEachCallbacks.unshift(async () => {
39+
if (!socket.destroyed) {
40+
socket.destroy()
41+
await new Promise((resolve) => socket.on('close', resolve))
42+
}
43+
})
44+
45+
return socket
46+
}
47+
48+
async function assertConnectedSocket (i: number) {
49+
const socket = createSocket()
50+
51+
await new Promise<void>((resolve, reject) => {
52+
socket.once('connect', () => {
53+
resolve()
54+
})
55+
socket.once('error', (err) => {
56+
err.message = `Socket[${i}] ${err.message}`
57+
reject(err)
58+
})
59+
})
60+
61+
return socket
62+
}
63+
64+
async function assertRefusedSocket (i: number) {
65+
const socket = createSocket()
66+
67+
await new Promise<void>((resolve, reject) => {
68+
socket.once('connect', () => {
69+
reject(Error(`Socket[${i}] connected but was expected to reject`))
70+
})
71+
socket.once('error', (err) => {
72+
if (err.message.includes('ECONNREFUSED')) {
73+
resolve()
74+
} else {
75+
err.message = `Socket[${i}] unexpected error ${err.message}`
76+
reject(err)
77+
}
78+
})
79+
})
80+
}
81+
82+
async function assertServerConnections (connections: number) {
83+
// Expect server connections but allow time for sockets to connect or disconnect
84+
for (let i = 0; i < 100; i++) {
85+
// eslint-disable-next-line @typescript-eslint/dot-notation
86+
if (listener['connections'].size === connections) {
87+
return
88+
} else {
89+
await promisify(setTimeout)(10)
90+
}
91+
}
92+
// eslint-disable-next-line @typescript-eslint/dot-notation
93+
expect(listener['connections'].size).equals(connections, 'Wrong server connections')
94+
}
95+
96+
const socket1 = await assertConnectedSocket(1)
97+
const socket2 = await assertConnectedSocket(2)
98+
const socket3 = await assertConnectedSocket(3)
99+
await assertServerConnections(3)
100+
// Limit reached, server should be closed here
101+
await assertRefusedSocket(4)
102+
await assertRefusedSocket(5)
103+
// Destroy sockets to be have connections < listenBelow
104+
socket1.destroy()
105+
socket2.destroy()
106+
await assertServerConnections(1)
107+
// Attempt to connect more sockets
108+
const socket6 = await assertConnectedSocket(6)
109+
const socket7 = await assertConnectedSocket(7)
110+
await assertServerConnections(3)
111+
// Limit reached, server should be closed here
112+
await assertRefusedSocket(8)
113+
114+
expect(socket3.destroyed).equals(false, 'socket3 must not destroyed')
115+
expect(socket6.destroyed).equals(false, 'socket6 must not destroyed')
116+
expect(socket7.destroyed).equals(false, 'socket7 must not destroyed')
117+
})
118+
})

0 commit comments

Comments
 (0)