Skip to content

Commit

Permalink
fix: keep websocket connection alive (#5047)
Browse files Browse the repository at this point in the history
fix: keep websocket connection alive by sending a ping message every 50 seconds
- setup heartbeat ping/pong message handlers
  • Loading branch information
petermakowski authored Jul 11, 2023
1 parent 08c62f5 commit fa6eebb
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 1 deletion.
60 changes: 60 additions & 0 deletions src/app/base/sagas/websockets/websockets.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import { pollAction, handlePolling } from "./handlers/polling-requests";
import { handleUnsubscribe } from "./handlers/unsubscribe";
import type { WebSocketChannel } from "./websockets";
import {
WEBSOCKET_PING_INTERVAL,
createConnection,
handleMessage,
handleNotifyMessage,
handlePingMessage,
sendMessage,
watchMessages,
watchWebSockets,
Expand All @@ -34,6 +36,7 @@ import WebSocketClient, {
} from "websocket-client";
import type {
WebSocketResponseNotify,
WebSocketResponsePing,
WebSocketResponseResult,
} from "websocket-client";

Expand Down Expand Up @@ -95,6 +98,23 @@ describe("websocket sagas", () => {
.run();
});

it("sends a websocket ping message to keep the connection alive", () => {
return expectSaga(watchWebSockets, socketClient)
.dispatch({
type: "status/websocketConnected",
})
.put({
type: "status/websocketPing",
meta: {
poll: true,
pollInterval: WEBSOCKET_PING_INTERVAL,
model: "status",
method: "ping",
},
})
.run();
});

it("can create a WebSocket connection", () => {
expect.assertions(1);
const socket = createConnection(socketClient);
Expand Down Expand Up @@ -143,6 +163,31 @@ describe("websocket sagas", () => {
);
});

it("can send a WebSocket ping message", () => {
const action = {
type: "status/websocketPing",
meta: {
model: "status",
method: "ping",
type: WebSocketMessageType.PING,
},
payload: null,
};
const saga = sendMessage(socketClient, action);
expect(saga.next().value).toEqual(
put({
meta: { item: null },
type: "status/websocketPingStart",
})
);
expect(saga.next().value).toEqual(
call([socketClient, socketClient.send], action, {
method: "status.ping",
type: WebSocketMessageType.PING,
})
);
});

it("can send a WebSocket message with a request id", () => {
const action = {
type: "test/action",
Expand Down Expand Up @@ -432,6 +477,21 @@ describe("websocket sagas", () => {
);
});

it("can handle a WebSocket ping message", () => {
const saga = handleMessage(socketChannel, socketClient);
const response: WebSocketResponsePing = {
request_id: 1,
result: 1,
rtype: 0,
type: WebSocketMessageType.PING_REPLY,
};
expect(saga.next().value).toEqual(take(socketChannel));
expect(saga.next({ data: JSON.stringify(response) }).value).toEqual(
call(handlePingMessage, response)
);
expect(saga.next().value).toEqual(take(socketChannel));
});

it("can handle a WebSocket notify message", () => {
const saga = handleMessage(socketChannel, socketClient);
const response: WebSocketResponseNotify = {
Expand Down
42 changes: 41 additions & 1 deletion src/app/base/sagas/websockets/websockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type {
WebSocketRequestMessage,
WebSocketActionParams,
WebSocketResponseNotify,
WebSocketResponsePing,
} from "../../../../websocket-client";

import {
Expand Down Expand Up @@ -83,6 +84,19 @@ export function* handleNotifyMessage({
});
}

/**
* Handle incoming ping response messages.
*
*/
export function* handlePingMessage({
result,
}: WebSocketResponsePing): SagaGenerator<void> {
yield* put({
type: "status/websocketPingReply",
payload: result,
});
}

/**
* Create a WebSocket connection via the client.
*/
Expand Down Expand Up @@ -186,6 +200,10 @@ export function* handleMessage(
const response = JSON.parse(websocketEvent.data);

switch (response.type) {
case WebSocketMessageType.PING_REPLY: {
yield* call(handlePingMessage, response);
break;
}
case WebSocketMessageType.NOTIFY: {
yield* call(handleNotifyMessage, response);
break;
Expand Down Expand Up @@ -270,7 +288,11 @@ const buildMessage = (
) => {
const message: WebSocketRequestMessage = {
method: `${meta.model}.${meta.method}`,
type: WebSocketMessageType.REQUEST,
// type is always request, except for ping messages
type:
meta.model === "status" && meta.method === "ping"
? WebSocketMessageType.PING
: WebSocketMessageType.REQUEST,
};
const hasMultipleDispatches = meta.dispatchMultiple && Array.isArray(params);
if (params && !hasMultipleDispatches) {
Expand Down Expand Up @@ -422,6 +444,23 @@ export function* setupWebSocket({
}
}

/**
* Send a ping message to the server every 50 seconds to keep the connection alive.
*
**/
export const WEBSOCKET_PING_INTERVAL = 50 * 1000; // 50 seconds
function* handleWebsocketPing() {
yield* put({
type: "status/websocketPing",
meta: {
poll: true,
pollInterval: WEBSOCKET_PING_INTERVAL,
model: "status",
method: "ping",
},
});
}

/**
* Set up websocket connection on request via status/websocketConnect action
* @param {Array} messageHandlers - Additional sagas to be handled by the
Expand All @@ -435,4 +474,5 @@ export function* watchWebSockets(
websocketClient,
messageHandlers,
});
yield* takeLatest("status/websocketConnected", handleWebsocketPing);
}
8 changes: 8 additions & 0 deletions src/websocket-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ export type WebSocketRequestMessage = {
params?: Record<string, unknown> | Record<string, unknown>[];
};

export type WebSocketPingMessage = {
type: WebSocketMessageType.PING;
};

export type WebSocketRequest = WebSocketMessage & WebSocketRequestMessage;

export type WebSocketResponseResult<R = unknown> = WebSocketMessage & {
Expand All @@ -54,6 +58,10 @@ export type WebSocketResponseNotify = {
type: WebSocketMessageType.NOTIFY;
};

export type WebSocketResponsePing = WebSocketResponseResult<number> & {
type: WebSocketMessageType.PING_REPLY;
};

export type WebSocketActionParams = AnyObject | AnyObject[];

export type WebSocketAction<P = WebSocketActionParams> = PayloadAction<
Expand Down

0 comments on commit fa6eebb

Please sign in to comment.