Skip to content

Commit

Permalink
[Miniflare 3] Re-enable concurrent dispatchFetch()s and batch proxy…
Browse files Browse the repository at this point in the history
… heap frees (#713)

* Re-enable concurrent `dispatchFetch()`s and batch proxy heap frees

In order to fix tests when adding the magic proxy, we restricted
`dispatchFetch()` to one concurrent TCP connection. Unfortunately,
this prevented long-lived `dispatchFetch()` requests.

When proxies are garbage collected, we send a network request to
`workerd` to free the corresponding entry in the `ProxyServer` heap.
Garbage collection often happens in phases though, freeing lots of
objects at once. This caused many concurrent requests to `workerd`
(~60 in some tests), leading to many TCP connections being created.

This change re-enables using multiple TCP connections, and attempts
to address the root cause of the issues we were seeing, by batching
frees before sending the network request.

* Re-enable parallel tests
  • Loading branch information
mrbbot authored Oct 10, 2023
1 parent ff17f0c commit 6e0926d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 19 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"lint:fix": "npm run lint -- --fix",
"prepublishOnly": "npm run lint && npm run clean && npm run build && npm run types:bundle && npm run test",
"release": "./scripts/release.sh",
"test": "npm run build && ava --serial && rimraf ./.tmp",
"test": "npm run build && ava && rimraf ./.tmp",
"types:build": "tsc && tsc -p packages/miniflare/src/workers/tsconfig.json",
"types:bundle": "npm run types:build && node scripts/types.mjs"
},
Expand Down
12 changes: 6 additions & 6 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import type {
import exitHook from "exit-hook";
import { $ as colors$ } from "kleur/colors";
import stoppable from "stoppable";
import { Client } from "undici";
import { Dispatcher, Pool } from "undici";
import SCRIPT_MINIFLARE_SHARED from "worker:shared/index";
import SCRIPT_MINIFLARE_ZOD from "worker:shared/zod";
import { WebSocketServer } from "ws";
Expand Down Expand Up @@ -557,7 +557,7 @@ export class Miniflare {
readonly #removeRuntimeExitHook?: () => void;
#runtimeEntryURL?: URL;
#socketPorts?: SocketPorts;
#runtimeClient?: Client;
#runtimeDispatcher?: Dispatcher;
#proxyClient?: ProxyClient;

// Path to temporary directory for use as scratch space/"in-memory" Durable
Expand Down Expand Up @@ -1137,10 +1137,10 @@ export class Miniflare {
`${secure ? "https" : "http"}://${accessibleHost}:${entryPort}`
);
if (previousEntryURL?.toString() !== this.#runtimeEntryURL.toString()) {
this.#runtimeClient = new Client(this.#runtimeEntryURL, {
this.#runtimeDispatcher = new Pool(this.#runtimeEntryURL, {
connect: { rejectUnauthorized: false },
});
registerAllowUnauthorizedDispatcher(this.#runtimeClient);
registerAllowUnauthorizedDispatcher(this.#runtimeDispatcher);
}
if (this.#proxyClient === undefined) {
this.#proxyClient = new ProxyClient(
Expand Down Expand Up @@ -1291,7 +1291,7 @@ export class Miniflare {
await this.ready;

assert(this.#runtimeEntryURL !== undefined);
assert(this.#runtimeClient !== undefined);
assert(this.#runtimeDispatcher !== undefined);

const forward = new Request(input, init);
const url = new URL(forward.url);
Expand All @@ -1313,7 +1313,7 @@ export class Miniflare {
}

const forwardInit = forward as RequestInit;
forwardInit.dispatcher = this.#runtimeClient;
forwardInit.dispatcher = this.#runtimeDispatcher;
const response = await fetch(url, forwardInit);

// If the Worker threw an uncaught exception, propagate it to the caller
Expand Down
35 changes: 27 additions & 8 deletions packages/miniflare/src/plugins/core/proxy/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ class ProxyClientBridge {
// as the references will be invalid, and a new object with the same address
// may be added to the "heap".
readonly #finalizationRegistry: FinalizationRegistry<NativeTargetHeldValue>;
// Garbage collection passes will free lots of objects at once. Rather than
// sending a `DELETE` request for each address, we batch finalisations within
// 100ms of each other into one request. This ensures we don't create *lots*
// of TCP connections to `workerd` in `dispatchFetch()` for all the concurrent
// requests.
readonly #finalizeBatch: NativeTargetHeldValue[] = [];
#finalizeBatchTimeout?: NodeJS.Timeout;

readonly sync = new SynchronousFetcher();

constructor(public url: URL, readonly dispatchFetch: DispatchFetch) {
Expand All @@ -129,26 +137,37 @@ class ProxyClientBridge {
return this.#version;
}

#finalizeProxy = async (held: NativeTargetHeldValue) => {
// Sanity check: make sure the proxy hasn't been poisoned. We should
// unregister all proxies from the finalisation registry when poisoning,
// but it doesn't hurt to be careful.
if (held.version !== this.#version) return;

#finalizeProxy = (held: NativeTargetHeldValue) => {
// Called when the `Proxy` with address `targetAddress` gets garbage
// collected. This removes the target from the `ProxyServer` "heap".
this.#finalizeBatch.push(held);
clearTimeout(this.#finalizeBatchTimeout);
this.#finalizeBatchTimeout = setTimeout(this.#finalizeProxyBatch, 100);
};

#finalizeProxyBatch = async () => {
const addresses: number[] = [];
for (const held of this.#finalizeBatch.splice(0)) {
// Sanity check: make sure the proxy hasn't been poisoned. We should
// unregister all proxies from the finalisation registry when poisoning,
// but it doesn't hurt to be careful.
if (held.version === this.#version) addresses.push(held.address);
}
// If there are no addresses to free, we don't need to send a request
if (addresses.length === 0) return;
try {
await this.dispatchFetch(this.url, {
method: "DELETE",
headers: {
[CoreHeaders.OP]: ProxyOps.FREE,
[CoreHeaders.OP_TARGET]: held.address.toString(),
[CoreHeaders.OP_TARGET]: addresses.join(","),
},
});
} catch {
// Ignore network errors when freeing. If this `dispatchFetch()` throws,
// it's likely the runtime has shutdown, so the entire "heap" has been
// destroyed anyway.
// destroyed anyway. There's a small chance of a memory leak here if this
// threw for another reason.
}
};

Expand Down
10 changes: 6 additions & 4 deletions packages/miniflare/src/workers/core/proxy.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ export class ProxyServer implements DurableObject {
// Get target to perform operations on
if (targetHeader === null) return new Response(null, { status: 400 });

// If this is a FREE operation, remove the target from the heap
// If this is a FREE operation, remove the target(s) from the heap
if (opHeader === ProxyOps.FREE) {
const targetAddress = parseInt(targetHeader);
assert(!Number.isNaN(targetAddress));
this.heap.delete(targetAddress);
for (const targetValue of targetHeader.split(",")) {
const targetAddress = parseInt(targetValue);
assert(!Number.isNaN(targetAddress));
this.heap.delete(targetAddress);
}
return new Response(null, { status: 204 });
}

Expand Down

0 comments on commit 6e0926d

Please sign in to comment.