Skip to content

Commit

Permalink
Various tweaks to make the socket server shut down cleanly (#683)
Browse files Browse the repository at this point in the history
* Tweak pinging; terminate hung connections

* Close all connections, including lost connections on shutdown

* Tweak guest counting

* Make private things private

* Check lost connection timeouts during ping phase
  • Loading branch information
goto-bus-stop authored Dec 7, 2024
1 parent 77b4633 commit 35dbbc2
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 77 deletions.
43 changes: 22 additions & 21 deletions src/SocketServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import lodash from 'lodash';
import sjson from 'secure-json-parse';
import { WebSocketServer } from 'ws';
import Ajv from 'ajv';
import ms from 'ms';
import { stdSerializers } from 'pino';
import { socketVote } from './controllers/booth.js';
import { disconnectUser } from './controllers/users.js';
Expand All @@ -13,10 +12,13 @@ import AuthedConnection from './sockets/AuthedConnection.js';
import LostConnection from './sockets/LostConnection.js';
import { serializeUser } from './utils/serialize.js';

const { debounce, isEmpty } = lodash;
const { isEmpty } = lodash;

export const REDIS_ACTIVE_SESSIONS = 'users';

const PING_INTERVAL = 10_000;
const GUEST_COUNT_INTERVAL = 2_000;

/**
* @typedef {import('./schema.js').User} User
*/
Expand Down Expand Up @@ -102,10 +104,10 @@ class SocketServer {

#pinger;

/**
* Update online guests count and broadcast an update if necessary.
*/
#recountGuests;
/** Update online guests count and broadcast an update if necessary. */
#guestCountInterval;

#guestCountDirty = true;

/**
* Handlers for commands that come in from clients.
Expand Down Expand Up @@ -187,16 +189,17 @@ class SocketServer {

this.#pinger = setInterval(() => {
this.ping();
}, ms('10 seconds'));
}, PING_INTERVAL);

this.#recountGuests = debounce(() => {
if (this.#closing) {
this.#guestCountInterval = setInterval(() => {
if (!this.#guestCountDirty) {
return;
}
this.#recountGuestsInternal().catch((error) => {

this.#recountGuests().catch((error) => {
this.#logger.error({ err: error }, 'counting guests failed');
});
}, ms('2 seconds'));
}, GUEST_COUNT_INTERVAL);

this.#clientActions = {
sendChat: (user, message) => {
Expand Down Expand Up @@ -629,7 +632,7 @@ class SocketServer {
this.#logger.trace({ type: connection.constructor.name, userID, sessionID }, 'add connection');

this.#connections.push(connection);
this.#recountGuests();
this.#guestCountDirty = true;
}

/**
Expand All @@ -647,7 +650,7 @@ class SocketServer {
this.#connections.splice(i, 1);

connection.removed();
this.#recountGuests();
this.#guestCountDirty = true;
}

/**
Expand Down Expand Up @@ -703,12 +706,12 @@ class SocketServer {
clearInterval(this.#pinger);

this.#closing = true;
for (const connection of this.#wss.clients) {
clearInterval(this.#guestCountInterval);

for (const connection of this.#connections) {
connection.close();
}

this.#recountGuests.cancel();

const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
await closeWsServer();
await this.#redisSubscription.quit();
Expand All @@ -717,7 +720,7 @@ class SocketServer {
/**
* Get the connection instance for a specific user.
*
* @param {User|string} user The user.
* @param {User|import('./schema.js').UserID} user The user.
* @returns {Connection|undefined}
*/
connection(user) {
Expand All @@ -727,9 +730,7 @@ class SocketServer {

ping() {
this.#connections.forEach((connection) => {
if ('socket' in connection) {
connection.ping();
}
connection.ping();
});
}

Expand Down Expand Up @@ -779,7 +780,7 @@ class SocketServer {
return parseInt(rawCount, 10);
}

async #recountGuestsInternal() {
async #recountGuests() {
const { redis } = this.#uw;
const guests = this.#connections
.filter((connection) => connection instanceof GuestConnection)
Expand Down
47 changes: 36 additions & 11 deletions src/sockets/AuthedConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@ import Ultron from 'ultron';
import WebSocket from 'ws';
import sjson from 'secure-json-parse';

const PING_TIMEOUT = 5_000;
const DEAD_TIMEOUT = 30_000;

class AuthedConnection extends EventEmitter {
#events;

#logger;

#lastMessage = Date.now();

/**
* @param {import('../Uwave.js').default} uw
* @param {import('ws').WebSocket} socket
Expand All @@ -16,19 +23,23 @@ class AuthedConnection extends EventEmitter {
super();
this.uw = uw;
this.socket = socket;
this.events = new Ultron(this.socket);
this.#events = new Ultron(this.socket);
this.user = user;
this.sessionID = sessionID;
this.#logger = uw.logger.child({
ns: 'uwave:sockets', connectionType: 'AuthedConnection', userId: this.user.id, sessionID,
});

this.events.on('close', () => {
this.#events.on('close', () => {
this.emit('close', { banned: this.banned });
});
this.events.on('message', this.onMessage.bind(this));
this.#events.on('message', (raw) => {
this.#onMessage(raw);
});
this.#events.on('pong', () => {
this.#onPong();
});

this.lastMessage = Date.now();
this.sendWaiting();
}

Expand Down Expand Up @@ -66,28 +77,42 @@ class AuthedConnection extends EventEmitter {

/**
* @param {string|Buffer} raw
* @private
*/
onMessage(raw) {
#onMessage(raw) {
this.#lastMessage = Date.now();
const { command, data } = sjson.safeParse(raw) ?? {};
if (command) {
this.emit('command', command, data);
}
}

#onPong() {
this.#lastMessage = Date.now();
}

/**
* @param {string} command
* @param {import('type-fest').JsonValue} data
*/
send(command, data) {
this.socket.send(JSON.stringify({ command, data }));
this.lastMessage = Date.now();
this.#lastMessage = Date.now();
}

#timeSinceLastMessage() {
return Date.now() - this.#lastMessage;
}

ping() {
if (Date.now() - this.lastMessage > 5000 && this.socket.readyState === WebSocket.OPEN) {
this.socket.send('-');
this.lastMessage = Date.now();
if (this.socket.readyState !== WebSocket.OPEN) {
return;
}
if (this.#timeSinceLastMessage() > DEAD_TIMEOUT) {
this.socket.terminate();
return;
}
if (this.#timeSinceLastMessage() > PING_TIMEOUT) {
this.socket.ping();
}
}

Expand All @@ -104,7 +129,7 @@ class AuthedConnection extends EventEmitter {
}

removed() {
this.events.remove();
this.#events.remove();
}

toString() {
Expand Down
37 changes: 28 additions & 9 deletions src/sockets/GuestConnection.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import EventEmitter from 'node:events';
import Ultron from 'ultron';
import WebSocket from 'ws';

const PING_TIMEOUT = 5_000;
const DEAD_TIMEOUT = 30_000;

class GuestConnection extends EventEmitter {
#events;

#logger;

#lastMessage = Date.now();

/**
* @param {import('../Uwave.js').default} uw
* @param {import('ws').WebSocket} socket
Expand All @@ -16,21 +24,23 @@ class GuestConnection extends EventEmitter {
this.options = options;
this.#logger = uw.logger.child({ ns: 'uwave:sockets', connectionType: 'GuestConnection', userId: null });

this.events = new Ultron(socket);
this.#events = new Ultron(socket);

this.events.on('close', () => {
this.#events.on('close', () => {
this.emit('close');
});

this.events.on('message', /** @param {string|Buffer} token */ (token) => {
this.#events.on('message', /** @param {string|Buffer} token */ (token) => {
this.attemptAuth(token.toString()).then(() => {
this.send('authenticated');
}).catch((error) => {
this.send('error', error.message);
});
});

this.lastMessage = Date.now();
this.#events.on('pong', () => {
this.#lastMessage = Date.now();
});
}

/**
Expand Down Expand Up @@ -73,13 +83,22 @@ class GuestConnection extends EventEmitter {
*/
send(command, data) {
this.socket.send(JSON.stringify({ command, data }));
this.lastMessage = Date.now();
}

#timeSinceLastMessage() {
return Date.now() - this.#lastMessage;
}

ping() {
if (Date.now() - this.lastMessage > 5000) {
this.socket.send('-');
this.lastMessage = Date.now();
if (this.socket.readyState !== WebSocket.OPEN) {
return;
}
if (this.#timeSinceLastMessage() > DEAD_TIMEOUT) {
this.socket.terminate();
return;
}
if (this.#timeSinceLastMessage() > PING_TIMEOUT) {
this.socket.ping();
}
}

Expand All @@ -89,7 +108,7 @@ class GuestConnection extends EventEmitter {
}

removed() {
this.events.remove();
this.#events.remove();
}

toString() {
Expand Down
Loading

0 comments on commit 35dbbc2

Please sign in to comment.