From b23dd6aa722ca1c199a68157fc8a9be2a828aaba Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Mon, 18 Sep 2023 13:29:03 +0200 Subject: [PATCH 01/10] Fix the bug of listner intermediary state --- packages/transport-tcp/src/listener.ts | 78 +++++++++++++++++++------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index c7de579471..a92e944d77 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -55,8 +55,21 @@ export interface TCPListenerMetrics { events: CounterGroup } -type Status = { started: false } | { - started: true +enum TCPListenerStatusCode { + /** + * When server object is initialized but we don't know the listening address yet or + * the server object is stopped manually, can be resumed only by calling listen() + **/ + INERT = 'inert', + /* When listener is aware of the address but the server is not started listening */ + INITIALIZED = 'initializing', + LISTENING = 'listening', + /* During the connection limits */ + PAUSED = 'paused', +} + +type Status = { code: TCPListenerStatusCode.INERT } | { + code: Exclude listeningAddr: Multiaddr peerId: string | null netConfig: NetConfig @@ -66,7 +79,7 @@ export class TCPListener extends EventEmitter implements Listene private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() - private status: Status = { started: false } + private status: Status = { code: TCPListenerStatusCode.INERT } private metrics?: TCPListenerMetrics private addr: string @@ -144,6 +157,8 @@ export class TCPListener extends EventEmitter implements Listene this.dispatchEvent(new CustomEvent('error', { detail: err })) }) .on('close', () => { + if(this.status.code === TCPListenerStatusCode.PAUSED) return + this.metrics?.status.update({ [this.addr]: SERVER_STATUS_DOWN }) @@ -152,6 +167,9 @@ export class TCPListener extends EventEmitter implements Listene } private onSocket (socket: net.Socket): void { + if(this.status.code === TCPListenerStatusCode.INERT) { + throw new Error('Server is is not listening yet') + } // Avoid uncaught errors caused by unstable connections socket.on('error', err => { log('socket error', err) @@ -161,7 +179,7 @@ export class TCPListener extends EventEmitter implements Listene let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr: this.status.started ? this.status.listeningAddr : undefined, + listeningAddr: this.status.code ? this.status.listeningAddr : undefined, socketInactivityTimeout: this.context.socketInactivityTimeout, socketCloseTimeout: this.context.socketCloseTimeout, metrics: this.metrics?.events, @@ -191,7 +209,7 @@ export class TCPListener extends EventEmitter implements Listene // another process during the time the server if closed. In that case there's not much // we can do. netListen() will be called again every time a connection is dropped, which // acts as an eventual retry mechanism. onListenError allows the consumer act on this. - this.netListen().catch(e => { + this.resume().catch(e => { log.error('error attempting to listen server once connection count under limit', e) this.context.closeServerOnMaxConnections?.onListenError?.(e as Error) }) @@ -206,7 +224,9 @@ export class TCPListener extends EventEmitter implements Listene this.context.closeServerOnMaxConnections != null && this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove ) { - this.netClose() + this.pause(false).catch(e => { + log.error('error attempting to close server once connection count over limit', e) + }) } this.dispatchEvent(new CustomEvent('connection', { detail: conn })) @@ -232,7 +252,7 @@ export class TCPListener extends EventEmitter implements Listene } getAddrs (): Multiaddr[] { - if (!this.status.started) { + if (this.status.code === TCPListenerStatusCode.INERT) { return [] } @@ -264,7 +284,7 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr): Promise { - if (this.status.started) { + if (this.status.code === TCPListenerStatusCode.LISTENING || this.status.code === TCPListenerStatusCode.PAUSED ) { throw Error('server is already listening') } @@ -273,13 +293,13 @@ export class TCPListener extends EventEmitter implements Listene const { backlog } = this.context this.status = { - started: true, + code: TCPListenerStatusCode.INITIALIZED, listeningAddr, peerId, netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) } - await this.netListen() + await this.resume() } async close (): Promise { @@ -287,12 +307,17 @@ export class TCPListener extends EventEmitter implements Listene Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) }) ) - // netClose already checks if server.listening - this.netClose() + await this.pause(true) } - private async netListen (): Promise { - if (!this.status.started || this.server.listening) { + /** + * Can resume a stopped or start an inert server + */ + private async resume (): Promise { + if ( + !(this.status.code === TCPListenerStatusCode.INITIALIZED || + this.status.code === TCPListenerStatusCode.PAUSED) || + this.server.listening) { return } @@ -303,12 +328,20 @@ export class TCPListener extends EventEmitter implements Listene this.server.once('error', reject) this.server.listen(netConfig, resolve) }) - + this.status = { ...this.status, code: TCPListenerStatusCode.LISTENING } log('Listening on %s', this.server.address()) } - private netClose (): void { - if (!this.status.started || !this.server.listening) { + private async pause (permanent: boolean): Promise { + if(this.status.code === TCPListenerStatusCode.PAUSED && permanent) { + this.status = { code: TCPListenerStatusCode.INERT } + return + } + + if ( + !(this.status.code === TCPListenerStatusCode.INITIALIZED || + this.status.code === TCPListenerStatusCode.LISTENING) || + !this.server.listening) { return } @@ -326,9 +359,12 @@ export class TCPListener extends EventEmitter implements Listene // Stops the server from accepting new connections and keeps existing connections. // 'close' event is emitted only emitted when all connections are ended. // The optional callback will be called once the 'close' event occurs. - // - // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary - // to pass a callback to close. - this.server.close() + + // We need to set this status before closing server, so other procedures are aware + // during the time the server is closing + this.status = permanent ? { code: TCPListenerStatusCode.INERT } : { ...this.status, code: TCPListenerStatusCode.PAUSED } + await new Promise((resolve, reject) => { + this.server.close( err => { err ? reject(err) : resolve() }) + }) } } From e47e041c4b5fc95b2d845039395af132f390b6a3 Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Thu, 21 Sep 2023 12:37:44 +0200 Subject: [PATCH 02/10] Update the listner --- packages/transport-tcp/src/listener.ts | 75 +++++++++++++------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index a92e944d77..1cae6ea50e 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -46,9 +46,6 @@ interface Context extends TCPCreateListenerOptions { closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts } -const SERVER_STATUS_UP = 1 -const SERVER_STATUS_DOWN = 0 - export interface TCPListenerMetrics { status: MetricGroup errors: CounterGroup @@ -60,16 +57,14 @@ enum TCPListenerStatusCode { * When server object is initialized but we don't know the listening address yet or * the server object is stopped manually, can be resumed only by calling listen() **/ - INERT = 'inert', - /* When listener is aware of the address but the server is not started listening */ - INITIALIZED = 'initializing', - LISTENING = 'listening', + DOWN = 0, + UP = 1, /* During the connection limits */ - PAUSED = 'paused', + PAUSED = 2, } -type Status = { code: TCPListenerStatusCode.INERT } | { - code: Exclude +type Status = { code: TCPListenerStatusCode.DOWN } | { + code: Exclude listeningAddr: Multiaddr peerId: string | null netConfig: NetConfig @@ -79,7 +74,7 @@ export class TCPListener extends EventEmitter implements Listene private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() - private status: Status = { code: TCPListenerStatusCode.INERT } + private status: Status = { code: TCPListenerStatusCode.DOWN } private metrics?: TCPListenerMetrics private addr: string @@ -146,7 +141,7 @@ export class TCPListener extends EventEmitter implements Listene } this.metrics?.status.update({ - [this.addr]: SERVER_STATUS_UP + [this.addr]: TCPListenerStatusCode.UP }) } @@ -157,17 +152,21 @@ export class TCPListener extends EventEmitter implements Listene this.dispatchEvent(new CustomEvent('error', { detail: err })) }) .on('close', () => { - if(this.status.code === TCPListenerStatusCode.PAUSED) return - this.metrics?.status.update({ - [this.addr]: SERVER_STATUS_DOWN + [this.addr]: this.status.code }) - this.dispatchEvent(new CustomEvent('close')) + + // If this event is emitted, the transport manager will remove the listener from it's cache + // in the meanwhile if the connections are dropped then listener will start listening again + // and the transport manager will not be able to close the server + if(this.status.code !== TCPListenerStatusCode.PAUSED){ + this.dispatchEvent(new CustomEvent('close')) + } }) } private onSocket (socket: net.Socket): void { - if(this.status.code === TCPListenerStatusCode.INERT) { + if(this.status.code === TCPListenerStatusCode.DOWN) { throw new Error('Server is is not listening yet') } // Avoid uncaught errors caused by unstable connections @@ -252,7 +251,7 @@ export class TCPListener extends EventEmitter implements Listene } getAddrs (): Multiaddr[] { - if (this.status.code === TCPListenerStatusCode.INERT) { + if (this.status.code === TCPListenerStatusCode.DOWN) { return [] } @@ -284,7 +283,7 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr): Promise { - if (this.status.code === TCPListenerStatusCode.LISTENING || this.status.code === TCPListenerStatusCode.PAUSED ) { + if (this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED ) { throw Error('server is already listening') } @@ -292,14 +291,19 @@ export class TCPListener extends EventEmitter implements Listene const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma const { backlog } = this.context - this.status = { - code: TCPListenerStatusCode.INITIALIZED, - listeningAddr, - peerId, - netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) + try { + this.status = { + code: TCPListenerStatusCode.UP, + listeningAddr, + peerId, + netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) + } + + await this.resume() + } catch(err) { + this.status = {code: TCPListenerStatusCode.DOWN} + throw err } - - await this.resume() } async close (): Promise { @@ -314,10 +318,7 @@ export class TCPListener extends EventEmitter implements Listene * Can resume a stopped or start an inert server */ private async resume (): Promise { - if ( - !(this.status.code === TCPListenerStatusCode.INITIALIZED || - this.status.code === TCPListenerStatusCode.PAUSED) || - this.server.listening) { + if (this.server.listening || this.status.code === TCPListenerStatusCode.DOWN) { return } @@ -328,20 +329,18 @@ export class TCPListener extends EventEmitter implements Listene this.server.once('error', reject) this.server.listen(netConfig, resolve) }) - this.status = { ...this.status, code: TCPListenerStatusCode.LISTENING } + + this.status = { ...this.status, code: TCPListenerStatusCode.UP } log('Listening on %s', this.server.address()) } private async pause (permanent: boolean): Promise { - if(this.status.code === TCPListenerStatusCode.PAUSED && permanent) { - this.status = { code: TCPListenerStatusCode.INERT } + if(!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) { + this.status = { code: TCPListenerStatusCode.DOWN } return } - if ( - !(this.status.code === TCPListenerStatusCode.INITIALIZED || - this.status.code === TCPListenerStatusCode.LISTENING) || - !this.server.listening) { + if (!this.server.listening || this.status.code !== TCPListenerStatusCode.UP){ return } @@ -362,7 +361,7 @@ export class TCPListener extends EventEmitter implements Listene // We need to set this status before closing server, so other procedures are aware // during the time the server is closing - this.status = permanent ? { code: TCPListenerStatusCode.INERT } : { ...this.status, code: TCPListenerStatusCode.PAUSED } + this.status = permanent ? { code: TCPListenerStatusCode.DOWN } : { ...this.status, code: TCPListenerStatusCode.PAUSED } await new Promise((resolve, reject) => { this.server.close( err => { err ? reject(err) : resolve() }) }) From 631123f9568e125431c9941895e5e4ee30816f3e Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Thu, 21 Sep 2023 12:37:58 +0200 Subject: [PATCH 03/10] Add unit tests for the connection limit --- .../test/connection-limits.spec.ts | 219 ++++++++++++++++++ .../test/max-connections-close.spec.ts | 121 ---------- 2 files changed, 219 insertions(+), 121 deletions(-) create mode 100644 packages/transport-tcp/test/connection-limits.spec.ts delete mode 100644 packages/transport-tcp/test/max-connections-close.spec.ts diff --git a/packages/transport-tcp/test/connection-limits.spec.ts b/packages/transport-tcp/test/connection-limits.spec.ts new file mode 100644 index 0000000000..d0197f33af --- /dev/null +++ b/packages/transport-tcp/test/connection-limits.spec.ts @@ -0,0 +1,219 @@ +import net from 'node:net' +import { promisify } from 'util' +import { EventEmitter } from '@libp2p/interface/events' +import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' +import { multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' +import { tcp } from '../src/index.js' +import type { TCPListener } from '../src/listener.js' + +const buildSocketAssertions = (port: number, closeCallbacks: Array<() => Promise | any>) => { + function createSocket(i: number): net.Socket { + const socket = net.connect({ host: '127.0.0.1', port }) + + closeCallbacks.unshift(async function closeHandler(): Promise{ + if (!socket.destroyed) { + socket.destroy() + await new Promise((resolve) => socket.on('close', resolve)) + } + }) + return socket + } + + async function assertConnectedSocket(i: number) : Promise { + const socket = createSocket(i) + + await new Promise((resolve, reject) => { + socket.once('connect', () => { + resolve() + }) + socket.once('error', (err) => { + err.message = `Socket[${i}] ${err.message}` + reject(err) + }) + }) + + return socket + } + + async function assertRefusedSocket (i: number): Promise { + const socket = createSocket(i) + + await new Promise((resolve, reject) => { + socket.once('connect', () => { + reject(Error(`Socket[${i}] connected but was expected to reject`)) + }) + socket.once('error', (err) => { + if (err.message.includes('ECONNREFUSED')) { + resolve() + } else { + err.message = `Socket[${i}] unexpected error ${err.message}` + reject(err) + } + }) + }) + + return socket + } + + return { assertConnectedSocket, assertRefusedSocket } +} + + +async function assertServerConnections (listener: TCPListener, connections: number): Promise { + // Expect server connections but allow time for sockets to connect or disconnect + for (let i = 0; i < 100; i++) { + // eslint-disable-next-line @typescript-eslint/dot-notation + if (listener['connections'].size === connections) { + return + } else { + await promisify(setTimeout)(10) + } + } + // eslint-disable-next-line @typescript-eslint/dot-notation + expect(listener['connections'].size).equals(connections, 'Wrong server connections') +} + +describe('closeAbove/listenBelow', () => { + const afterEachCallbacks: Array<() => Promise | any> = [] + afterEach(async () => { + await Promise.all(afterEachCallbacks.map(fn => fn())) + afterEachCallbacks.length = 0 + }) + + it('reject dial of connection above closeAbove', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const {assertConnectedSocket, assertRefusedSocket} = buildSocketAssertions(port, afterEachCallbacks) + + await assertConnectedSocket(1) + await assertConnectedSocket(2) + await assertConnectedSocket(3) + await assertServerConnections(listener, 3) + + // Limit reached, server should be closed here + await assertRefusedSocket(4); + await assertRefusedSocket(5); + await assertServerConnections(listener, 3) + }) + + it('accepts dial of connection when connection drop listenBelow limit', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const {assertConnectedSocket} = buildSocketAssertions(port, afterEachCallbacks) + + const socket1 = await assertConnectedSocket(1) + const socket2 = await assertConnectedSocket(2) + await assertConnectedSocket(3) + await assertServerConnections(listener, 3) + + // Destroy sockets to be have connections < listenBelow + socket1.destroy() + socket2.destroy() + // After destroying 2 sockets connections will be below "listenBelow" limit + await assertServerConnections(listener, 1) + + // Now it should be able to accept new connections + await assertConnectedSocket(4); + await assertConnectedSocket(5); + + // 2 connections dropped and 2 new connections accepted + await assertServerConnections(listener, 3) + }) + + it('should not emit "close" event when server is stopped due to "closeAbove" limit', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + + let closeEventCallCount = 0; + listener.addEventListener('close', () => { + closeEventCallCount += 1 + }) + + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const {assertConnectedSocket} = buildSocketAssertions(port, afterEachCallbacks) + + + await assertConnectedSocket(1) + await assertConnectedSocket(2) + await assertConnectedSocket(3) + await assertServerConnections(listener, 3) + + // Limit reached, server should be closed but should not emit "close" event + expect(closeEventCallCount).equals(0) + }) + + it('should not emit "listening" event when server is resumed due to "listenBelow" limit', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + + let listeningEventCallCount = 0; + listener.addEventListener('listening', () => { + listeningEventCallCount += 1 + }) + + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const {assertConnectedSocket} = buildSocketAssertions(port, afterEachCallbacks) + + // Server should be listening now + expect(listeningEventCallCount).equals(1) + + const socket1 = await assertConnectedSocket(1) + const socket2 = await assertConnectedSocket(2) + await assertConnectedSocket(3) + // Limit reached, server should be closed now + await assertServerConnections(listener, 3) + + // Close some sockets to resume listening + socket1.destroy() + socket2.destroy() + + // Wait for listener to emit event + await promisify(setTimeout)(50) + + // Server should emit the "listening" event again + expect(listeningEventCallCount).equals(2) + }) +}) diff --git a/packages/transport-tcp/test/max-connections-close.spec.ts b/packages/transport-tcp/test/max-connections-close.spec.ts deleted file mode 100644 index 177c71bbe9..0000000000 --- a/packages/transport-tcp/test/max-connections-close.spec.ts +++ /dev/null @@ -1,121 +0,0 @@ -import net from 'node:net' -import { promisify } from 'util' -import { EventEmitter } from '@libp2p/interface/events' -import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' -import { multiaddr } from '@multiformats/multiaddr' -import { expect } from 'aegir/chai' -import { tcp } from '../src/index.js' -import type { TCPListener } from '../src/listener.js' - -describe('close server on maxConnections', () => { - const afterEachCallbacks: Array<() => Promise | any> = [] - afterEach(async () => { - await Promise.all(afterEachCallbacks.map(fn => fn())) - afterEachCallbacks.length = 0 - }) - - it('reject dial of connection above closeAbove', async () => { - const listenBelow = 2 - const closeAbove = 3 - const port = 9900 - - const seenRemoteConnections = new Set() - const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() - - const upgrader = mockUpgrader({ - events: new EventEmitter() - }) - const listener = trasnport.createListener({ upgrader }) as TCPListener - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.push(() => listener.close()) - await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) - - listener.addEventListener('connection', (conn) => { - seenRemoteConnections.add(conn.detail.remoteAddr.toString()) - }) - - function createSocket (): net.Socket { - const socket = net.connect({ host: '127.0.0.1', port }) - - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.unshift(async () => { - if (!socket.destroyed) { - socket.destroy() - await new Promise((resolve) => socket.on('close', resolve)) - } - }) - - return socket - } - - async function assertConnectedSocket (i: number): Promise { - const socket = createSocket() - - await new Promise((resolve, reject) => { - socket.once('connect', () => { - resolve() - }) - socket.once('error', (err) => { - err.message = `Socket[${i}] ${err.message}` - reject(err) - }) - }) - - return socket - } - - async function assertRefusedSocket (i: number): Promise { - const socket = createSocket() - - await new Promise((resolve, reject) => { - socket.once('connect', () => { - reject(Error(`Socket[${i}] connected but was expected to reject`)) - }) - socket.once('error', (err) => { - if (err.message.includes('ECONNREFUSED')) { - resolve() - } else { - err.message = `Socket[${i}] unexpected error ${err.message}` - reject(err) - } - }) - }) - } - - async function assertServerConnections (connections: number): Promise { - // Expect server connections but allow time for sockets to connect or disconnect - for (let i = 0; i < 100; i++) { - // eslint-disable-next-line @typescript-eslint/dot-notation - if (listener['connections'].size === connections) { - return - } else { - await promisify(setTimeout)(10) - } - } - // eslint-disable-next-line @typescript-eslint/dot-notation - expect(listener['connections'].size).equals(connections, 'Wrong server connections') - } - - const socket1 = await assertConnectedSocket(1) - const socket2 = await assertConnectedSocket(2) - const socket3 = await assertConnectedSocket(3) - await assertServerConnections(3) - // Limit reached, server should be closed here - await assertRefusedSocket(4) - await assertRefusedSocket(5) - // Destroy sockets to be have connections < listenBelow - socket1.destroy() - socket2.destroy() - await assertServerConnections(1) - // Attempt to connect more sockets - const socket6 = await assertConnectedSocket(6) - const socket7 = await assertConnectedSocket(7) - await assertServerConnections(3) - // Limit reached, server should be closed here - await assertRefusedSocket(8) - - expect(socket3.destroyed).equals(false, 'socket3 must not destroyed') - expect(socket6.destroyed).equals(false, 'socket6 must not destroyed') - expect(socket7.destroyed).equals(false, 'socket7 must not destroyed') - }) -}) From 2e5346a83cc35382cee492857328a3dad002f254 Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Thu, 21 Sep 2023 17:14:26 +0200 Subject: [PATCH 04/10] Fix the lint errors --- packages/transport-tcp/src/listener.ts | 28 ++++++++-------- .../test/connection-limits.spec.ts | 32 +++++++++---------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index 1cae6ea50e..e1be29c72a 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -53,8 +53,8 @@ export interface TCPListenerMetrics { } enum TCPListenerStatusCode { - /** - * When server object is initialized but we don't know the listening address yet or + /** + * When server object is initialized but we don't know the listening address yet or * the server object is stopped manually, can be resumed only by calling listen() **/ DOWN = 0, @@ -159,14 +159,14 @@ export class TCPListener extends EventEmitter implements Listene // If this event is emitted, the transport manager will remove the listener from it's cache // in the meanwhile if the connections are dropped then listener will start listening again // and the transport manager will not be able to close the server - if(this.status.code !== TCPListenerStatusCode.PAUSED){ + if (this.status.code !== TCPListenerStatusCode.PAUSED) { this.dispatchEvent(new CustomEvent('close')) } }) } private onSocket (socket: net.Socket): void { - if(this.status.code === TCPListenerStatusCode.DOWN) { + if (this.status.code === TCPListenerStatusCode.DOWN) { throw new Error('Server is is not listening yet') } // Avoid uncaught errors caused by unstable connections @@ -178,7 +178,7 @@ export class TCPListener extends EventEmitter implements Listene let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr: this.status.code ? this.status.listeningAddr : undefined, + listeningAddr: this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED ? this.status.listeningAddr : undefined, socketInactivityTimeout: this.context.socketInactivityTimeout, socketCloseTimeout: this.context.socketCloseTimeout, metrics: this.metrics?.events, @@ -283,7 +283,7 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr): Promise { - if (this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED ) { + if (this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED) { throw Error('server is already listening') } @@ -298,10 +298,10 @@ export class TCPListener extends EventEmitter implements Listene peerId, netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) } - + await this.resume() - } catch(err) { - this.status = {code: TCPListenerStatusCode.DOWN} + } catch (err) { + this.status = { code: TCPListenerStatusCode.DOWN } throw err } } @@ -335,12 +335,12 @@ export class TCPListener extends EventEmitter implements Listene } private async pause (permanent: boolean): Promise { - if(!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) { + if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) { this.status = { code: TCPListenerStatusCode.DOWN } - return + return } - if (!this.server.listening || this.status.code !== TCPListenerStatusCode.UP){ + if (!this.server.listening || this.status.code !== TCPListenerStatusCode.UP) { return } @@ -358,12 +358,12 @@ export class TCPListener extends EventEmitter implements Listene // Stops the server from accepting new connections and keeps existing connections. // 'close' event is emitted only emitted when all connections are ended. // The optional callback will be called once the 'close' event occurs. - + // We need to set this status before closing server, so other procedures are aware // during the time the server is closing this.status = permanent ? { code: TCPListenerStatusCode.DOWN } : { ...this.status, code: TCPListenerStatusCode.PAUSED } await new Promise((resolve, reject) => { - this.server.close( err => { err ? reject(err) : resolve() }) + this.server.close(err => { (err != null) ? reject(err) : resolve() }) }) } } diff --git a/packages/transport-tcp/test/connection-limits.spec.ts b/packages/transport-tcp/test/connection-limits.spec.ts index d0197f33af..6488219eba 100644 --- a/packages/transport-tcp/test/connection-limits.spec.ts +++ b/packages/transport-tcp/test/connection-limits.spec.ts @@ -7,11 +7,11 @@ import { expect } from 'aegir/chai' import { tcp } from '../src/index.js' import type { TCPListener } from '../src/listener.js' -const buildSocketAssertions = (port: number, closeCallbacks: Array<() => Promise | any>) => { - function createSocket(i: number): net.Socket { +const buildSocketAssertions = (port: number, closeCallbacks: Array<() => Promise | any>): { assertConnectedSocket: (i: number) => Promise, assertRefusedSocket: (i: number) => Promise } => { + function createSocket (i: number): net.Socket { const socket = net.connect({ host: '127.0.0.1', port }) - - closeCallbacks.unshift(async function closeHandler(): Promise{ + + closeCallbacks.unshift(async function closeHandler (): Promise { if (!socket.destroyed) { socket.destroy() await new Promise((resolve) => socket.on('close', resolve)) @@ -20,7 +20,7 @@ const buildSocketAssertions = (port: number, closeCallbacks: Array<() => Promise return socket } - async function assertConnectedSocket(i: number) : Promise { + async function assertConnectedSocket (i: number): Promise { const socket = createSocket(i) await new Promise((resolve, reject) => { @@ -59,7 +59,6 @@ const buildSocketAssertions = (port: number, closeCallbacks: Array<() => Promise return { assertConnectedSocket, assertRefusedSocket } } - async function assertServerConnections (listener: TCPListener, connections: number): Promise { // Expect server connections but allow time for sockets to connect or disconnect for (let i = 0; i < 100; i++) { @@ -95,7 +94,7 @@ describe('closeAbove/listenBelow', () => { // eslint-disable-next-line @typescript-eslint/promise-function-async afterEachCallbacks.push(() => listener.close()) await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) - const {assertConnectedSocket, assertRefusedSocket} = buildSocketAssertions(port, afterEachCallbacks) + const { assertConnectedSocket, assertRefusedSocket } = buildSocketAssertions(port, afterEachCallbacks) await assertConnectedSocket(1) await assertConnectedSocket(2) @@ -103,8 +102,8 @@ describe('closeAbove/listenBelow', () => { await assertServerConnections(listener, 3) // Limit reached, server should be closed here - await assertRefusedSocket(4); - await assertRefusedSocket(5); + await assertRefusedSocket(4) + await assertRefusedSocket(5) await assertServerConnections(listener, 3) }) @@ -122,7 +121,7 @@ describe('closeAbove/listenBelow', () => { // eslint-disable-next-line @typescript-eslint/promise-function-async afterEachCallbacks.push(() => listener.close()) await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) - const {assertConnectedSocket} = buildSocketAssertions(port, afterEachCallbacks) + const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks) const socket1 = await assertConnectedSocket(1) const socket2 = await assertConnectedSocket(2) @@ -136,8 +135,8 @@ describe('closeAbove/listenBelow', () => { await assertServerConnections(listener, 1) // Now it should be able to accept new connections - await assertConnectedSocket(4); - await assertConnectedSocket(5); + await assertConnectedSocket(4) + await assertConnectedSocket(5) // 2 connections dropped and 2 new connections accepted await assertServerConnections(listener, 3) @@ -157,15 +156,14 @@ describe('closeAbove/listenBelow', () => { // eslint-disable-next-line @typescript-eslint/promise-function-async afterEachCallbacks.push(() => listener.close()) - let closeEventCallCount = 0; + let closeEventCallCount = 0 listener.addEventListener('close', () => { closeEventCallCount += 1 }) await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) - const {assertConnectedSocket} = buildSocketAssertions(port, afterEachCallbacks) + const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks) - await assertConnectedSocket(1) await assertConnectedSocket(2) await assertConnectedSocket(3) @@ -189,13 +187,13 @@ describe('closeAbove/listenBelow', () => { // eslint-disable-next-line @typescript-eslint/promise-function-async afterEachCallbacks.push(() => listener.close()) - let listeningEventCallCount = 0; + let listeningEventCallCount = 0 listener.addEventListener('listening', () => { listeningEventCallCount += 1 }) await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) - const {assertConnectedSocket} = buildSocketAssertions(port, afterEachCallbacks) + const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks) // Server should be listening now expect(listeningEventCallCount).equals(1) From 620cd8090f99673891041f70834960c35f4fda78 Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Sat, 23 Sep 2023 15:36:11 +0200 Subject: [PATCH 05/10] Update code with feedback --- packages/transport-tcp/src/listener.ts | 47 ++++++++++--------- .../test/connection-limits.spec.ts | 4 +- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index e1be29c72a..72b6ed7e8e 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -1,4 +1,5 @@ import net from 'net' +import { CodeError } from '@libp2p/interface/errors' import { EventEmitter, CustomEvent } from '@libp2p/interface/events' import { logger } from '@libp2p/logger' import { CODE_P2P } from './constants.js' @@ -57,14 +58,14 @@ enum TCPListenerStatusCode { * When server object is initialized but we don't know the listening address yet or * the server object is stopped manually, can be resumed only by calling listen() **/ - DOWN = 0, - UP = 1, + INACTIVE = 0, + ACTIVE = 1, /* During the connection limits */ PAUSED = 2, } -type Status = { code: TCPListenerStatusCode.DOWN } | { - code: Exclude +type Status = { code: TCPListenerStatusCode.INACTIVE } | { + code: Exclude listeningAddr: Multiaddr peerId: string | null netConfig: NetConfig @@ -74,7 +75,7 @@ export class TCPListener extends EventEmitter implements Listene private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() - private status: Status = { code: TCPListenerStatusCode.DOWN } + private status: Status = { code: TCPListenerStatusCode.INACTIVE } private metrics?: TCPListenerMetrics private addr: string @@ -96,7 +97,7 @@ export class TCPListener extends EventEmitter implements Listene if (context.closeServerOnMaxConnections != null) { // Sanity check options if (context.closeServerOnMaxConnections.closeAbove < context.closeServerOnMaxConnections.listenBelow) { - throw Error('closeAbove must be >= listenBelow') + throw new CodeError('closeAbove must be >= listenBelow', 'ERROR_CONNECTION_LIMITS') } } @@ -141,7 +142,7 @@ export class TCPListener extends EventEmitter implements Listene } this.metrics?.status.update({ - [this.addr]: TCPListenerStatusCode.UP + [this.addr]: TCPListenerStatusCode.ACTIVE }) } @@ -166,8 +167,8 @@ export class TCPListener extends EventEmitter implements Listene } private onSocket (socket: net.Socket): void { - if (this.status.code === TCPListenerStatusCode.DOWN) { - throw new Error('Server is is not listening yet') + if (this.status.code === TCPListenerStatusCode.INACTIVE) { + throw new CodeError('Server is is not listening yet', 'ERR_SERVER_NOT_RUNNING') } // Avoid uncaught errors caused by unstable connections socket.on('error', err => { @@ -178,7 +179,7 @@ export class TCPListener extends EventEmitter implements Listene let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr: this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED ? this.status.listeningAddr : undefined, + listeningAddr: this.status.code === TCPListenerStatusCode.ACTIVE || this.status.code === TCPListenerStatusCode.PAUSED ? this.status.listeningAddr : undefined, socketInactivityTimeout: this.context.socketInactivityTimeout, socketCloseTimeout: this.context.socketCloseTimeout, metrics: this.metrics?.events, @@ -206,7 +207,7 @@ export class TCPListener extends EventEmitter implements Listene ) { // The most likely case of error is if the port taken by this application is binded by // another process during the time the server if closed. In that case there's not much - // we can do. netListen() will be called again every time a connection is dropped, which + // we can do. resume() will be called again every time a connection is dropped, which // acts as an eventual retry mechanism. onListenError allows the consumer act on this. this.resume().catch(e => { log.error('error attempting to listen server once connection count under limit', e) @@ -251,7 +252,7 @@ export class TCPListener extends EventEmitter implements Listene } getAddrs (): Multiaddr[] { - if (this.status.code === TCPListenerStatusCode.DOWN) { + if (this.status.code === TCPListenerStatusCode.INACTIVE) { return [] } @@ -283,8 +284,8 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr): Promise { - if (this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED) { - throw Error('server is already listening') + if (this.status.code === TCPListenerStatusCode.ACTIVE || this.status.code === TCPListenerStatusCode.PAUSED) { + throw new CodeError('server is already listening', 'ERR_SERVER_ALREADY_LISTENING') } const peerId = ma.getPeerId() @@ -293,7 +294,7 @@ export class TCPListener extends EventEmitter implements Listene try { this.status = { - code: TCPListenerStatusCode.UP, + code: TCPListenerStatusCode.ACTIVE, listeningAddr, peerId, netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) @@ -301,7 +302,7 @@ export class TCPListener extends EventEmitter implements Listene await this.resume() } catch (err) { - this.status = { code: TCPListenerStatusCode.DOWN } + this.status = { code: TCPListenerStatusCode.INACTIVE } throw err } } @@ -311,14 +312,16 @@ export class TCPListener extends EventEmitter implements Listene Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) }) ) - await this.pause(true) + await this.pause(true).catch(e => { + log.error('error attempting to close server once connection count over limit', e) + }) } /** * Can resume a stopped or start an inert server */ private async resume (): Promise { - if (this.server.listening || this.status.code === TCPListenerStatusCode.DOWN) { + if (this.server.listening || this.status.code === TCPListenerStatusCode.INACTIVE) { return } @@ -330,17 +333,17 @@ export class TCPListener extends EventEmitter implements Listene this.server.listen(netConfig, resolve) }) - this.status = { ...this.status, code: TCPListenerStatusCode.UP } + this.status = { ...this.status, code: TCPListenerStatusCode.ACTIVE } log('Listening on %s', this.server.address()) } private async pause (permanent: boolean): Promise { if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) { - this.status = { code: TCPListenerStatusCode.DOWN } + this.status = { code: TCPListenerStatusCode.INACTIVE } return } - if (!this.server.listening || this.status.code !== TCPListenerStatusCode.UP) { + if (!this.server.listening || this.status.code !== TCPListenerStatusCode.ACTIVE) { return } @@ -361,7 +364,7 @@ export class TCPListener extends EventEmitter implements Listene // We need to set this status before closing server, so other procedures are aware // during the time the server is closing - this.status = permanent ? { code: TCPListenerStatusCode.DOWN } : { ...this.status, code: TCPListenerStatusCode.PAUSED } + this.status = permanent ? { code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED } await new Promise((resolve, reject) => { this.server.close(err => { (err != null) ? reject(err) : resolve() }) }) diff --git a/packages/transport-tcp/test/connection-limits.spec.ts b/packages/transport-tcp/test/connection-limits.spec.ts index 6488219eba..f2f81deb1c 100644 --- a/packages/transport-tcp/test/connection-limits.spec.ts +++ b/packages/transport-tcp/test/connection-limits.spec.ts @@ -70,7 +70,7 @@ async function assertServerConnections (listener: TCPListener, connections: numb } } // eslint-disable-next-line @typescript-eslint/dot-notation - expect(listener['connections'].size).equals(connections, 'Wrong server connections') + expect(listener['connections'].size).equals(connections, 'invalid amount of server connections') } describe('closeAbove/listenBelow', () => { @@ -173,7 +173,7 @@ describe('closeAbove/listenBelow', () => { expect(closeEventCallCount).equals(0) }) - it('should not emit "listening" event when server is resumed due to "listenBelow" limit', async () => { + it('should emit "listening" event when server is resumed due to "listenBelow" limit', async () => { const listenBelow = 2 const closeAbove = 3 const port = 9900 From ea620dce72246b878c6597a441bf0106dd2d296a Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Thu, 28 Sep 2023 11:22:17 +0200 Subject: [PATCH 06/10] Fix the transport manager to close all listeners --- packages/libp2p/src/transport-manager.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/libp2p/src/transport-manager.ts b/packages/libp2p/src/transport-manager.ts index 070503764a..4e28a7eff9 100644 --- a/packages/libp2p/src/transport-manager.ts +++ b/packages/libp2p/src/transport-manager.ts @@ -262,12 +262,22 @@ export class DefaultTransportManager implements TransportManager, Startable { * If a transport has any running listeners, they will be closed. */ async remove (key: string): Promise { - log('removing %s', key) + const listeners = this.listeners.get(key) ?? [] + log('removing transport %s', key) // Close any running listeners - for (const listener of this.listeners.get(key) ?? []) { - await listener.close() + const tasks = [] + log('closing listeners for %s', key) + while (listeners.length > 0) { + const listener = listeners.pop() + + if (listener == null) { + continue + } + + tasks.push(listener.close()) } + await Promise.all(tasks) this.transports.delete(key) this.listeners.delete(key) From d4259ae74b1c8192b3e5e0a28ecc2b5214fb9afb Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Fri, 29 Sep 2023 11:51:19 +0200 Subject: [PATCH 07/10] Update the listner to close all connections later --- packages/transport-tcp/src/listener.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index 72b6ed7e8e..f34c5fc167 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -308,13 +308,15 @@ export class TCPListener extends EventEmitter implements Listene } async close (): Promise { - await Promise.all( - Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) }) - ) - + // First close the server so we don't accept new connections await this.pause(true).catch(e => { log.error('error attempting to close server once connection count over limit', e) }) + + // Then close all existing connections + await Promise.all( + Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) }) + ) } /** From a6722a99e6c5d15424ce14461c887f3ee6c24503 Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Fri, 29 Sep 2023 17:43:29 +0200 Subject: [PATCH 08/10] Update the listener close handler --- packages/transport-tcp/src/listener.ts | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index f34c5fc167..e6e7b8e1cf 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -167,7 +167,7 @@ export class TCPListener extends EventEmitter implements Listene } private onSocket (socket: net.Socket): void { - if (this.status.code === TCPListenerStatusCode.INACTIVE) { + if (this.status.code !== TCPListenerStatusCode.ACTIVE) { throw new CodeError('Server is is not listening yet', 'ERR_SERVER_NOT_RUNNING') } // Avoid uncaught errors caused by unstable connections @@ -179,7 +179,7 @@ export class TCPListener extends EventEmitter implements Listene let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr: this.status.code === TCPListenerStatusCode.ACTIVE || this.status.code === TCPListenerStatusCode.PAUSED ? this.status.listeningAddr : undefined, + listeningAddr: this.status.listeningAddr, socketInactivityTimeout: this.context.socketInactivityTimeout, socketCloseTimeout: this.context.socketCloseTimeout, metrics: this.metrics?.events, @@ -308,15 +308,13 @@ export class TCPListener extends EventEmitter implements Listene } async close (): Promise { - // First close the server so we don't accept new connections - await this.pause(true).catch(e => { - log.error('error attempting to close server once connection count over limit', e) - }) - - // Then close all existing connections - await Promise.all( - Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) }) - ) + // Close connections and server the same time to avoid any race condition + await Promise.all([ + Promise.all(Array.from(this.connections.values()).map(async maConn => attemptClose(maConn))), + this.pause(true).catch(e => { + log.error('error attempting to close server once connection count over limit', e) + }) + ]) } /** From 83f5340c6113358145bdd1e416be876ebdfe6bd3 Mon Sep 17 00:00:00 2001 From: Chad Nehemiah Date: Sat, 30 Sep 2023 16:07:03 -0400 Subject: [PATCH 09/10] Update packages/libp2p/src/transport-manager.ts --- packages/libp2p/src/transport-manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/libp2p/src/transport-manager.ts b/packages/libp2p/src/transport-manager.ts index 4e28a7eff9..b9a754bd76 100644 --- a/packages/libp2p/src/transport-manager.ts +++ b/packages/libp2p/src/transport-manager.ts @@ -263,7 +263,7 @@ export class DefaultTransportManager implements TransportManager, Startable { */ async remove (key: string): Promise { const listeners = this.listeners.get(key) ?? [] - log('removing transport %s', key) + log.trace('removing transport %s', key) // Close any running listeners const tasks = [] From fdd3e96252d518245b9d1e2570fff0f0081ba1c4 Mon Sep 17 00:00:00 2001 From: Chad Nehemiah Date: Sat, 30 Sep 2023 16:07:12 -0400 Subject: [PATCH 10/10] Update packages/libp2p/src/transport-manager.ts --- packages/libp2p/src/transport-manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/libp2p/src/transport-manager.ts b/packages/libp2p/src/transport-manager.ts index b9a754bd76..38f5359616 100644 --- a/packages/libp2p/src/transport-manager.ts +++ b/packages/libp2p/src/transport-manager.ts @@ -267,7 +267,7 @@ export class DefaultTransportManager implements TransportManager, Startable { // Close any running listeners const tasks = [] - log('closing listeners for %s', key) + log.trace('closing listeners for %s', key) while (listeners.length > 0) { const listener = listeners.pop()