Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion packages/runtime-node/src/ws-node-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class WsHost extends BaseHost {

export class WsServerHost extends BaseHost implements IDisposable {
private socketToEnvId = new Map<string, { socket: io.Socket; clientID: string }>();
private clientIdToSocket = new Map<string, io.Socket>();
private disposables = new SafeDisposable(WsServerHost.name);
dispose = this.disposables.dispose;
isDisposed = this.disposables.isDisposed;
Expand All @@ -42,7 +43,17 @@ export class WsServerHost extends BaseHost implements IDisposable {
}

private onConnection = (socket: io.Socket): void => {
const nameSpace = (original: string) => `${socket.id}/${original}`;
const clientId = socket.handshake.auth?.clientId || socket.id;

// disconnect previous connection
const existingSocket = this.clientIdToSocket.get(clientId);
if (existingSocket && existingSocket.connected) {
existingSocket.disconnect(true);
}

this.clientIdToSocket.set(clientId, socket);

const nameSpace = (original: string) => `${clientId}/${original}`;
const onMessage = (message: Message): void => {
// this mapping should not be here because of forwarding of messages
// maybe change message forwarding to have 'forward destination' and correct 'from'
Expand Down Expand Up @@ -74,6 +85,9 @@ export class WsServerHost extends BaseHost implements IDisposable {
});
}
}
if (this.clientIdToSocket.get(clientId) === socket) {
this.clientIdToSocket.delete(clientId);
}
});
};
}
22 changes: 22 additions & 0 deletions packages/runtime-node/test/node-com.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,28 @@ describe('Socket communication', () => {
expect(message.from).to.equal('server-host');
});
});

it('Should disconnect previous connection when same clientId reconnects', async () => {
const stableClientId = 'stable-client-id';
const disconnectSpy = sinon.spy();

const firstClient = disposeAfterTest(
new WsClientHost(serverTopology['server-host']!, { auth: { clientId: stableClientId } }),
);
await firstClient.connected;
firstClient.subscribers.on('disconnect', disconnectSpy);

expect(firstClient.isConnected(), 'first connected').to.eql(true);

const secondClient = disposeAfterTest(
new WsClientHost(serverTopology['server-host']!, { auth: { clientId: stableClientId } }),
);
await secondClient.connected;

await waitFor(() => expect(firstClient.isConnected(), 'first disconnected').to.eql(false), { timeout: 2_000 });
expect(disconnectSpy.callCount, 'first disconnected count').to.eq(1);
expect(secondClient.isConnected(), 'second connected').to.eql(true);
});
});

describe('IPC communication', () => {
Expand Down