diff --git a/packages/core/src/com/initializers/socket-server.ts b/packages/core/src/com/initializers/socket-server.ts index 97743fea8..e67d89f11 100644 --- a/packages/core/src/com/initializers/socket-server.ts +++ b/packages/core/src/com/initializers/socket-server.ts @@ -1,15 +1,20 @@ import type { SocketOptions } from 'socket.io-client'; import { WsClientHost } from '../hosts/ws-client-host.js'; -import type { InitializerOptions } from './types.js'; +import { Communication } from '../communication.js'; -export interface SocketClientInitializerOptions extends InitializerOptions, Partial {} +export interface SocketClientInitializerOptions extends Partial { + communication: Communication; + env: { env: string }; + envUrl?: string; +} export const socketClientInitializer = async ({ communication, env: { env }, + envUrl: serverUrl, ...socketClientOptions }: SocketClientInitializerOptions) => { - const url = communication.topology[env] || location.origin; + const url = serverUrl || communication.topology[env] || location.origin; const instanceId = env; const host = new WsClientHost(url, socketClientOptions); if (communication.getEnvironmentHost(instanceId)) { @@ -29,15 +34,15 @@ export const socketClientInitializer = async ({ return { id: instanceId, - onDisconnect: (cb: () => void) => { - host.subscribers.on('disconnect', cb); - }, - onReconnect: (cb: () => void) => { - host.subscribers.on('reconnect', cb); - }, dispose: () => { communication.clearEnvironment(instanceId, undefined, false); return host.dispose(); }, + getMetrics: () => { + return Promise.resolve({ + marks: [], + measures: [], + }); + }, }; }; diff --git a/packages/runtime-node/src/node-env-manager.ts b/packages/runtime-node/src/node-env-manager.ts index 2375661e4..d300bc320 100644 --- a/packages/runtime-node/src/node-env-manager.ts +++ b/packages/runtime-node/src/node-env-manager.ts @@ -4,7 +4,7 @@ import { IRunOptions, Message, MultiCounter, - UniversalWorkerHost, + socketClientInitializer, } from '@wixc3/engine-core'; import { IDisposable, SetMultiMap } from '@wixc3/patterns'; import { fileURLToPath } from 'node:url'; @@ -12,9 +12,8 @@ import { parseArgs } from 'node:util'; import { extname } from 'node:path'; import { WsServerHost } from './ws-node-host.js'; import { ILaunchHttpServerOptions, launchEngineHttpServer } from './launch-http-server.js'; -import { runWorker } from './worker-thread-initializer2.js'; -import { getMetricsFromWorker, bindMetricsListener, type PerformanceMetrics } from './metrics-utils.js'; -import { rpcCall } from './micro-rpc.js'; +import { workerThreadInitializer2 } from './worker-thread-initializer2.js'; +import { bindMetricsListener, type PerformanceMetrics } from './metrics-utils.js'; export interface RunningNodeEnvironment { id: string; @@ -22,9 +21,14 @@ export interface RunningNodeEnvironment { getMetrics(): Promise; } +export interface NodeEnvConfig extends Pick { + envType: AnyEnvironment['envType'] | 'remote'; + remoteUrl?: string; +} + export type NodeEnvsFeatureMapping = { featureToEnvironments: Record; - availableEnvironments: Record>; + availableEnvironments: Record; }; export class NodeEnvManager implements IDisposable { @@ -53,7 +57,7 @@ export class NodeEnvManager implements IDisposable { const staticDirPath = fileURLToPath(new URL('../web', this.importMeta.url)); const { port, socketServer, app, close } = await launchEngineHttpServer({ staticDirPath, ...serverOptions }); - runtimeOptions.set('devServerPort', port.toString()); + runtimeOptions.set('enginePort', port.toString()); const clientsHost = new WsServerHost(socketServer); clientsHost.addEventListener('message', handleRegistrationOnMessage); @@ -127,9 +131,7 @@ export class NodeEnvManager implements IDisposable { } await Promise.all( - envNames.map((envName) => - this.initializeWorkerEnvironment(envName, runtimeOptions, forwardingCom, verbose), - ), + envNames.map((envName) => this.initializeEnvironment(envName, runtimeOptions, forwardingCom, verbose)), ); } @@ -141,7 +143,7 @@ export class NodeEnvManager implements IDisposable { return new URL(`${env.env}.${env.envType}${extname(this.importMeta.url)}`, this.importMeta.url); } - async initializeWorkerEnvironment( + async initializeEnvironment( envName: string, runtimeOptions: IRunOptions, forwardingCom: Communication, @@ -151,10 +153,22 @@ export class NodeEnvManager implements IDisposable { if (!env) { throw new Error(`environment ${envName} not found`); } - const envInstanceId = - env.endpointType === 'single' ? env.env : `${envName}/${this.envInstanceIdCounter.next(envName)}`; - const worker = runWorker(envInstanceId, this.createEnvironmentFileUrl(envName), runtimeOptions); - const runningEnv = await connectWorkerToProxyCom(envName, worker, forwardingCom); + let runningEnv: RunningNodeEnvironment; + if (env.envType === 'remote') { + if (!env.remoteUrl) { + throw new Error(`Remote URL for environment ${envName} is not defined`); + } + runningEnv = await socketClientInitializer({ communication: forwardingCom, env, envUrl: env.remoteUrl }); + } else { + const envWithInit = workerThreadInitializer2({ + communication: forwardingCom, + env: env, + workerURL: this.createEnvironmentFileUrl(envName), + runtimeOptions: runtimeOptions, + }); + await envWithInit.initialize(); + runningEnv = envWithInit; + } this.openEnvironments.add(envName, runningEnv); if (verbose) { @@ -177,35 +191,6 @@ export class NodeEnvManager implements IDisposable { } } -async function connectWorkerToProxyCom( - envName: string, - worker: ReturnType, - forwardingCom: Communication, -) { - const workerHost = new UniversalWorkerHost(worker, envName); - forwardingCom.registerMessageHandler(workerHost); - forwardingCom.registerEnv(envName, workerHost); - await forwardingCom.envReady(envName); - const runningEnv: RunningNodeEnvironment = { - id: envName, - dispose: async () => { - forwardingCom.clearEnvironment(envName); - if (process.env.ENGINE_GRACEFUL_TERMINATION !== 'false') { - try { - await rpcCall(worker, 'terminate', 15000); - } catch (e) { - console.error(`failed terminating environment gracefully ${envName}, terminating worker.`, e); - } - } - await worker.terminate(); - }, - getMetrics: async () => { - return getMetricsFromWorker(worker); - }, - }; - return runningEnv; -} - export function parseRuntimeOptions() { const { values: args } = parseArgs({ strict: false, diff --git a/packages/runtime-node/src/worker-thread-initializer2.ts b/packages/runtime-node/src/worker-thread-initializer2.ts index 3aae04fe7..4cdde48b1 100644 --- a/packages/runtime-node/src/worker-thread-initializer2.ts +++ b/packages/runtime-node/src/worker-thread-initializer2.ts @@ -1,19 +1,20 @@ -import { IRunOptions, InitializerOptions, UniversalWorkerHost } from '@wixc3/engine-core'; +import { AnyEnvironment, Communication, IRunOptions, UniversalWorkerHost } from '@wixc3/engine-core'; import { Worker } from '@wixc3/isomorphic-worker/worker'; import { type UniversalWorkerOptions } from '@wixc3/isomorphic-worker/types'; import { createDisposables } from '@wixc3/patterns'; -import { getMetricsFromWorker, type PerformanceMetrics } from './metrics-utils.js'; +import { getMetricsFromWorker } from './metrics-utils.js'; +import { rpcCall } from './micro-rpc.js'; +import type { RunningNodeEnvironment } from './node-env-manager.js'; -export interface WorkerThreadInitializer2 { - id: string; - dispose: () => Promise; +export interface WorkerThreadInitializer2 extends RunningNodeEnvironment { initialize: () => Promise; - getMetrics(): Promise; } -export interface WorkerThreadInitializerOptions2 extends InitializerOptions { +export interface WorkerThreadInitializerOptions2 { workerURL: URL; runtimeOptions?: IRunOptions; + env: Pick; + communication: Communication; } export function runWorker(instanceId: string, workerURL: URL, runtimeOptions?: IRunOptions) { @@ -46,7 +47,16 @@ export function workerThreadInitializer2({ execArgv: process.execArgv, } as UniversalWorkerOptions); - disposables.add('terminate worker', () => worker.terminate()); + disposables.add('terminate worker', async () => { + if (process.env.ENGINE_GRACEFUL_TERMINATION !== 'false') { + try { + await rpcCall(worker, 'terminate', 15000); + } catch (e) { + console.error(`failed terminating environment gracefully ${instanceId}, terminating worker.`, e); + } + } + await worker.terminate(); + }); const host = new UniversalWorkerHost(worker, instanceId); communication.registerEnv(instanceId, host); diff --git a/packages/runtime-node/test/node-com.unit.ts b/packages/runtime-node/test/node-com.unit.ts index 829091cdc..dba809bde 100644 --- a/packages/runtime-node/test/node-com.unit.ts +++ b/packages/runtime-node/test/node-com.unit.ts @@ -225,14 +225,15 @@ describe('Socket communication', () => { it('notifies if environment is disconnected', async () => { const spy = sinon.spy(); const clientCom = new Communication(clientHost, 'client-host', serverTopology); - const { onDisconnect } = await socketClientInitializer({ + const { id } = await socketClientInitializer({ communication: clientCom, env: new Environment('server-host', 'node', 'single'), }); - expect(onDisconnect).to.not.eq(undefined); + expect(id).to.not.eq(undefined); - onDisconnect(spy); + const host = clientCom.getEnvironmentHost(id); + (host as WsClientHost).subscribers.on('disconnect', spy); await socketServer.close(); await waitFor( () => { @@ -258,18 +259,12 @@ describe('Socket communication', () => { communication: clientCom1, env: { env: 'server-host', - endpointType: 'single', - envType: 'node', - dependencies: [], }, }); await socketClientInitializer({ communication: clientCom2, env: { env: 'server-host', - endpointType: 'single', - envType: 'node', - dependencies: [], }, }); clientCom1.registerEnv('client-host2', clientCom1.getEnvironmentHost('server-host')!); diff --git a/packages/runtime-node/test/node-env.manager.unit.ts b/packages/runtime-node/test/node-env.manager.unit.ts index 945406b5a..ead76cb82 100644 --- a/packages/runtime-node/test/node-env.manager.unit.ts +++ b/packages/runtime-node/test/node-env.manager.unit.ts @@ -1,81 +1,163 @@ import { expect } from 'chai'; -import { BaseHost, Communication, WsClientHost } from '@wixc3/engine-core'; -import { NodeEnvsFeatureMapping, NodeEnvManager } from '@wixc3/engine-runtime-node'; +import { BaseHost, COM, Communication, WsClientHost } from '@wixc3/engine-core'; +import { + launchEngineHttpServer, + NodeEnvManager, + type NodeEnvsFeatureMapping, + WsServerHost, +} from '@wixc3/engine-runtime-node'; import { aEnv, bEnv } from '../test-kit/feature/envs.js'; import { EchoService } from '../test-kit/feature/types.js'; +import { runEnv as runAEnv } from '../test-kit/entrypoints/a.node.js'; +import testFeature from '../test-kit/feature/test-feature.js'; -describe('NodeEnvManager with 2 node envs, remote api call', () => { +describe('NodeEnvManager', () => { + const meta = { url: import.meta.resolve('../test-kit/entrypoints/') }; const testCommunicationId = 'test'; - let manager: NodeEnvManager; - let communication: Communication; - let nodeEnvsPort: number; - beforeEach(async () => { - const featureEnvironmentsMapping: NodeEnvsFeatureMapping = { - featureToEnvironments: { - 'test-feature': [aEnv.env, bEnv.env], - }, - availableEnvironments: { - a: { - env: aEnv.env, - endpointType: 'single', - envType: 'node', + + describe('NodeEnvManager with 2 node envs, remote api call', () => { + let manager: NodeEnvManager; + let communication: Communication; + let nodeEnvsPort: number; + beforeEach(async () => { + const featureEnvironmentsMapping: NodeEnvsFeatureMapping = { + featureToEnvironments: { + 'test-feature': [aEnv.env, bEnv.env], }, - b: { - env: bEnv.env, - endpointType: 'single', - envType: 'node', + availableEnvironments: { + a: { + env: aEnv.env, + endpointType: 'single', + envType: 'node', + }, + b: { + env: bEnv.env, + endpointType: 'single', + envType: 'node', + }, }, - }, - }; - const meta = { url: import.meta.resolve('../test-kit/entrypoints/') }; + }; - manager = new NodeEnvManager(meta, featureEnvironmentsMapping); - const { port } = await manager.autoLaunch(new Map([['feature', 'test-feature']])); - nodeEnvsPort = port; - const host = new WsClientHost('http://localhost:' + port, {}); - const com = new Communication(new BaseHost(), testCommunicationId); - com.registerEnv(aEnv.env, host); - com.registerEnv(bEnv.env, host); - com.registerMessageHandler(host); - communication = com; - }); + manager = new NodeEnvManager(meta, featureEnvironmentsMapping); + const { port } = await manager.autoLaunch(new Map([['feature', 'test-feature']])); + nodeEnvsPort = port; + communication = getClientCom(port); + }); - afterEach(async () => { - await communication.dispose(); - await manager.dispose(); - }); + afterEach(async () => { + await communication.dispose(); + await manager.dispose(); + }); - it('should reach env "a"', async () => { - const api = communication.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); + it('should reach env "a"', async () => { + const api = communication.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); - expect(await api.echo()).to.equal('a'); - }); - it('should reach env "a", env "a" should reach env "b"', async () => { - const api = communication.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); + expect(await api.echo()).to.equal('a'); + }); + it('should reach env "a", env "a" should reach env "b"', async () => { + const api = communication.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); - expect(await api.echoChained()).to.equal('b'); - }); - it('should reach env "b", env "b" should reach env "a"', async () => { - const api = communication.apiProxy({ id: bEnv.env }, { id: 'test-feature.echoBService' }); + expect(await api.echoChained()).to.equal('b'); + }); + it('should reach env "b", env "b" should reach env "a"', async () => { + const api = communication.apiProxy({ id: bEnv.env }, { id: 'test-feature.echoBService' }); + + expect(await api.echoChained()).to.equal('a'); + }); + + it('should handle two communication with the same', async () => { + // setup new com instance with the same id + const communication2 = new Communication(new BaseHost(), testCommunicationId); + const host = new WsClientHost('http://localhost:' + nodeEnvsPort, {}); + + communication2.registerEnv(aEnv.env, host); + communication2.registerEnv(bEnv.env, host); + communication2.registerMessageHandler(host); + + const api1 = communication.apiProxy({ id: bEnv.env }, { id: 'test-feature.echoBService' }); + const api2 = communication2.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); + const result1 = api1.echo(); + const result2 = api2.echo(); - expect(await api.echoChained()).to.equal('a'); + expect(await result1).to.equal('b'); + expect(await result2).to.equal('a'); + }); }); - it('should handle two communication with the same', async () => { - // setup new com instance with the same id - const communication2 = new Communication(new BaseHost(), testCommunicationId); - const host = new WsClientHost('http://localhost:' + nodeEnvsPort, {}); + describe('NodeEnvManager with 2 node envs, one remote the other in a worker thread', () => { + let closeEnvA: () => Promise; + let nodeEnvsManager: NodeEnvManager; + let communication: Communication; - communication2.registerEnv(aEnv.env, host); - communication2.registerEnv(bEnv.env, host); - communication2.registerMessageHandler(host); + beforeEach(async () => { + const { port: aPort, socketServer, close } = await launchEngineHttpServer(); + closeEnvA = close; - const api1 = communication.apiProxy({ id: bEnv.env }, { id: 'test-feature.echoBService' }); - const api2 = communication2.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); - const result1 = api1.echo(); - const result2 = api2.echo(); + await runAEnv({ + Feature: testFeature, + topLevelConfig: [ + COM.configure({ + config: { + host: new WsServerHost(socketServer), + id: aEnv.env, + }, + }), + ], + }); - expect(await result1).to.equal('b'); - expect(await result2).to.equal('a'); + const featureEnvironmentsMapping: NodeEnvsFeatureMapping = { + featureToEnvironments: { + 'test-feature': [aEnv.env, bEnv.env], + }, + availableEnvironments: { + a: { + env: aEnv.env, + endpointType: 'single', + envType: 'remote', + remoteUrl: `http://localhost:${aPort}`, + }, + b: { + env: bEnv.env, + endpointType: 'single', + envType: 'node', + }, + }, + }; + + nodeEnvsManager = new NodeEnvManager(meta, featureEnvironmentsMapping); + const { port } = await nodeEnvsManager.autoLaunch(new Map([['feature', 'test-feature']])); + communication = getClientCom(port); + }); + afterEach(async () => { + await communication.dispose(); + await closeEnvA(); + await nodeEnvsManager.dispose(); + }); + + it('should reach env "a"', async () => { + const api = communication.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); + + expect(await api.echo()).to.equal('a'); + }); + + it('should reach env "a", env "a" should reach env "b"', async () => { + const api = communication.apiProxy({ id: aEnv.env }, { id: 'test-feature.echoAService' }); + + expect(await api.echoChained()).to.equal('b'); + }); + it('should reach env "b", env "b" should reach env "a"', async () => { + const api = communication.apiProxy({ id: bEnv.env }, { id: 'test-feature.echoBService' }); + + expect(await api.echoChained()).to.equal('a'); + }); }); + + function getClientCom(port: number) { + const host = new WsClientHost('http://localhost:' + port, {}); + const com = new Communication(new BaseHost(), testCommunicationId); + com.registerEnv(aEnv.env, host); + com.registerEnv(bEnv.env, host); + com.registerMessageHandler(host); + return com; + } });