Skip to content

Commit

Permalink
Extend error handling of proxy request errors in ProxyWorker (#4867)
Browse files Browse the repository at this point in the history
* ignore stale proxy errors

* attempt to recover from ProxyWorker fetch errors
by requeuing the request (only if it is GET or HEAD)

* add test

* add changeset

* refactor to flatten nested if-else blocks

* requeue request for retry at front of queue

* sort batch of requests in queue by order of arrival

* Revert "sort batch of requests in queue by order of arrival"

This reverts commit 3329f19.

* Revert "requeue request for retry at front of queue"

This reverts commit f0377b7.

* prioritise requests being retried
over requests being proxied for the first time

* better comments

* update changeset to match recommended format
  • Loading branch information
RamIdeas authored Feb 8, 2024
1 parent d96bc7d commit d637bd5
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 35 deletions.
9 changes: 9 additions & 0 deletions .changeset/few-houses-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"wrangler": patch
---

fix: inflight requests to UserWorker which failed across reloads are now retried

Previously, when running `wrangler dev`, requests inflight during a UserWorker reload (due to config or source file changes) would fail.

Now, if those inflight requests are GET or HEAD requests, they will be reproxied against the new UserWorker. This adds to the guarantee that requests made during local development reach the latest worker.
53 changes: 51 additions & 2 deletions fixtures/dev-env/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ describe("startDevWorker: ProxyController", () => {
fireAndForgetFakeUserWorkerChanges({
mfOpts: run.mfOpts,
config: run.config,
script: run.mfOpts.script.replace("1", "2"),
script: run.mfOpts.script.replace("body:1", "body:2"),
});

res = await run.worker.fetch("http://dummy");
Expand Down Expand Up @@ -295,7 +295,7 @@ describe("startDevWorker: ProxyController", () => {
fireAndForgetFakeUserWorkerChanges({
mfOpts: run.mfOpts,
config: run.config,
script: run.mfOpts.script.replace("1", "2"),
script: run.mfOpts.script.replace("body:1", "body:2"),
});
await executionContextClearedPromise;
});
Expand Down Expand Up @@ -599,4 +599,53 @@ describe("startDevWorker: ProxyController", () => {
"URL: https://mybank.co.uk/test/path/2"
);
});

test("inflight requests are retried during UserWorker reloads", async () => {
// to simulate inflight requests failing during UserWorker reloads,
// we will use a UserWorker with a longish `await setTimeout(...)`
// so that we can guarantee the race condition is hit
// when workerd is eventually terminated

// UserWorker:1 stalls 30s on /long but responds immediately on /short
const run = await fakeStartUserWorker({
script: `
export default {
async fetch(request) {
const url = new URL(request.url);
if (url.pathname === '/long') {
await new Promise(r => setTimeout(r, 30_000));
}
return new Response("UserWorker:1");
}
}
`,
});

res = await run.worker.fetch("http://dummy/short"); // implicitly waits for UserWorker:1 to be ready
await expect(res.text()).resolves.toBe("UserWorker:1");

// deliberately NOT awaited yet — this request must stay inflight across
// the two reloads below so the ProxyWorker's retry path is exercised
const inflightDuringReloads = run.worker.fetch("http://dummy/long");

// this will cause workerd for UserWorker:1 to terminate (eventually, but soon)
fireAndForgetFakeUserWorkerChanges({
mfOpts: run.mfOpts,
config: run.config,
script: run.mfOpts.script.replace("UserWorker:1", "UserWorker:2"), // change response so it can be identified
});

res = await run.worker.fetch("http://dummy/short"); // implicitly waits for UserWorker:2 to be ready
await expect(res.text()).resolves.toBe("UserWorker:2");

// this will cause workerd for UserWorker:2 to terminate (eventually, but soon)
// NOTE(review): replacing "UserWorker:1" here assumes run.mfOpts.script still
// holds the ORIGINAL script (the :2 change above was fire-and-forget) — confirm
fireAndForgetFakeUserWorkerChanges({
mfOpts: run.mfOpts,
config: run.config,
script: run.mfOpts.script
.replace("UserWorker:1", "UserWorker:3") // change response so it can be identified
.replace("30_000", "0"), // remove the long wait as we won't reload this UserWorker
});

// the /long request was retried against each new worker; the final
// (no-delay) UserWorker:3 answers it
res = await inflightDuringReloads;
await expect(res.text()).resolves.toBe("UserWorker:3");
});
});
106 changes: 73 additions & 33 deletions packages/wrangler/templates/startDevWorker/ProxyWorker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import assert from "node:assert";
import {
createDeferred,
DeferredPromise,
urlFromParts,
} from "../../src/api/startDevWorker/utils";
import type {
ProxyData,
Expand Down Expand Up @@ -37,6 +37,7 @@ export class ProxyWorker implements DurableObject {

proxyData?: ProxyData;
requestQueue = new Map<Request, DeferredPromise<Response>>();
requestRetryQueue = new Map<Request, DeferredPromise<Response>>();

fetch(request: Request) {
if (isRequestForLiveReloadWebsocket(request)) {
Expand Down Expand Up @@ -94,11 +95,22 @@ export class ProxyWorker implements DurableObject {
return new Response(null, { status: 204 });
}

/**
 * Yields queued requests: retried requests first, then first-time requests.
 *
 * A request being retried is, by definition, older than any request that has
 * not been processed yet, so this ordering approximates arrival order. More
 * precise re-ordering is unnecessary since the entries are dispatched
 * synchronously by the consumer.
 */
*getOrderedQueue() {
	for (const entry of this.requestRetryQueue) {
		yield entry;
	}
	for (const entry of this.requestQueue) {
		yield entry;
	}
}

processQueue() {
const { proxyData } = this; // destructuring is required to keep the type-narrowing (not undefined) in the .then callback and to ensure the same proxyData is used throughout each request
const { proxyData } = this; // store proxyData at the moment this function was called
if (proxyData === undefined) return;

for (const [request, deferredResponse] of this.requestQueue) {
for (const [request, deferredResponse] of this.getOrderedQueue()) {
this.requestRetryQueue.delete(request);
this.requestQueue.delete(request);

const userWorkerUrl = new URL(request.url);
Expand All @@ -115,6 +127,8 @@ export class ProxyWorker implements DurableObject {

// merge proxyData headers with the request headers
for (const [key, value] of Object.entries(proxyData.headers ?? {})) {
if (value === undefined) continue;

if (key.toLowerCase() === "cookie") {
const existing = request.headers.get("cookie") ?? "";
headers.set("cookie", `${existing};${value}`);
Expand All @@ -123,8 +137,7 @@ export class ProxyWorker implements DurableObject {
}
}

// explicitly NOT await-ing this promise, we are in a loop and want to process the whole queue quickly
// if we decide to await, we should include a timeout (~100ms) in case the user worker has long-running/parellel requests
// explicitly NOT await-ing this promise, we are in a loop and want to process the whole queue quickly + synchronously
void fetch(userWorkerUrl, new Request(request, { headers }))
.then((res) => {
if (isHtmlResponse(res)) {
Expand All @@ -137,17 +150,55 @@ export class ProxyWorker implements DurableObject {
// errors here are network errors or from response post-processing
// to catch only network errors, use the 2nd param of the fetch.then()

void sendMessageToProxyController(this.env, {
type: "error",
error: {
name: error.name,
message: error.message,
stack: error.stack,
cause: error.cause,
},
});

deferredResponse.reject(error);
// we have crossed an async boundary, so proxyData may have changed
// if proxyData.userWorkerUrl has changed, it means there is a new downstream UserWorker
// and that this error is stale since it was for a request to the old UserWorker
// so here we construct a newUserWorkerUrl so we can compare it to the (old) userWorkerUrl
const newUserWorkerUrl =
this.proxyData && urlFromParts(this.proxyData.userWorkerUrl);

// only report errors if the downstream proxy has NOT changed
if (userWorkerUrl.href === newUserWorkerUrl?.href) {
void sendMessageToProxyController(this.env, {
type: "error",
error: {
name: error.name,
message: error.message,
stack: error.stack,
cause: error.cause,
},
});

deferredResponse.reject(error);
}

// if the request can be retried (subset of idempotent requests which have no body), requeue it
else if (request.method === "GET" || request.method === "HEAD") {
this.requestRetryQueue.set(request, deferredResponse);
// we would only end up here if the downstream UserWorker is chang*ing*
// i.e. we are in a `pause`d state and expecting a `play` message soon
// this request will be processed (retried) when the `play` message arrives
// for that reason, we do not need to call `this.processQueue` here
// (but, also, it can't hurt to call it since it bails when
// in a `pause`d state i.e. `this.proxyData` is undefined)
}

// if the request cannot be retried, respond with 503 Service Unavailable
// important to note, this is not an (unexpected) error -- it is an acceptable flow of local development
// it would be incorrect to retry non-idempotent requests
// and would require cloning all body streams to avoid stream reuse (which is inefficient but not out of the question in the future)
// this is a good enough UX for now since it solves the most common GET use-case
else {
deferredResponse.resolve(
new Response(
"Your worker restarted mid-request. Please try sending the request again. Only GET or HEAD requests are retried automatically.",
{
status: 503,
headers: { "Retry-After": "0" },
}
)
);
}
});
}
}
Expand All @@ -166,25 +217,14 @@ function isRequestForLiveReloadWebsocket(req: Request): boolean {
return isWebSocketUpgrade && websocketProtocol === LIVE_RELOAD_PROTOCOL;
}

async function sendMessageToProxyController(
function sendMessageToProxyController(
env: Env,
message: ProxyWorkerOutgoingRequestBody,
retries = 3
message: ProxyWorkerOutgoingRequestBody
) {
try {
await env.PROXY_CONTROLLER.fetch("http://dummy", {
method: "POST",
body: JSON.stringify(message),
});
} catch (cause) {
if (retries > 0) {
return sendMessageToProxyController(env, message, retries - 1);
}

// no point sending an error message if we can't send this message

throw cause;
}
return env.PROXY_CONTROLLER.fetch("http://dummy", {
method: "POST",
body: JSON.stringify(message),
});
}

function insertLiveReloadScript(
Expand Down

0 comments on commit d637bd5

Please sign in to comment.