diff --git a/packages/runtime-node/src/ws-node-host.ts b/packages/runtime-node/src/ws-node-host.ts index 81b0e3b23..b7921c2f2 100644 --- a/packages/runtime-node/src/ws-node-host.ts +++ b/packages/runtime-node/src/ws-node-host.ts @@ -16,6 +16,7 @@ export class WsHost extends BaseHost { export class WsServerHost extends BaseHost implements IDisposable { private socketToEnvId = new Map(); + private clientIdToSocket = new Map(); private disposables = new SafeDisposable(WsServerHost.name); dispose = this.disposables.dispose; isDisposed = this.disposables.isDisposed; @@ -42,7 +43,17 @@ export class WsServerHost extends BaseHost implements IDisposable { } private onConnection = (socket: io.Socket): void => { - const nameSpace = (original: string) => `${socket.id}/${original}`; + const clientId = socket.handshake.auth?.clientId || socket.id; + + // disconnect previous connection + const existingSocket = this.clientIdToSocket.get(clientId); + if (existingSocket && existingSocket.connected) { + existingSocket.disconnect(true); + } + + this.clientIdToSocket.set(clientId, socket); + + const nameSpace = (original: string) => `${clientId}/${original}`; const onMessage = (message: Message): void => { // this mapping should not be here because of forwarding of messages // maybe change message forwarding to have 'forward destination' and correct 'from' @@ -74,6 +85,9 @@ export class WsServerHost extends BaseHost implements IDisposable { }); } } + if (this.clientIdToSocket.get(clientId) === socket) { + this.clientIdToSocket.delete(clientId); + } }); }; } diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index df8258ef9..e2156aad3 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -298,6 +298,28 @@ describe('Socket communication', () => { expect(message.from).to.equal('server-host'); }); }); + + it('Should disconnect previous connection when same clientId reconnects', async () => { + const stableClientId = 'stable-client-id'; + const disconnectSpy = sinon.spy(); + + const firstClient = disposeAfterTest( + new WsClientHost(serverTopology['server-host']!, { auth: { clientId: stableClientId } }), + ); + await firstClient.connected; + firstClient.subscribers.on('disconnect', disconnectSpy); + + expect(firstClient.isConnected(), 'first connected').to.eql(true); + + const secondClient = disposeAfterTest( + new WsClientHost(serverTopology['server-host']!, { auth: { clientId: stableClientId } }), + ); + await secondClient.connected; + + await waitFor(() => expect(firstClient.isConnected(), 'first disconnected').to.eql(false), { timeout: 2_000 }); + expect(disconnectSpy.callCount, 'first disconnected count').to.eq(1); + expect(secondClient.isConnected(), 'second connected').to.eql(true); + }); }); describe('IPC communication', () => {