Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions packages/core/src/com/hosts/ws-client-host.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { io, Socket, type SocketOptions } from 'socket.io-client';
import { io, type ManagerOptions, type Socket, type SocketOptions } from 'socket.io-client';
import type { Message } from '../message-types.js';
import { BaseHost } from './base-host.js';
import { EventEmitter, IDisposable, SafeDisposable } from '@dazl/patterns';
Expand All @@ -10,9 +10,9 @@ export class WsClientHost extends BaseHost implements IDisposable {
isDisposed = this.disposables.isDisposed;
public connected: Promise<void>;
private socketClient: Socket;
public subscribers = new EventEmitter<{ disconnect: void; reconnect: void }>();
public subscribers = new EventEmitter<{ connect: void; disconnect: string; reconnect: void }>();

constructor(url: string, options?: Partial<SocketOptions>) {
constructor(url: string, options?: Partial<ManagerOptions & SocketOptions>) {
super();
this.disposables.add('close socket', () => this.socketClient.close());
this.disposables.add('clear subscribers', () => this.subscribers.clear());
Expand All @@ -36,15 +36,16 @@ export class WsClientHost extends BaseHost implements IDisposable {
});

this.socketClient.on('connect', () => {
this.socketClient.on('message', (data: unknown) => {
this.emitMessageHandlers(data as Message);
});
this.subscribers.emit('connect', undefined);
resolve();
});

this.socketClient.on('disconnect', () => {
this.subscribers.emit('disconnect', undefined);
this.socketClient.close();
this.socketClient.on('message', (data: unknown) => {
this.emitMessageHandlers(data as Message);
});

this.socketClient.on('disconnect', (reason) => {
this.subscribers.emit('disconnect', reason);
});

this.socketClient.on('reconnect', () => {
Expand All @@ -57,4 +58,18 @@ export class WsClientHost extends BaseHost implements IDisposable {
public postMessage(data: any) {
this.socketClient.emit('message', data);
}

disconnectSocket() {
if (this.socketClient.connected) {
this.socketClient.disconnect();
}
}
reconnectSocket() {
if (!this.socketClient.connected) {
this.socketClient.connect();
}
}
isConnected(): boolean {
return this.socketClient.connected;
}
}
24 changes: 14 additions & 10 deletions packages/runtime-node/test/node-com.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ describe('Socket communication', () => {
let port: number;

const disposables = createDisposables();
const disposeAfterTest = <T extends { dispose: () => void }>(obj: T) => {
disposables.add(() => obj.dispose());
return obj;
};
afterEach(() => disposables.dispose());

beforeEach(async () => {
Expand All @@ -55,8 +59,8 @@ describe('Socket communication', () => {
}
});

clientHost = new WsClientHost(serverTopology['server-host']);
serverHost = new WsServerHost(nameSpace);
clientHost = disposeAfterTest(new WsClientHost(serverTopology['server-host']));
serverHost = disposeAfterTest(new WsServerHost(nameSpace));
await clientHost.connected;
});

Expand Down Expand Up @@ -225,10 +229,10 @@ describe('Socket communication', () => {
it('notifies if environment is disconnected', async () => {
const spy = sinon.spy();
const clientCom = new Communication(clientHost, 'client-host', serverTopology);
const { id } = await socketClientInitializer({
const { id } = disposeAfterTest(await socketClientInitializer({
communication: clientCom,
env: new Environment('server-host', 'node', 'single'),
});
}));

expect(id).to.not.eq(undefined);

Expand All @@ -250,23 +254,23 @@ describe('Socket communication', () => {
createWaitForCall<(ev: { data: Message }) => void>('server');
const { waitForCall: waitForClient1Call, spy: spyClient1 } =
createWaitForCall<(ev: { data: Message }) => void>('client');
const clientHost1 = new WsClientHost(serverTopology['server-host']!);
const clientHost2 = new WsClientHost(serverTopology['server-host']!);
const clientHost1 = disposeAfterTest(new WsClientHost(serverTopology['server-host']!));
const clientHost2 = disposeAfterTest(new WsClientHost(serverTopology['server-host']!));
const clientCom1 = new Communication(clientHost1, 'client-host1', serverTopology);
const clientCom2 = new Communication(clientHost2, 'client-host2', serverTopology);
new Communication(serverHost, 'server-host');
await socketClientInitializer({
disposeAfterTest(await socketClientInitializer({
communication: clientCom1,
env: {
env: 'server-host',
},
});
await socketClientInitializer({
}));
disposeAfterTest(await socketClientInitializer({
communication: clientCom2,
env: {
env: 'server-host',
},
});
}));
clientCom1.registerEnv('client-host2', clientCom1.getEnvironmentHost('server-host')!);
serverHost.addEventListener('message', spyServer);
clientHost1.addEventListener('message', spyClient1);
Expand Down
39 changes: 20 additions & 19 deletions packages/runtime-node/test/node-env.manager.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ import { runEnv as runAEnv } from '../test-kit/entrypoints/a.node.js';
import testFeature from '../test-kit/feature/test-feature.js';

describe('NodeEnvManager', () => {
const disposables = new Set<() => Promise<void> | void>();
afterEach(async () => {
for (const dispose of Array.from(disposables).reverse()) {
await dispose();
}
disposables.clear();
});
const disposeAfterTest = <T extends { dispose: () => void }>(obj: T) => {
disposables.add(() => obj.dispose());
return obj;
};

const meta = { url: import.meta.resolve('../test-kit/entrypoints/') };
const testCommunicationId = 'test';

Expand All @@ -38,15 +50,10 @@ describe('NodeEnvManager', () => {
},
};

manager = new NodeEnvManager(meta, featureEnvironmentsMapping);
manager = disposeAfterTest(new NodeEnvManager(meta, featureEnvironmentsMapping));
const { port } = await manager.autoLaunch(new Map([['feature', 'test-feature']]));
nodeEnvsPort = port;
communication = getClientCom(port);
});

afterEach(async () => {
await communication.dispose();
await manager.dispose();
communication = disposeAfterTest(getClientCom(port));
});

it('should reach env "a"', async () => {
Expand All @@ -67,8 +74,8 @@ describe('NodeEnvManager', () => {

it('should handle two communication with the same', async () => {
// setup new com instance with the same id
const communication2 = new Communication(new BaseHost(), testCommunicationId);
const host = new WsClientHost('http://localhost:' + nodeEnvsPort, {});
const communication2 = disposeAfterTest(new Communication(new BaseHost(), testCommunicationId));
const host = disposeAfterTest(new WsClientHost('http://localhost:' + nodeEnvsPort, {}));

communication2.registerEnv(aEnv.env, host);
communication2.registerEnv(bEnv.env, host);
Expand All @@ -85,20 +92,19 @@ describe('NodeEnvManager', () => {
});

describe('NodeEnvManager with 2 node envs, one remote the other in a worker thread', () => {
let closeEnvA: () => Promise<void>;
let nodeEnvsManager: NodeEnvManager;
let communication: Communication;

beforeEach(async () => {
const { port: aPort, socketServer, close } = await launchEngineHttpServer();
closeEnvA = close;
disposables.add(() => close());

await runAEnv({
Feature: testFeature,
topLevelConfig: [
COM.configure({
config: {
host: new WsServerHost(socketServer),
host: disposeAfterTest(new WsServerHost(socketServer)),
id: aEnv.env,
},
}),
Expand All @@ -124,14 +130,9 @@ describe('NodeEnvManager', () => {
},
};

nodeEnvsManager = new NodeEnvManager(meta, featureEnvironmentsMapping);
nodeEnvsManager = disposeAfterTest(new NodeEnvManager(meta, featureEnvironmentsMapping));
const { port } = await nodeEnvsManager.autoLaunch(new Map([['feature', 'test-feature']]));
communication = getClientCom(port);
});
afterEach(async () => {
await communication.dispose();
await closeEnvA();
await nodeEnvsManager.dispose();
communication = disposeAfterTest(getClientCom(port));
});

it('should reach env "a"', async () => {
Expand Down