Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Refactor worker rpc #2645

Merged
merged 2 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions packages/toolpad-app/cli/appServer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { parentPort, workerData } from 'worker_threads';
import { parentPort, workerData, MessagePort } from 'worker_threads';
import invariant from 'invariant';
import { createServer, Plugin } from 'vite';
import { createRpcClient } from '@mui/toolpad-utils/workerRpc';
import {
getHtmlContent,
postProcessHtml,
Expand All @@ -10,7 +11,6 @@ import {
import type { RuntimeConfig } from '../src/config';
import type * as appDom from '../src/appDom';
import type { ComponentEntry } from '../src/server/localMode';
import { createWorkerRpcClient } from '../src/server/workerRpc';

export type Command = { kind: 'reload-components' } | { kind: 'exit' };

Expand All @@ -20,7 +20,9 @@ export type WorkerRpc = {
getComponents: () => Promise<ComponentEntry[]>;
};

const { notifyReady, loadDom, getComponents } = createWorkerRpcClient<WorkerRpc>();
const { notifyReady, loadDom, getComponents } = createRpcClient<WorkerRpc>(
workerData.mainThreadRpcPort,
);

invariant(
process.env.NODE_ENV === 'development',
Expand Down Expand Up @@ -84,6 +86,7 @@ export interface AppViteServerConfig {
root: string;
port: number;
config: RuntimeConfig;
mainThreadRpcPort: MessagePort;
}

export async function main({ outDir, base, config, root, port }: AppViteServerConfig) {
Expand Down
10 changes: 7 additions & 3 deletions packages/toolpad-app/cli/server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as path from 'path';
import { IncomingMessage, createServer } from 'http';
import * as fs from 'fs/promises';
import { Worker } from 'worker_threads';
import { Worker, MessageChannel } from 'worker_threads';
import express from 'express';
import invariant from 'invariant';
import getPort from 'get-port';
Expand All @@ -15,6 +15,7 @@ import { listen } from '@mui/toolpad-utils/http';
import openBrowser from 'react-dev-utils/openBrowser';
import { folderExists } from '@mui/toolpad-utils/fs';
import chalk from 'chalk';
import { serveRpc } from '@mui/toolpad-utils/workerRpc';
import { asyncHandler } from '../src/utils/express';
import { createProdHandler } from '../src/server/toolpadAppServer';
import {
Expand All @@ -27,7 +28,6 @@ import type { Command as AppDevServerCommand, AppViteServerConfig, WorkerRpc } f
import { createRpcHandler } from '../src/server/rpc';
import { RUNTIME_CONFIG_WINDOW_PROPERTY } from '../src/constants';
import type { RuntimeConfig } from '../src/config';
import { createWorkerRpcServer } from '../src/server/workerRpc';
import { createRpcServer } from '../src/server/rpcServer';
import { createRpcRuntimeServer } from '../src/server/rpcRuntimeServer';

Expand All @@ -54,14 +54,18 @@ async function createDevHandler(
const appServerPath = path.resolve(__dirname, './appServer.js');
const devPort = await getPort();

const mainThreadRpcChannel = new MessageChannel();

const worker = new Worker(appServerPath, {
workerData: {
outDir: getAppOutputFolder(project.getRoot()),
base,
config: runtimeConfig,
root: project.getRoot(),
port: devPort,
mainThreadRpcPort: mainThreadRpcChannel.port1,
} satisfies AppViteServerConfig,
transferList: [mainThreadRpcChannel.port1],
env: {
...process.env,
NODE_ENV: 'development',
Expand All @@ -78,7 +82,7 @@ async function createDevHandler(
resolveReadyPromise = resolve;
});

createWorkerRpcServer<WorkerRpc>(worker, {
serveRpc<WorkerRpc>(mainThreadRpcChannel.port2, {
notifyReady: async () => resolveReadyPromise?.(),
loadDom: async () => project.loadDom(),
getComponents: async () => getComponents(project.getRoot()),
Expand Down
140 changes: 55 additions & 85 deletions packages/toolpad-app/src/server/functionsDevWorker.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,16 @@
import { once } from 'node:events';
import { Worker, MessageChannel, MessagePort, isMainThread, parentPort } from 'worker_threads';
import { Worker, MessageChannel, isMainThread, parentPort } from 'worker_threads';
import * as path from 'path';
import { createRequire } from 'node:module';
import * as fs from 'fs/promises';
import * as vm from 'vm';
import * as url from 'url';
import fetch, { Headers, Request, Response } from 'node-fetch';
import { errorFrom, serializeError } from '@mui/toolpad-utils/errors';
import { getCircularReplacer, replaceRecursive } from '@mui/toolpad-utils/json';
import { ServerContext, getServerContext, withContext } from '@mui/toolpad-core/serverRuntime';
import { isWebContainer } from '@webcontainer/env';

function getCircularReplacer() {
const ancestors: object[] = [];
return function replacer(this: object, key: string, value: unknown) {
if (typeof value !== 'object' || value === null) {
return value;
}
// `this` is the object that value is contained in,
// i.e., its direct parent.
while (ancestors.length > 0 && ancestors.at(-1) !== this) {
ancestors.pop();
}
if (ancestors.includes(value)) {
return '[Circular]';
}
ancestors.push(value);
return value;
};
}

type IntrospectedFiles = Map<string, { file: string }>;

interface IntrospectMessage {
kind: 'introspect';
files: IntrospectedFiles;
}

interface ExecuteMessage {
kind: 'execute';
filePath: string;
name: string;
parameters: unknown[];
cookies?: Record<string, string>;
}

type WorkerMessage = IntrospectMessage | ExecuteMessage;

type TransferredMessage = WorkerMessage & { port: MessagePort };
import SuperJSON from 'superjson';
import { createRpcClient, serveRpc } from '@mui/toolpad-utils/workerRpc';
import { workerData } from 'node:worker_threads';

interface ModuleObject {
exports: Record<string, unknown>;
Expand Down Expand Up @@ -91,52 +55,65 @@ async function resolveFunctions(filePath: string): Promise<Record<string, Functi
);
}

async function execute(msg: ExecuteMessage) {
interface ExecuteParams {
filePath: string;
name: string;
parameters: unknown[];
cookies?: Record<string, string>;
}

interface ExecuteResult {
result: string;
newCookies: [string, string][];
}

async function execute(msg: ExecuteParams): Promise<ExecuteResult> {
const fns = await resolveFunctions(msg.filePath);

const fn = fns[msg.name];
if (typeof fn !== 'function') {
throw new Error(`Function "${msg.name}" not found`);
}

const newCookies = new Map<string, string>();
let functionFinished = false;
const setCookie = (name: string, value: string) => {
if (functionFinished) {
throw new Error(`setCookie can't be called after the function has finished executing.`);
}
newCookies.set(name, value);
};
const ctx: ServerContext = {
cookies: msg.cookies || {},
setCookie,
};

try {
const newCookies = new Map<string, string>();

const ctx: ServerContext = {
cookies: msg.cookies || {},
setCookie(name: string, value: string) {
if (functionFinished) {
throw new Error(`setCookie can't be called after the function has finished executing.`);
}
newCookies.set(name, value);
},
};

const shouldBypassContext = isWebContainer();

if (shouldBypassContext) {
console.warn(
'Bypassing server context in web containers, see https://github.com/stackblitz/core/issues/2711',
);
}

const result = shouldBypassContext
const rawResult = shouldBypassContext
? await fn(...msg.parameters)
: await withContext(ctx, async () => fn(...msg.parameters));

return { result, newCookies: Array.from(newCookies.entries()) };
const withoutCircularRefs = replaceRecursive(rawResult, getCircularReplacer());
const serializedResult = SuperJSON.stringify(withoutCircularRefs);

return { result: serializedResult, newCookies: Array.from(newCookies.entries()) };
} finally {
functionFinished = true;
}
}

async function handleMessage(msg: WorkerMessage) {
switch (msg.kind) {
case 'execute':
return execute(msg);
default:
throw new Error(`Unknown kind "${(msg as any).kind}"`);
}
}
type WorkerRpcServer = {
execute: typeof execute;
};

if (!isMainThread && parentPort) {
// Polyfill fetch() in the Node.js environment
Expand All @@ -151,32 +128,22 @@ if (!isMainThread && parentPort) {
global.Response = Response;
}

parentPort.on('message', (msg: TransferredMessage) => {
(async () => {
try {
const result = await handleMessage(msg);
msg.port.postMessage({ result: JSON.stringify(result, getCircularReplacer()) });
} catch (rawError) {
msg.port.postMessage({ error: serializeError(errorFrom(rawError)) });
}
})();
serveRpc<WorkerRpcServer>(workerData.workerRpcPort, {
execute,
});
}

export function createWorker(env: Record<string, any>) {
const worker = new Worker(path.join(__dirname, 'functionsDevWorker.js'), { env });

const runOnWorker = async (msg: WorkerMessage) => {
const { port1, port2 } = new MessageChannel();
worker.postMessage({ port: port1, ...msg } satisfies TransferredMessage, [port1]);
const [{ error, result }] = await once(port2, 'message');

if (error) {
throw errorFrom(error);
}
const workerRpcChannel = new MessageChannel();
const worker = new Worker(path.join(__dirname, 'functionsDevWorker.js'), {
env,
workerData: {
workerRpcPort: workerRpcChannel.port1,
},
transferList: [workerRpcChannel.port1],
});

return result ? JSON.parse(result) : undefined;
};
const client = createRpcClient(workerRpcChannel.port2);

return {
async terminate() {
Expand All @@ -185,7 +152,8 @@ export function createWorker(env: Record<string, any>) {

async execute(filePath: string, name: string, parameters: unknown[]): Promise<any> {
const ctx = getServerContext();
const { result, newCookies } = await runOnWorker({

const { result: serializedResult, newCookies } = await client.execute({
kind: 'execute',
filePath,
name,
Expand All @@ -199,6 +167,8 @@ export function createWorker(env: Record<string, any>) {
}
}

const result = SuperJSON.parse(serializedResult);

return result;
},
};
Expand Down
68 changes: 0 additions & 68 deletions packages/toolpad-app/src/server/workerRpc.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/toolpad-core/src/serverRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export function createServerContext(req: IncomingMessage, res: ServerResponse):
return {
cookies,
setCookie(name, value) {
res.setHeader('Set-Cookie', cookie.serialize(name, value));
res.setHeader('Set-Cookie', cookie.serialize(name, value, { path: '/' }));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be possible to set options like httpOnly, secure and maxAge?
Or should we set any of those options for cookies to be more secure for authentication, for example?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, but it's not within the scope of this PR.

},
};
}
Expand Down
Loading