Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert worker-manager TCP server to an HTTP server for DX #2063

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all`
| :4205 | `/test` realm for matrix client tests (playwright controlled) | 🚫 | 🚫 |
| :4210 | Development Worker Manager (spins up 1 worker by default) | ✅ | 🚫 |
| :4211 | Test Worker Manager (spins up 1 worker by default) | ✅ | 🚫 |
| :4212 | Test Worker Manager for matrix client tests (playwright controlled - 1 worker) | ✅ | 🚫 |
| :4213 | Test Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) | ✅ | 🚫 |
| :4212 | Worker Manager for matrix client tests (playwright controlled - 1 worker) | ✅ | 🚫 |
| :4213 | Worker Manager for matrix client tests - base realm server (playwright controlled - 1 worker) | ✅ | 🚫 |
| :5001 | Mail user interface for viewing emails sent to local SMTP | ✅ | 🚫 |
| :5435 | Postgres DB | ✅ | 🚫 |
| :8008 | Matrix synapse server | ✅ | 🚫 |
Expand Down Expand Up @@ -223,7 +223,7 @@ There is a ember-freestyle component explorer available to assist with developme

1. `cd packages/boxel-ui/test-app`
2. `pnpm start`
3. Visit http://localhost:4210/ in your browser
3. Visit http://localhost:4220/ in your browser

## Boxel Motion Demo App

Expand Down Expand Up @@ -290,7 +290,7 @@ To run the `packages/realm-server/` workspace tests start:
### Boxel UI

1. `cd packages/boxel-ui/test-app`
2. `pnpm test` (or `pnpm start` and visit http://localhost:4210/tests to run tests in the browser)
2. `pnpm test` (or `pnpm start` and visit http://localhost:4220/tests to run tests in the browser)

### Boxel Motion

Expand Down
2 changes: 1 addition & 1 deletion packages/boxel-ui/test-app/.ember-cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module.exports = {

Setting `disableAnalytics` to true will prevent any data from being sent.
*/
port: 4210,
port: 4220,
testPort: 7356,
disableAnalytics: false,
};
54 changes: 12 additions & 42 deletions packages/realm-server/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { NodeAdapter } from './node-realm';
import yargs from 'yargs';
import { RealmServer } from './server';
import { resolve } from 'path';
import { createConnection, type Socket } from 'net';
import { makeFastBootIndexRunner } from './fastboot';
import { shimExternals } from './lib/externals';
import * as Sentry from '@sentry/node';
Expand Down Expand Up @@ -331,49 +330,20 @@ let autoMigrate = migrateDB || undefined;
process.exit(-3);
});

let workerReadyDeferred: Deferred<boolean> | undefined;
async function waitForWorkerManager(port: number) {
const workerManager = await new Promise<Socket>((r) => {
let socket = createConnection({ port }, () => {
log.info(`Connected to worker manager on port ${port}`);
r(socket);
});
});

workerManager.on('data', (data) => {
let res = data.toString();
if (!workerReadyDeferred) {
throw new Error(
`received unsolicited message from worker manager on port ${port}`,
);
let isReady = false;
let timeout = Date.now() + 30_000;
do {
let response = await fetch(`http://localhost:${port}/`);
if (response.ok) {
let json = await response.json();
isReady = json.ready;
}
switch (res) {
case 'ready':
case 'not-ready':
workerReadyDeferred.fulfill(res === 'ready' ? true : false);
break;
default:
workerReadyDeferred.reject(
`unexpected response from worker manager: ${res}`,
);
}
});

try {
let isReady = false;
let timeout = Date.now() + 30_000;
do {
workerReadyDeferred = new Deferred();
workerManager.write('ready?');
isReady = await workerReadyDeferred.promise;
} while (!isReady && Date.now() < timeout);
if (!isReady) {
throw new Error(
`timed out trying to connect to worker manager on port ${port}`,
);
}
} finally {
workerManager.end();
} while (!isReady && Date.now() < timeout);
if (!isReady) {
throw new Error(
`timed out trying to waiting for worker manager to be ready on port ${port}`,
);
}
log.info('workers are ready');
}
140 changes: 88 additions & 52 deletions packages/realm-server/worker-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,22 @@ import {
} from '@cardstack/runtime-common';
import yargs from 'yargs';
import * as Sentry from '@sentry/node';
import { createServer } from 'net';
import flattenDeep from 'lodash/flattenDeep';
import { spawn } from 'child_process';
import pluralize from 'pluralize';
import Koa from 'koa';
import Router from '@koa/router';
import { ecsMetadata, fullRequestURL, livenessCheck } from './middleware';
import { Server } from 'http';

let log = logger('worker');
/* About the Worker Manager
*
* This process runs on each queue worker container and is responsible starting and monitoring the worker processes. It does this via IPC (inter-process communication).
* In test and development environments, the worker manager is also responsible for providing a readiness check HTTP endpoint so that tests can wait until the worker
* manager is ready before proceeding.
*/

let log = logger('worker-manager');

const REALM_SECRET_SEED = process.env.REALM_SECRET_SEED;
if (!REALM_SECRET_SEED) {
Expand All @@ -34,7 +44,8 @@ let {
.usage('Start worker manager')
.options({
port: {
description: 'TCP port for worker to communicate readiness (for tests)',
description:
'HTTP port for worker manager to communicate readiness and status',
type: 'number',
},
highPriorityCount: {
Expand Down Expand Up @@ -75,63 +86,88 @@ let isExiting = false;
process.on('SIGINT', () => (isExiting = true));
process.on('SIGTERM', () => (isExiting = true));

if (port != null) {
// in tests we start a simple TCP server to communicate to the realm when
// the worker is ready to start processing jobs
let server = createServer((socket) => {
log.info(`realm connected to worker manager`);
socket.on('data', (data) => {
if (data.toString() === 'ready?') {
socket.write(isReady ? 'ready' : 'not-ready');
}
});
socket.on('close', (hadError) => {
log.info(`realm has disconnected${hadError ? ' due to an error' : ''}.`);
});
socket.on('error', (err: any) => {
console.error(`realm disconnected from worker manager: ${err.message}`);
});
});
server.unref();
let webServerInstance: Server | undefined;

server.listen(port, () => {
log.info(`worker manager listening for realm on port ${port}`);
if (port) {
let webServer = new Koa<Koa.DefaultState, Koa.Context>();
let router = new Router();
router.head('/', livenessCheck);
router.get('/', async (ctxt: Koa.Context, _next: Koa.Next) => {
let result = {
ready: isReady,
} as Record<string, boolean | number>;
if (isReady) {
result = {
...result,
highPriorityWorkers: highPriorityCount,
allPriorityWorkers: allPriorityCount,
};
}
ctxt.set('Content-Type', 'application/json');
ctxt.body = JSON.stringify(result);
ctxt.status = isReady ? 200 : 503;
});

const shutdown = () => {
log.info(`Shutting down server for worker manager...`);
server.close((err) => {
if (err) {
log.error(`Error while closing the server for worker manager:`, err);
process.exit(1);
}
log.info(`Server closed for worker manager.`);
process.exit(0);
});
};
webServer
.use(router.routes())
.use((ctxt: Koa.Context, next: Koa.Next) => {
log.info(
`<-- ${ctxt.method} ${ctxt.req.headers.accept} ${
fullRequestURL(ctxt).href
}`,
);

ctxt.res.on('finish', () => {
log.info(
`--> ${ctxt.method} ${ctxt.req.headers.accept} ${
fullRequestURL(ctxt).href
}: ${ctxt.status}`,
);
log.debug(JSON.stringify(ctxt.req.headers));
});
return next();
})
.use(ecsMetadata);

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
process.on('uncaughtException', (err) => {
log.error(`Uncaught exception in worker manager:`, err);
shutdown();
webServer.on('error', (err: any) => {
log.error(`worker manager HTTP server error: ${err.message}`);
});

process.on('message', (message) => {
if (message === 'stop') {
console.log(`stopping realm server on port ${port}...`);
server.close(() => {
console.log(`worker manager on port ${port} has stopped`);
if (process.send) {
process.send('stopped');
}
});
} else if (message === 'kill') {
console.log(`Ending worker manager process for ${port}...`);
process.exit(0);
webServerInstance = webServer.listen(port);
log.info(`worker manager HTTP listening on port ${port}`);
}

const shutdown = (onShutdown?: () => void) => {
log.info(`Shutting down server for worker manager...`);
webServerInstance?.closeAllConnections();
webServerInstance?.close((err?: Error) => {
if (err) {
log.error(`Error while closing the server for worker manager HTTP:`, err);
process.exit(1);
}
log.info(`worker manager HTTP on port ${port} has stopped.`);
onShutdown?.();
lukemelia marked this conversation as resolved.
Show resolved Hide resolved
process.exit(0);
lukemelia marked this conversation as resolved.
Show resolved Hide resolved
});
}
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
process.on('uncaughtException', (err) => {
log.error(`Uncaught exception in worker manager:`, err);
shutdown();
});

process.on('message', (message) => {
if (message === 'stop') {
shutdown(() => {
process.send?.('stopped');
});
} else if (message === 'kill') {
log.info(`Ending worker manager process for ${port}...`);
process.exit(0);
}
});

(async () => {
log.info(
Expand Down
Loading