Skip to content

Commit

Permalink
Update the listner
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Sep 21, 2023
1 parent b23dd6a commit e47e041
Showing 1 changed file with 37 additions and 38 deletions.
75 changes: 37 additions & 38 deletions packages/transport-tcp/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ interface Context extends TCPCreateListenerOptions {
closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
}

const SERVER_STATUS_UP = 1
const SERVER_STATUS_DOWN = 0

export interface TCPListenerMetrics {
status: MetricGroup
errors: CounterGroup
Expand All @@ -60,16 +57,14 @@ enum TCPListenerStatusCode {
* When server object is initialized but we don't know the listening address yet or
* the server object is stopped manually, can be resumed only by calling listen()
**/
INERT = 'inert',
/* When listener is aware of the address but the server is not started listening */
INITIALIZED = 'initializing',
LISTENING = 'listening',
DOWN = 0,
UP = 1,
/* During the connection limits */
PAUSED = 'paused',
PAUSED = 2,
}

type Status = { code: TCPListenerStatusCode.INERT } | {
code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.INERT>
type Status = { code: TCPListenerStatusCode.DOWN } | {
code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.DOWN>
listeningAddr: Multiaddr
peerId: string | null
netConfig: NetConfig
Expand All @@ -79,7 +74,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()
private status: Status = { code: TCPListenerStatusCode.INERT }
private status: Status = { code: TCPListenerStatusCode.DOWN }
private metrics?: TCPListenerMetrics
private addr: string

Expand Down Expand Up @@ -146,7 +141,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

this.metrics?.status.update({
[this.addr]: SERVER_STATUS_UP
[this.addr]: TCPListenerStatusCode.UP
})
}

Expand All @@ -157,17 +152,21 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
})
.on('close', () => {
if(this.status.code === TCPListenerStatusCode.PAUSED) return

this.metrics?.status.update({
[this.addr]: SERVER_STATUS_DOWN
[this.addr]: this.status.code
})
this.dispatchEvent(new CustomEvent('close'))

// If this event is emitted, the transport manager will remove the listener from it's cache
// in the meanwhile if the connections are dropped then listener will start listening again
// and the transport manager will not be able to close the server
if(this.status.code !== TCPListenerStatusCode.PAUSED){
this.dispatchEvent(new CustomEvent('close'))
}
})
}

private onSocket (socket: net.Socket): void {
if(this.status.code === TCPListenerStatusCode.INERT) {
if(this.status.code === TCPListenerStatusCode.DOWN) {
throw new Error('Server is is not listening yet')
}
// Avoid uncaught errors caused by unstable connections
Expand Down Expand Up @@ -252,7 +251,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

getAddrs (): Multiaddr[] {
if (this.status.code === TCPListenerStatusCode.INERT) {
if (this.status.code === TCPListenerStatusCode.DOWN) {
return []
}

Expand Down Expand Up @@ -284,22 +283,27 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

async listen (ma: Multiaddr): Promise<void> {
if (this.status.code === TCPListenerStatusCode.LISTENING || this.status.code === TCPListenerStatusCode.PAUSED ) {
if (this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED ) {
throw Error('server is already listening')
}

const peerId = ma.getPeerId()
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma
const { backlog } = this.context

this.status = {
code: TCPListenerStatusCode.INITIALIZED,
listeningAddr,
peerId,
netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
try {
this.status = {
code: TCPListenerStatusCode.UP,
listeningAddr,
peerId,
netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
}

await this.resume()
} catch(err) {
this.status = {code: TCPListenerStatusCode.DOWN}
throw err
}

await this.resume()
}

async close (): Promise<void> {
Expand All @@ -314,10 +318,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
* Can resume a stopped or start an inert server
*/
private async resume (): Promise<void> {
if (
!(this.status.code === TCPListenerStatusCode.INITIALIZED ||
this.status.code === TCPListenerStatusCode.PAUSED) ||
this.server.listening) {
if (this.server.listening || this.status.code === TCPListenerStatusCode.DOWN) {
return
}

Expand All @@ -328,20 +329,18 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.server.once('error', reject)
this.server.listen(netConfig, resolve)
})
this.status = { ...this.status, code: TCPListenerStatusCode.LISTENING }

this.status = { ...this.status, code: TCPListenerStatusCode.UP }
log('Listening on %s', this.server.address())
}

private async pause (permanent: boolean): Promise<void> {
if(this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
this.status = { code: TCPListenerStatusCode.INERT }
if(!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
this.status = { code: TCPListenerStatusCode.DOWN }
return
}

if (
!(this.status.code === TCPListenerStatusCode.INITIALIZED ||
this.status.code === TCPListenerStatusCode.LISTENING) ||
!this.server.listening) {
if (!this.server.listening || this.status.code !== TCPListenerStatusCode.UP){
return
}

Expand All @@ -362,7 +361,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene

// We need to set this status before closing server, so other procedures are aware
// during the time the server is closing
this.status = permanent ? { code: TCPListenerStatusCode.INERT } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
this.status = permanent ? { code: TCPListenerStatusCode.DOWN } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
await new Promise<void>((resolve, reject) => {
this.server.close( err => { err ? reject(err) : resolve() })
})
Expand Down

0 comments on commit e47e041

Please sign in to comment.