From e47e041c4b5fc95b2d845039395af132f390b6a3 Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Thu, 21 Sep 2023 12:37:44 +0200 Subject: [PATCH] Update the listner --- packages/transport-tcp/src/listener.ts | 75 +++++++++++++------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index a92e944d77..1cae6ea50e 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -46,9 +46,6 @@ interface Context extends TCPCreateListenerOptions { closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts } -const SERVER_STATUS_UP = 1 -const SERVER_STATUS_DOWN = 0 - export interface TCPListenerMetrics { status: MetricGroup errors: CounterGroup @@ -60,16 +57,14 @@ enum TCPListenerStatusCode { * When server object is initialized but we don't know the listening address yet or * the server object is stopped manually, can be resumed only by calling listen() **/ - INERT = 'inert', - /* When listener is aware of the address but the server is not started listening */ - INITIALIZED = 'initializing', - LISTENING = 'listening', + DOWN = 0, + UP = 1, /* During the connection limits */ - PAUSED = 'paused', + PAUSED = 2, } -type Status = { code: TCPListenerStatusCode.INERT } | { - code: Exclude +type Status = { code: TCPListenerStatusCode.DOWN } | { + code: Exclude listeningAddr: Multiaddr peerId: string | null netConfig: NetConfig @@ -79,7 +74,7 @@ export class TCPListener extends EventEmitter implements Listene private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() - private status: Status = { code: TCPListenerStatusCode.INERT } + private status: Status = { code: TCPListenerStatusCode.DOWN } private metrics?: TCPListenerMetrics private addr: string @@ -146,7 +141,7 @@ export class TCPListener extends EventEmitter implements Listene } this.metrics?.status.update({ - [this.addr]: SERVER_STATUS_UP + [this.addr]: TCPListenerStatusCode.UP }) } @@ -157,17 +152,21 @@ export class TCPListener extends EventEmitter implements Listene this.dispatchEvent(new CustomEvent('error', { detail: err })) }) .on('close', () => { - if(this.status.code === TCPListenerStatusCode.PAUSED) return - this.metrics?.status.update({ - [this.addr]: SERVER_STATUS_DOWN + [this.addr]: this.status.code }) - this.dispatchEvent(new CustomEvent('close')) + + // If this event is emitted, the transport manager will remove the listener from it's cache + // in the meanwhile if the connections are dropped then listener will start listening again + // and the transport manager will not be able to close the server + if(this.status.code !== TCPListenerStatusCode.PAUSED){ + this.dispatchEvent(new CustomEvent('close')) + } }) } private onSocket (socket: net.Socket): void { - if(this.status.code === TCPListenerStatusCode.INERT) { + if(this.status.code === TCPListenerStatusCode.DOWN) { throw new Error('Server is is not listening yet') } // Avoid uncaught errors caused by unstable connections @@ -252,7 +251,7 @@ export class TCPListener extends EventEmitter implements Listene } getAddrs (): Multiaddr[] { - if (this.status.code === TCPListenerStatusCode.INERT) { + if (this.status.code === TCPListenerStatusCode.DOWN) { return [] } @@ -284,7 +283,7 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr): Promise { - if (this.status.code === TCPListenerStatusCode.LISTENING || this.status.code === TCPListenerStatusCode.PAUSED ) { + if (this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED ) { throw Error('server is already listening') } @@ -292,14 +291,19 @@ export class TCPListener extends EventEmitter implements Listene const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma const { backlog } = this.context - this.status = { - code: TCPListenerStatusCode.INITIALIZED, - listeningAddr, - peerId, - netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) + try { + this.status = { + code: TCPListenerStatusCode.UP, + listeningAddr, + peerId, + netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) + } + + await this.resume() + } catch(err) { + this.status = {code: TCPListenerStatusCode.DOWN} + throw err } - - await this.resume() } async close (): Promise { @@ -314,10 +318,7 @@ export class TCPListener extends EventEmitter implements Listene * Can resume a stopped or start an inert server */ private async resume (): Promise { - if ( - !(this.status.code === TCPListenerStatusCode.INITIALIZED || - this.status.code === TCPListenerStatusCode.PAUSED) || - this.server.listening) { + if (this.server.listening || this.status.code === TCPListenerStatusCode.DOWN) { return } @@ -328,20 +329,18 @@ export class TCPListener extends EventEmitter implements Listene this.server.once('error', reject) this.server.listen(netConfig, resolve) }) - this.status = { ...this.status, code: TCPListenerStatusCode.LISTENING } + + this.status = { ...this.status, code: TCPListenerStatusCode.UP } log('Listening on %s', this.server.address()) } private async pause (permanent: boolean): Promise { - if(this.status.code === TCPListenerStatusCode.PAUSED && permanent) { - this.status = { code: TCPListenerStatusCode.INERT } + if(!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) { + this.status = { code: TCPListenerStatusCode.DOWN } return } - if ( - !(this.status.code === TCPListenerStatusCode.INITIALIZED || - this.status.code === TCPListenerStatusCode.LISTENING) || - !this.server.listening) { + if (!this.server.listening || this.status.code !== TCPListenerStatusCode.UP){ return } @@ -362,7 +361,7 @@ export class TCPListener extends EventEmitter implements Listene // We need to set this status before closing server, so other procedures are aware // during the time the server is closing - this.status = permanent ? { code: TCPListenerStatusCode.INERT } : { ...this.status, code: TCPListenerStatusCode.PAUSED } + this.status = permanent ? { code: TCPListenerStatusCode.DOWN } : { ...this.status, code: TCPListenerStatusCode.PAUSED } await new Promise((resolve, reject) => { this.server.close( err => { err ? reject(err) : resolve() }) })