Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions packages/core/src/com/initializers/socket-server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import type { SocketOptions } from 'socket.io-client';
import { WsClientHost } from '../hosts/ws-client-host.js';
import type { InitializerOptions } from './types.js';
import { Communication } from '../communication.js';

export interface SocketClientInitializerOptions extends InitializerOptions, Partial<SocketOptions> {}
export interface SocketClientInitializerOptions extends Partial<SocketOptions> {
communication: Communication;
env: { env: string };
envUrl?: string;
}

export const socketClientInitializer = async ({
communication,
env: { env },
envUrl: serverUrl,
...socketClientOptions
}: SocketClientInitializerOptions) => {
const url = communication.topology[env] || location.origin;
const url = serverUrl || communication.topology[env] || location.origin;
const instanceId = env;
const host = new WsClientHost(url, socketClientOptions);
if (communication.getEnvironmentHost(instanceId)) {
Expand All @@ -29,15 +34,15 @@ export const socketClientInitializer = async ({

return {
id: instanceId,
onDisconnect: (cb: () => void) => {
host.subscribers.on('disconnect', cb);
},
onReconnect: (cb: () => void) => {
host.subscribers.on('reconnect', cb);
},
dispose: () => {
communication.clearEnvironment(instanceId, undefined, false);
return host.dispose();
},
getMetrics: () => {
return Promise.resolve({
marks: [],
measures: [],
});
},
};
};
71 changes: 28 additions & 43 deletions packages/runtime-node/src/node-env-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,31 @@ import {
IRunOptions,
Message,
MultiCounter,
UniversalWorkerHost,
socketClientInitializer,
} from '@wixc3/engine-core';
import { IDisposable, SetMultiMap } from '@wixc3/patterns';
import { fileURLToPath } from 'node:url';
import { parseArgs } from 'node:util';
import { extname } from 'node:path';
import { WsServerHost } from './ws-node-host.js';
import { ILaunchHttpServerOptions, launchEngineHttpServer } from './launch-http-server.js';
import { runWorker } from './worker-thread-initializer2.js';
import { getMetricsFromWorker, bindMetricsListener, type PerformanceMetrics } from './metrics-utils.js';
import { rpcCall } from './micro-rpc.js';
import { workerThreadInitializer2 } from './worker-thread-initializer2.js';
import { bindMetricsListener, type PerformanceMetrics } from './metrics-utils.js';

export interface RunningNodeEnvironment {
id: string;
dispose(): Promise<void>;
getMetrics(): Promise<PerformanceMetrics>;
}

export interface NodeEnvConfig extends Pick<AnyEnvironment, 'env' | 'endpointType'> {
envType: AnyEnvironment['envType'] | 'remote';
remoteUrl?: string;
}

export type NodeEnvsFeatureMapping = {
featureToEnvironments: Record<string, string[]>;
availableEnvironments: Record<string, Pick<AnyEnvironment, 'env' | 'envType' | 'endpointType'>>;
availableEnvironments: Record<string, NodeEnvConfig>;
};

export class NodeEnvManager implements IDisposable {
Expand Down Expand Up @@ -53,7 +57,7 @@ export class NodeEnvManager implements IDisposable {

const staticDirPath = fileURLToPath(new URL('../web', this.importMeta.url));
const { port, socketServer, app, close } = await launchEngineHttpServer({ staticDirPath, ...serverOptions });
runtimeOptions.set('devServerPort', port.toString());
runtimeOptions.set('enginePort', port.toString());

const clientsHost = new WsServerHost(socketServer);
clientsHost.addEventListener('message', handleRegistrationOnMessage);
Expand Down Expand Up @@ -127,9 +131,7 @@ export class NodeEnvManager implements IDisposable {
}

await Promise.all(
envNames.map((envName) =>
this.initializeWorkerEnvironment(envName, runtimeOptions, forwardingCom, verbose),
),
envNames.map((envName) => this.initializeEnvironment(envName, runtimeOptions, forwardingCom, verbose)),
);
}

Expand All @@ -141,7 +143,7 @@ export class NodeEnvManager implements IDisposable {
return new URL(`${env.env}.${env.envType}${extname(this.importMeta.url)}`, this.importMeta.url);
}

async initializeWorkerEnvironment(
async initializeEnvironment(
envName: string,
runtimeOptions: IRunOptions,
forwardingCom: Communication,
Expand All @@ -151,10 +153,22 @@ export class NodeEnvManager implements IDisposable {
if (!env) {
throw new Error(`environment ${envName} not found`);
}
const envInstanceId =
env.endpointType === 'single' ? env.env : `${envName}/${this.envInstanceIdCounter.next(envName)}`;
const worker = runWorker(envInstanceId, this.createEnvironmentFileUrl(envName), runtimeOptions);
const runningEnv = await connectWorkerToProxyCom(envName, worker, forwardingCom);
let runningEnv: RunningNodeEnvironment;
if (env.envType === 'remote') {
if (!env.remoteUrl) {
throw new Error(`Remote URL for environment ${envName} is not defined`);
}
runningEnv = await socketClientInitializer({ communication: forwardingCom, env, envUrl: env.remoteUrl });
} else {
const envWithInit = workerThreadInitializer2({
communication: forwardingCom,
env: env,
workerURL: this.createEnvironmentFileUrl(envName),
runtimeOptions: runtimeOptions,
});
await envWithInit.initialize();
runningEnv = envWithInit;
}

this.openEnvironments.add(envName, runningEnv);
if (verbose) {
Expand All @@ -177,35 +191,6 @@ export class NodeEnvManager implements IDisposable {
}
}

async function connectWorkerToProxyCom(
envName: string,
worker: ReturnType<typeof runWorker>,
forwardingCom: Communication,
) {
const workerHost = new UniversalWorkerHost(worker, envName);
forwardingCom.registerMessageHandler(workerHost);
forwardingCom.registerEnv(envName, workerHost);
await forwardingCom.envReady(envName);
const runningEnv: RunningNodeEnvironment = {
id: envName,
dispose: async () => {
forwardingCom.clearEnvironment(envName);
if (process.env.ENGINE_GRACEFUL_TERMINATION !== 'false') {
try {
await rpcCall(worker, 'terminate', 15000);
} catch (e) {
console.error(`failed terminating environment gracefully ${envName}, terminating worker.`, e);
}
}
await worker.terminate();
},
getMetrics: async () => {
return getMetricsFromWorker(worker);
},
};
return runningEnv;
}

export function parseRuntimeOptions() {
const { values: args } = parseArgs({
strict: false,
Expand Down
26 changes: 18 additions & 8 deletions packages/runtime-node/src/worker-thread-initializer2.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import { IRunOptions, InitializerOptions, UniversalWorkerHost } from '@wixc3/engine-core';
import { AnyEnvironment, Communication, IRunOptions, UniversalWorkerHost } from '@wixc3/engine-core';
import { Worker } from '@wixc3/isomorphic-worker/worker';
import { type UniversalWorkerOptions } from '@wixc3/isomorphic-worker/types';
import { createDisposables } from '@wixc3/patterns';
import { getMetricsFromWorker, type PerformanceMetrics } from './metrics-utils.js';
import { getMetricsFromWorker } from './metrics-utils.js';
import { rpcCall } from './micro-rpc.js';
import type { RunningNodeEnvironment } from './node-env-manager.js';

export interface WorkerThreadInitializer2 {
id: string;
dispose: () => Promise<void>;
export interface WorkerThreadInitializer2 extends RunningNodeEnvironment {
initialize: () => Promise<void>;
getMetrics(): Promise<PerformanceMetrics>;
}

export interface WorkerThreadInitializerOptions2 extends InitializerOptions {
export interface WorkerThreadInitializerOptions2 {
workerURL: URL;
runtimeOptions?: IRunOptions;
env: Pick<AnyEnvironment, 'env' | 'endpointType'>;
communication: Communication;
}

export function runWorker(instanceId: string, workerURL: URL, runtimeOptions?: IRunOptions) {
Expand Down Expand Up @@ -46,7 +47,16 @@ export function workerThreadInitializer2({
execArgv: process.execArgv,
} as UniversalWorkerOptions);

disposables.add('terminate worker', () => worker.terminate());
disposables.add('terminate worker', async () => {
if (process.env.ENGINE_GRACEFUL_TERMINATION !== 'false') {
try {
await rpcCall(worker, 'terminate', 15000);
} catch (e) {
console.error(`failed terminating environment gracefully ${instanceId}, terminating worker.`, e);
}
}
await worker.terminate();
});

const host = new UniversalWorkerHost(worker, instanceId);
communication.registerEnv(instanceId, host);
Expand Down
13 changes: 4 additions & 9 deletions packages/runtime-node/test/node-com.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,15 @@ describe('Socket communication', () => {
it('notifies if environment is disconnected', async () => {
const spy = sinon.spy();
const clientCom = new Communication(clientHost, 'client-host', serverTopology);
const { onDisconnect } = await socketClientInitializer({
const { id } = await socketClientInitializer({
communication: clientCom,
env: new Environment('server-host', 'node', 'single'),
});

expect(onDisconnect).to.not.eq(undefined);
expect(id).to.not.eq(undefined);

onDisconnect(spy);
const host = clientCom.getEnvironmentHost(id);
(host as WsClientHost).subscribers.on('disconnect', spy);
await socketServer.close();
await waitFor(
() => {
Expand All @@ -258,18 +259,12 @@ describe('Socket communication', () => {
communication: clientCom1,
env: {
env: 'server-host',
endpointType: 'single',
envType: 'node',
dependencies: [],
},
});
await socketClientInitializer({
communication: clientCom2,
env: {
env: 'server-host',
endpointType: 'single',
envType: 'node',
dependencies: [],
},
});
clientCom1.registerEnv('client-host2', clientCom1.getEnvironmentHost('server-host')!);
Expand Down
Loading