diff --git a/packages/core/src/com/hosts/ws-client-host.ts b/packages/core/src/com/hosts/ws-client-host.ts index ffb874529..11a8043ec 100644 --- a/packages/core/src/com/hosts/ws-client-host.ts +++ b/packages/core/src/com/hosts/ws-client-host.ts @@ -1,4 +1,4 @@ -import { io, Socket, type SocketOptions } from 'socket.io-client'; +import { io, type ManagerOptions, type Socket, type SocketOptions } from 'socket.io-client'; import type { Message } from '../message-types.js'; import { BaseHost } from './base-host.js'; import { EventEmitter, IDisposable, SafeDisposable } from '@dazl/patterns'; @@ -10,9 +10,9 @@ export class WsClientHost extends BaseHost implements IDisposable { isDisposed = this.disposables.isDisposed; public connected: Promise; private socketClient: Socket; - public subscribers = new EventEmitter<{ disconnect: void; reconnect: void }>(); + public subscribers = new EventEmitter<{ connect: void; disconnect: string; reconnect: void }>(); - constructor(url: string, options?: Partial) { + constructor(url: string, options?: Partial) { super(); this.disposables.add('close socket', () => this.socketClient.close()); this.disposables.add('clear subscribers', () => this.subscribers.clear()); @@ -36,15 +36,16 @@ export class WsClientHost extends BaseHost implements IDisposable { }); this.socketClient.on('connect', () => { - this.socketClient.on('message', (data: unknown) => { - this.emitMessageHandlers(data as Message); - }); + this.subscribers.emit('connect', undefined); resolve(); }); - this.socketClient.on('disconnect', () => { - this.subscribers.emit('disconnect', undefined); - this.socketClient.close(); + this.socketClient.on('message', (data: unknown) => { + this.emitMessageHandlers(data as Message); + }); + + this.socketClient.on('disconnect', (reason) => { + this.subscribers.emit('disconnect', reason); }); this.socketClient.on('reconnect', () => { @@ -57,4 +58,18 @@ export class WsClientHost extends BaseHost implements IDisposable { public postMessage(data: any) { this.socketClient.emit('message', data); } + + disconnectSocket() { + if (this.socketClient.connected) { + this.socketClient.disconnect(); + } + } + reconnectSocket() { + if (!this.socketClient.connected) { + this.socketClient.connect(); + } + } + isConnected(): boolean { + return this.socketClient.connected; + } } diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index 11c592ba5..cd948b5b9 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -31,6 +31,10 @@ describe('Socket communication', () => { let port: number; const disposables = createDisposables(); + const disposeAfterTest = void }>(obj: T) => { + disposables.add(() => obj.dispose()); + return obj; + }; afterEach(() => disposables.dispose()); beforeEach(async () => { @@ -55,8 +59,8 @@ describe('Socket communication', () => { } }); - clientHost = new WsClientHost(serverTopology['server-host']); - serverHost = new WsServerHost(nameSpace); + clientHost = disposeAfterTest(new WsClientHost(serverTopology['server-host'])); + serverHost = disposeAfterTest(new WsServerHost(nameSpace)); await clientHost.connected; }); @@ -225,10 +229,10 @@ describe('Socket communication', () => { it('notifies if environment is disconnected', async () => { const spy = sinon.spy(); const clientCom = new Communication(clientHost, 'client-host', serverTopology); - const { id } = await socketClientInitializer({ + const { id } = disposeAfterTest(await socketClientInitializer({ communication: clientCom, env: new Environment('server-host', 'node', 'single'), - }); + })); expect(id).to.not.eq(undefined); @@ -250,23 +254,23 @@ describe('Socket communication', () => { createWaitForCall<(ev: { data: Message }) => void>('server'); const { waitForCall: waitForClient1Call, spy: spyClient1 } = createWaitForCall<(ev: { data: Message }) => void>('client'); - const clientHost1 = new WsClientHost(serverTopology['server-host']!); - const clientHost2 = new WsClientHost(serverTopology['server-host']!); + const clientHost1 = disposeAfterTest(new WsClientHost(serverTopology['server-host']!)); + const clientHost2 = disposeAfterTest(new WsClientHost(serverTopology['server-host']!)); const clientCom1 = new Communication(clientHost1, 'client-host1', serverTopology); const clientCom2 = new Communication(clientHost2, 'client-host2', serverTopology); new Communication(serverHost, 'server-host'); - await socketClientInitializer({ + disposeAfterTest(await socketClientInitializer({ communication: clientCom1, env: { env: 'server-host', }, - }); - await socketClientInitializer({ + })); + disposeAfterTest(await socketClientInitializer({ communication: clientCom2, env: { env: 'server-host', }, - }); + })); clientCom1.registerEnv('client-host2', clientCom1.getEnvironmentHost('server-host')!); serverHost.addEventListener('message', spyServer); clientHost1.addEventListener('message', spyClient1); diff --git a/packages/runtime-node/test/node-env.manager.unit.ts b/packages/runtime-node/test/node-env.manager.unit.ts index d63c61406..2cacb339b 100644 --- a/packages/runtime-node/test/node-env.manager.unit.ts +++ b/packages/runtime-node/test/node-env.manager.unit.ts @@ -12,6 +12,18 @@ import { runEnv as runAEnv } from '../test-kit/entrypoints/a.node.js'; import testFeature from '../test-kit/feature/test-feature.js'; describe('NodeEnvManager', () => { + const disposables = new Set<() => Promise | void>(); + afterEach(async () => { + for (const dispose of Array.from(disposables).reverse()) { + await dispose(); + } + disposables.clear(); + }); + const disposeAfterTest = void }>(obj: T) => { + disposables.add(() => obj.dispose()); + return obj; + }; + const meta = { url: import.meta.resolve('../test-kit/entrypoints/') }; const testCommunicationId = 'test'; @@ -38,15 +50,10 @@ describe('NodeEnvManager', () => { }, }; - manager = new NodeEnvManager(meta, featureEnvironmentsMapping); + manager = disposeAfterTest(new NodeEnvManager(meta, featureEnvironmentsMapping)); const { port } = await manager.autoLaunch(new Map([['feature', 'test-feature']])); nodeEnvsPort = port; - communication = getClientCom(port); - }); - - afterEach(async () => { - await communication.dispose(); - await manager.dispose(); + communication = disposeAfterTest(getClientCom(port)); }); it('should reach env "a"', async () => { @@ -67,8 +74,8 @@ describe('NodeEnvManager', () => { it('should handle two communication with the same', async () => { // setup new com instance with the same id - const communication2 = new Communication(new BaseHost(), testCommunicationId); - const host = new WsClientHost('http://localhost:' + nodeEnvsPort, {}); + const communication2 = disposeAfterTest(new Communication(new BaseHost(), testCommunicationId)); + const host = disposeAfterTest(new WsClientHost('http://localhost:' + nodeEnvsPort, {})); communication2.registerEnv(aEnv.env, host); communication2.registerEnv(bEnv.env, host); @@ -85,20 +92,19 @@ describe('NodeEnvManager', () => { }); describe('NodeEnvManager with 2 node envs, one remote the other in a worker thread', () => { - let closeEnvA: () => Promise; let nodeEnvsManager: NodeEnvManager; let communication: Communication; beforeEach(async () => { const { port: aPort, socketServer, close } = await launchEngineHttpServer(); - closeEnvA = close; + disposables.add(() => close()); await runAEnv({ Feature: testFeature, topLevelConfig: [ COM.configure({ config: { - host: new WsServerHost(socketServer), + host: disposeAfterTest(new WsServerHost(socketServer)), id: aEnv.env, }, }), @@ -124,14 +130,9 @@ describe('NodeEnvManager', () => { }, }; - nodeEnvsManager = new NodeEnvManager(meta, featureEnvironmentsMapping); + nodeEnvsManager = disposeAfterTest(new NodeEnvManager(meta, featureEnvironmentsMapping)); const { port } = await nodeEnvsManager.autoLaunch(new Map([['feature', 'test-feature']])); - communication = getClientCom(port); - }); - afterEach(async () => { - await communication.dispose(); - await closeEnvA(); - await nodeEnvsManager.dispose(); + communication = disposeAfterTest(getClientCom(port)); }); it('should reach env "a"', async () => {