Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Miniflare 3] Re-enable concurrent dispatchFetch()s and batch proxy heap frees #713

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"lint:fix": "npm run lint -- --fix",
"prepublishOnly": "npm run lint && npm run clean && npm run build && npm run types:bundle && npm run test",
"release": "./scripts/release.sh",
"test": "npm run build && ava --serial && rimraf ./.tmp",
"test": "npm run build && ava && rimraf ./.tmp",
"types:build": "tsc && tsc -p packages/miniflare/src/workers/tsconfig.json",
"types:bundle": "npm run types:build && node scripts/types.mjs"
},
Expand Down
12 changes: 6 additions & 6 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import type {
import exitHook from "exit-hook";
import { $ as colors$ } from "kleur/colors";
import stoppable from "stoppable";
import { Client } from "undici";
import { Dispatcher, Pool } from "undici";
import SCRIPT_MINIFLARE_SHARED from "worker:shared/index";
import SCRIPT_MINIFLARE_ZOD from "worker:shared/zod";
import { WebSocketServer } from "ws";
Expand Down Expand Up @@ -556,7 +556,7 @@ export class Miniflare {
readonly #removeRuntimeExitHook?: () => void;
#runtimeEntryURL?: URL;
#socketPorts?: SocketPorts;
#runtimeClient?: Client;
#runtimeDispatcher?: Dispatcher;
#proxyClient?: ProxyClient;

// Path to temporary directory for use as scratch space/"in-memory" Durable
Expand Down Expand Up @@ -1136,10 +1136,10 @@ export class Miniflare {
`${secure ? "https" : "http"}://${accessibleHost}:${entryPort}`
);
if (previousEntryURL?.toString() !== this.#runtimeEntryURL.toString()) {
this.#runtimeClient = new Client(this.#runtimeEntryURL, {
this.#runtimeDispatcher = new Pool(this.#runtimeEntryURL, {
connect: { rejectUnauthorized: false },
});
registerAllowUnauthorizedDispatcher(this.#runtimeClient);
registerAllowUnauthorizedDispatcher(this.#runtimeDispatcher);
}
if (this.#proxyClient === undefined) {
this.#proxyClient = new ProxyClient(
Expand Down Expand Up @@ -1290,7 +1290,7 @@ export class Miniflare {
await this.ready;

assert(this.#runtimeEntryURL !== undefined);
assert(this.#runtimeClient !== undefined);
assert(this.#runtimeDispatcher !== undefined);

const forward = new Request(input, init);
const url = new URL(forward.url);
Expand All @@ -1312,7 +1312,7 @@ export class Miniflare {
}

const forwardInit = forward as RequestInit;
forwardInit.dispatcher = this.#runtimeClient;
forwardInit.dispatcher = this.#runtimeDispatcher;
const response = await fetch(url, forwardInit);

// If the Worker threw an uncaught exception, propagate it to the caller
Expand Down
35 changes: 27 additions & 8 deletions packages/miniflare/src/plugins/core/proxy/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ class ProxyClientBridge {
// as the references will be invalid, and a new object with the same address
// may be added to the "heap".
readonly #finalizationRegistry: FinalizationRegistry<NativeTargetHeldValue>;
// Garbage collection passes will free lots of objects at once. Rather than
// sending a `DELETE` request for each address, we batch finalisations within
// 100ms of each other into one request. This ensures we don't create *lots*
// of TCP connections to `workerd` in `dispatchFetch()` for all the concurrent
// requests.
readonly #finalizeBatch: NativeTargetHeldValue[] = [];
#finalizeBatchTimeout?: NodeJS.Timeout;

readonly sync = new SynchronousFetcher();

constructor(public url: URL, readonly dispatchFetch: DispatchFetch) {
Expand All @@ -129,26 +137,37 @@ class ProxyClientBridge {
return this.#version;
}

#finalizeProxy = async (held: NativeTargetHeldValue) => {
// Sanity check: make sure the proxy hasn't been poisoned. We should
// unregister all proxies from the finalisation registry when poisoning,
// but it doesn't hurt to be careful.
if (held.version !== this.#version) return;

#finalizeProxy = (held: NativeTargetHeldValue) => {
// Called when the `Proxy` with address `targetAddress` gets garbage
// collected. This removes the target from the `ProxyServer` "heap".
this.#finalizeBatch.push(held);
clearTimeout(this.#finalizeBatchTimeout);
this.#finalizeBatchTimeout = setTimeout(this.#finalizeProxyBatch, 100);
};

#finalizeProxyBatch = async () => {
const addresses: number[] = [];
for (const held of this.#finalizeBatch.splice(0)) {
// Sanity check: make sure the proxy hasn't been poisoned. We should
// unregister all proxies from the finalisation registry when poisoning,
// but it doesn't hurt to be careful.
if (held.version === this.#version) addresses.push(held.address);
}
// If there are no addresses to free, we don't need to send a request
if (addresses.length === 0) return;
try {
await this.dispatchFetch(this.url, {
method: "DELETE",
headers: {
[CoreHeaders.OP]: ProxyOps.FREE,
[CoreHeaders.OP_TARGET]: held.address.toString(),
[CoreHeaders.OP_TARGET]: addresses.join(","),
},
});
} catch {
// Ignore network errors when freeing. If this `dispatchFetch()` throws,
// it's likely the runtime has shutdown, so the entire "heap" has been
// destroyed anyway.
// destroyed anyway. There's a small chance of a memory leak here if this
// threw for another reason.
}
};

Expand Down
10 changes: 6 additions & 4 deletions packages/miniflare/src/workers/core/proxy.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ export class ProxyServer implements DurableObject {
// Get target to perform operations on
if (targetHeader === null) return new Response(null, { status: 400 });

// If this is a FREE operation, remove the target from the heap
// If this is a FREE operation, remove the target(s) from the heap
if (opHeader === ProxyOps.FREE) {
const targetAddress = parseInt(targetHeader);
assert(!Number.isNaN(targetAddress));
this.heap.delete(targetAddress);
for (const targetValue of targetHeader.split(",")) {
const targetAddress = parseInt(targetValue);
assert(!Number.isNaN(targetAddress));
this.heap.delete(targetAddress);
}
return new Response(null, { status: 204 });
}

Expand Down