Skip to content

Commit

Permalink
Allow host/port to be updated in Miniflare#setOptions()
Browse files Browse the repository at this point in the history
If `port: 0` is passed to `Miniflare#setOptions()`, a new random port
will be allocated. Previously, Miniflare would always try to reuse
the existing port (even if a non-zero port was passed). That
behaviour can be retained by passing `port: (await mf.ready).port` to
`setOptions()`. We'd like this for Wrangler, so we can guarantee we
always reload on a fresh port.
  • Loading branch information
mrbbot committed Oct 5, 2023
1 parent 9dda016 commit 8374993
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 125 deletions.
140 changes: 68 additions & 72 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import assert from "assert";
import crypto from "crypto";
import { Abortable } from "events";
import fs from "fs";
import http from "http";
import net from "net";
Expand Down Expand Up @@ -98,15 +99,24 @@ import {
LogLevel,
Mutex,
SharedHeaders,
maybeApply,
} from "./workers";
import { _formatZodError } from "./zod-format";

const DEFAULT_HOST = "127.0.0.1";
function getURLSafeHost(host: string) {
return net.isIPv6(host) ? `[${host}]` : host;
}
function getAccessibleHost(host: string) {
return host === "*" || host === "0.0.0.0" || host === "::"
? "127.0.0.1"
: host;
const accessibleHost =
host === "*" || host === "0.0.0.0" || host === "::" ? "127.0.0.1" : host;
return getURLSafeHost(accessibleHost);
}

function getServerPort(server: http.Server) {
const address = server.address();
// Note address would be string with unix socket
assert(address !== null && typeof address === "object");
return address.port;
}

// ===== `Miniflare` User Options =====
Expand Down Expand Up @@ -539,11 +549,9 @@ export class Miniflare {
#sharedOpts: PluginSharedOptions;
#workerOpts: PluginWorkerOptions[];
#log: Log;
readonly #host: string;
readonly #accessibleHost: string;

#runtime?: Runtime;
#removeRuntimeExitHook?: () => void;
readonly #runtime?: Runtime;
readonly #removeRuntimeExitHook?: () => void;
#runtimeEntryURL?: URL;
#socketPorts?: Map<SocketIdentifier, number>;
#runtimeClient?: Client;
Expand All @@ -567,7 +575,7 @@ export class Miniflare {
// Aborted when dispose() is called
readonly #disposeController: AbortController;
#loopbackServer?: StoppableServer;
#loopbackPort?: number;
#loopbackHost?: string;
readonly #liveReloadServer: WebSocketServer;
readonly #webSocketServer: WebSocketServer;
readonly #webSocketExtraHeaders: WeakMap<http.IncomingMessage, Headers>;
Expand All @@ -587,15 +595,6 @@ export class Miniflare {
}

this.#log = this.#sharedOpts.core.log ?? new NoOpLog();
this.#host = this.#sharedOpts.core.host ?? DEFAULT_HOST;
// TODO: maybe remove `#accessibleHost` field, and just get whenever
// constructing entry URL, then extract constructing entry URL into
// function used `getUnsafeGetDirectURL()` too?
this.#accessibleHost = getAccessibleHost(this.#host);

if (net.isIPv6(this.#accessibleHost)) {
this.#accessibleHost = `[${this.#accessibleHost}]`;
}

this.#liveReloadServer = new WebSocketServer({ noServer: true });
this.#webSocketServer = new WebSocketServer({
Expand Down Expand Up @@ -630,10 +629,14 @@ export class Miniflare {
fs.rmSync(this.#tmpPath, { force: true, recursive: true });
});

// Setup runtime
this.#runtime = new Runtime();
this.#removeRuntimeExitHook = exitHook(() => void this.#runtime?.dispose());

this.#disposeController = new AbortController();
this.#runtimeMutex = new Mutex();
this.#initPromise = this.#runtimeMutex
.runWith(() => this.#init())
.runWith(() => this.#assembleAndUpdateConfig())
.catch((e) => {
// If initialisation failed, attempting to `dispose()` this instance
// will too. Therefore, remove from the instance registry now, so we
Expand All @@ -655,35 +658,6 @@ export class Miniflare {
}
}

async #init() {
// This function must be run with `#runtimeMutex` held

// Start loopback server (how the runtime accesses with Miniflare's storage)
// using the same host as the main runtime server. This means we can use the
// loopback server for live reload updates too.
this.#loopbackServer = await this.#startLoopbackServer(0, this.#host);
const address = this.#loopbackServer.address();
// Note address would be string with unix socket
assert(address !== null && typeof address === "object");
// noinspection JSObjectNullOrUndefined
this.#loopbackPort = address.port;

// Start runtime
const port = this.#sharedOpts.core.port ?? 0;
const opts: RuntimeOptions = {
entryHost: net.isIPv6(this.#host) ? `[${this.#host}]` : this.#host,
entryPort: port,
loopbackPort: this.#loopbackPort,
inspectorPort: this.#sharedOpts.core.inspectorPort,
verbose: this.#sharedOpts.core.verbose,
};
this.#runtime = new Runtime(opts);
this.#removeRuntimeExitHook = exitHook(() => void this.#runtime?.dispose());

// Update config and wait for runtime to start
await this.#assembleAndUpdateConfig();
}

async #handleLoopbackCustomService(
request: Request,
customService: string
Expand Down Expand Up @@ -862,21 +836,37 @@ export class Miniflare {
await writeResponse(response, res);
};

#startLoopbackServer(
port: number,
hostname: string
): Promise<StoppableServer> {
if (hostname === "*") {
hostname = "::";
async #getLoopbackPort(): Promise<number> {
// This function must be run with `#runtimeMutex` held

// Start loopback server (how the runtime accesses Node.js) using the same
// host as the main runtime server. This means we can use the loopback
// server for live reload updates too.
const loopbackHost = this.#sharedOpts.core.host ?? DEFAULT_HOST;
// If we've already started the loopback server...
if (this.#loopbackServer !== undefined) {
// ...and it's using the correct host, reuse it
if (this.#loopbackHost === loopbackHost) {
return getServerPort(this.#loopbackServer);
}
// Otherwise, stop it, and create a new one
await this.#stopLoopbackServer();
}
this.#loopbackServer = await this.#startLoopbackServer(loopbackHost);
this.#loopbackHost = loopbackHost;
return getServerPort(this.#loopbackServer);
}

#startLoopbackServer(hostname: string): Promise<StoppableServer> {
if (hostname === "*") hostname = "::";

return new Promise((resolve) => {
const server = stoppable(
http.createServer(this.#handleLoopback),
/* grace */ 0
);
server.on("upgrade", this.#handleLoopbackUpgrade);
server.listen(port, hostname, () => resolve(server));
server.listen(0, hostname, () => resolve(server));
});
}

Expand All @@ -887,12 +877,9 @@ export class Miniflare {
});
}

async #assembleConfig(): Promise<Config> {
async #assembleConfig(loopbackPort: number): Promise<Config> {
const allWorkerOpts = this.#workerOpts;
const sharedOpts = this.#sharedOpts;
const loopbackPort = this.#loopbackPort;
// #assembleConfig is always called after the loopback server is created
assert(loopbackPort !== undefined);

sharedOpts.core.cf = await setupCf(this.#log, sharedOpts.core.cf);

Expand Down Expand Up @@ -1049,9 +1036,11 @@ export class Miniflare {
}

async #assembleAndUpdateConfig() {
// This function must be run with `#runtimeMutex` held
const initial = !this.#runtimeEntryURL;
assert(this.#runtime !== undefined);
const config = await this.#assembleConfig();
const loopbackPort = await this.#getLoopbackPort();
const config = await this.#assembleConfig(loopbackPort);
const configBuffer = serializeConfig(config);

// Get all socket names we expect to get ports for
Expand All @@ -1062,18 +1051,26 @@ export class Miniflare {
return name;
}
);
// TODO(now): there's a bug here if the inspector was not enabled initially,
// fixed by a later commit in this PR
if (this.#sharedOpts.core.inspectorPort !== undefined) {
requiredSockets.push(kInspectorSocket);
}

// Reload runtime
const host = this.#sharedOpts.core.host ?? DEFAULT_HOST;
const urlSafeHost = getURLSafeHost(host);
const accessibleHost = getAccessibleHost(host);
const runtimeOpts: Abortable & RuntimeOptions = {
signal: this.#disposeController.signal,
entryHost: urlSafeHost,
entryPort: this.#sharedOpts.core.port ?? 0,
loopbackPort,
requiredSockets,
inspectorPort: this.#sharedOpts.core.inspectorPort,
verbose: this.#sharedOpts.core.verbose,
};
const maybeSocketPorts = await this.#runtime.updateConfig(
configBuffer,
requiredSockets,
{
signal: this.#disposeController.signal,
entryPort: maybeApply(parseInt, this.#runtimeEntryURL?.port),
}
runtimeOpts
);
if (this.#disposeController.signal.aborted) return;
if (maybeSocketPorts === undefined) {
Expand All @@ -1094,7 +1091,7 @@ export class Miniflare {
const entryPort = maybeSocketPorts.get(SOCKET_ENTRY);
assert(entryPort !== undefined);
this.#runtimeEntryURL = new URL(
`${secure ? "https" : "http"}://${this.#accessibleHost}:${entryPort}`
`${secure ? "https" : "http"}://${accessibleHost}:${entryPort}`
);
if (previousEntryURL?.toString() !== this.#runtimeEntryURL.toString()) {
this.#runtimeClient = new Client(this.#runtimeEntryURL, {
Expand All @@ -1118,16 +1115,15 @@ export class Miniflare {
// Only log and trigger reload if there aren't pending updates
const ready = initial ? "Ready" : "Updated and ready";

const host = net.isIPv6(this.#host) ? `[${this.#host}]` : this.#host;
this.#log.info(
`${ready} on ${secure ? "https" : "http"}://${host}:${entryPort} `
`${ready} on ${secure ? "https" : "http"}://${urlSafeHost}:${entryPort}`
);

if (initial) {
let hosts: string[];
if (this.#host === "::" || this.#host === "*") {
if (host === "::" || host === "*") {
hosts = getAccessibleHosts(false);
} else if (this.#host === "0.0.0.0") {
} else if (host === "0.0.0.0") {
hosts = getAccessibleHosts(true);
} else {
hosts = [];
Expand Down
93 changes: 44 additions & 49 deletions packages/miniflare/src/runtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,26 @@ const ControlMessageSchema = z.discriminatedUnion("event", [
export const kInspectorSocket = Symbol("kInspectorSocket");
export type SocketIdentifier = string | typeof kInspectorSocket;

export interface RuntimeOptions {
entryHost: string;
entryPort: number;
loopbackPort: number;
requiredSockets: SocketIdentifier[];
inspectorPort?: number;
verbose?: boolean;
}

async function waitForPorts(
requiredSockets: SocketIdentifier[],
stream: Readable,
options?: Abortable
options: Abortable & Pick<RuntimeOptions, "requiredSockets">
): Promise<Map<SocketIdentifier, number> | undefined> {
if (options?.signal?.aborted) return;
const lines = rl.createInterface(stream);
// Calling `close()` will end the async iterator below and return undefined
const abortListener = () => lines.close();
options?.signal?.addEventListener("abort", abortListener, { once: true });
// We're going to be mutating `sockets`, so shallow copy it
requiredSockets = Array.from(requiredSockets);
const requiredSockets = Array.from(options.requiredSockets);
const socketPorts = new Map<SocketIdentifier, number>();
try {
for await (const line of lines) {
Expand Down Expand Up @@ -82,65 +90,52 @@ function pipeOutput(runtime: childProcess.ChildProcessWithoutNullStreams) {
// runtime.stderr.pipe(process.stderr);
}

export interface RuntimeOptions {
entryHost: string;
entryPort: number;
loopbackPort: number;
inspectorPort?: number;
verbose?: boolean;
function getRuntimeCommand() {
return process.env.MINIFLARE_WORKERD_PATH ?? workerdPath;
}

export class Runtime {
readonly #command: string;

#process?: childProcess.ChildProcess;
#processExitPromise?: Promise<void>;

constructor(private opts: RuntimeOptions) {
this.#command = process.env.MINIFLARE_WORKERD_PATH ?? workerdPath;
function getRuntimeArgs(options: RuntimeOptions) {
const args: string[] = [
"serve",
// Required to use binary capnp config
"--binary",
// Required to use compatibility flags without a default-on date,
// (e.g. "streams_enable_constructors"), see https://github.com/cloudflare/workerd/pull/21
"--experimental",
`--socket-addr=${SOCKET_ENTRY}=${options.entryHost}:${options.entryPort}`,
`--external-addr=${SERVICE_LOOPBACK}=localhost:${options.loopbackPort}`,
// Configure extra pipe for receiving control messages (e.g. when ready)
"--control-fd=3",
// Read config from stdin
"-",
];
if (options.inspectorPort !== undefined) {
// Required to enable the V8 inspector
args.push(`--inspector-addr=localhost:${options.inspectorPort}`);
}
if (options.verbose) {
args.push("--verbose");
}

get #args() {
const args: string[] = [
"serve",
// Required to use binary capnp config
"--binary",
// Required to use compatibility flags without a default-on date,
// (e.g. "streams_enable_constructors"), see https://github.com/cloudflare/workerd/pull/21
"--experimental",
`--socket-addr=${SOCKET_ENTRY}=${this.opts.entryHost}:${this.opts.entryPort}`,
`--external-addr=${SERVICE_LOOPBACK}=localhost:${this.opts.loopbackPort}`,
// Configure extra pipe for receiving control messages (e.g. when ready)
"--control-fd=3",
// Read config from stdin
"-",
];
if (this.opts.inspectorPort !== undefined) {
// Required to enable the V8 inspector
args.push(`--inspector-addr=localhost:${this.opts.inspectorPort}`);
}
if (this.opts.verbose) {
args.push("--verbose");
}
return args;
}

return args;
}
export class Runtime {
#process?: childProcess.ChildProcess;
#processExitPromise?: Promise<void>;

async updateConfig(
configBuffer: Buffer,
requiredSockets: SocketIdentifier[],
options?: Abortable & Partial<Pick<RuntimeOptions, "entryPort">>
options: Abortable & RuntimeOptions
): Promise<Map<SocketIdentifier, number /* port */> | undefined> {
// 1. Stop existing process (if any) and wait for exit
await this.dispose();
// TODO: what happens if runtime crashes?

if (options?.entryPort !== undefined) {
this.opts.entryPort = options.entryPort;
}

// 2. Start new process
const runtimeProcess = childProcess.spawn(this.#command, this.#args, {
const command = getRuntimeCommand();
const args = getRuntimeArgs(options);
const runtimeProcess = childProcess.spawn(command, args, {
stdio: ["pipe", "pipe", "pipe", "pipe"],
env: process.env,
});
Expand All @@ -156,7 +151,7 @@ export class Runtime {
runtimeProcess.stdin.end();

// 4. Wait for sockets to start listening
return waitForPorts(requiredSockets, controlPipe, options);
return waitForPorts(controlPipe, options);
}

dispose(): Awaitable<void> {
Expand Down
Loading

0 comments on commit 8374993

Please sign in to comment.