From c0c992b4831895b91bd1b1379639358f55baa017 Mon Sep 17 00:00:00 2001 From: bcoll Date: Fri, 12 May 2023 12:24:42 +0100 Subject: [PATCH 1/2] Add support for multiple, weak, wildcard etags in R2 gateway cloudflare/workerd#563 added support to R2 bindings for these --- packages/tre/src/plugins/r2/schemas.ts | 12 +- packages/tre/src/plugins/r2/validator.ts | 24 +++- .../tre/test/plugins/r2/validator.spec.ts | 108 +++++++++++++----- 3 files changed, 113 insertions(+), 31 deletions(-) diff --git a/packages/tre/src/plugins/r2/schemas.ts b/packages/tre/src/plugins/r2/schemas.ts index f2f99afd9..78f9b9534 100644 --- a/packages/tre/src/plugins/r2/schemas.ts +++ b/packages/tre/src/plugins/r2/schemas.ts @@ -94,12 +94,20 @@ export const R2RangeSchema = z.object({ }); export type R2Range = z.infer; +export const R2EtagSchema = z.discriminatedUnion("type", [ + z.object({ type: z.literal("strong"), value: z.string() }), + z.object({ type: z.literal("weak"), value: z.string() }), + z.object({ type: z.literal("wildcard") }), +]); +export type R2Etag = z.infer; +export const R2EtagMatchSchema = R2EtagSchema.array().min(1).optional(); + // For more information, refer to https://datatracker.ietf.org/doc/html/rfc7232 export const R2ConditionalSchema = z.object({ // Performs the operation if the object's ETag matches the given string - etagMatches: z.ostring(), // "If-Match" + etagMatches: R2EtagMatchSchema, // "If-Match" // Performs the operation if the object's ETag does NOT match the given string - etagDoesNotMatch: z.ostring(), // "If-None-Match" + etagDoesNotMatch: R2EtagMatchSchema, // "If-None-Match" // Performs the operation if the object was uploaded BEFORE the given date uploadedBefore: DateSchema.optional(), // "If-Unmodified-Since" // Performs the operation if the object was uploaded AFTER the given date diff --git a/packages/tre/src/plugins/r2/validator.ts b/packages/tre/src/plugins/r2/validator.ts index a7e346bbc..2cea15e76 100644 --- a/packages/tre/src/plugins/r2/validator.ts +++ b/packages/tre/src/plugins/r2/validator.ts @@ -12,7 +12,7 @@ import { PreconditionFailed, } from "./errors"; import { R2Object } from "./r2Object"; -import { R2Conditional, R2GetOptions } from "./schemas"; +import { R2Conditional, R2Etag, R2GetOptions } from "./schemas"; export const MAX_LIST_KEYS = 1_000; const MAX_KEY_SIZE = 1024; @@ -27,6 +27,21 @@ function truncateToSeconds(ms: number) { return Math.floor(ms / 1000) * 1000; } +function includesEtag( + conditions: R2Etag[], + etag: string, + comparison: "strong" | "weak" +) { + // Adapted from internal R2 gateway implementation. + for (const condition of conditions) { + if (condition.type === "wildcard") return true; + if (condition.value === etag) { + if (condition.type === "strong" || comparison === "weak") return true; + } + } + return false; +} + // Returns `true` iff the condition passed /** @internal */ export function _testR2Conditional( @@ -43,9 +58,12 @@ export function _testR2Conditional( } const { etag, uploaded: lastModifiedRaw } = metadata; - const ifMatch = cond.etagMatches === undefined || cond.etagMatches === etag; + const ifMatch = + cond.etagMatches === undefined || + includesEtag(cond.etagMatches, etag, "strong"); const ifNoneMatch = - cond.etagDoesNotMatch === undefined || cond.etagDoesNotMatch !== etag; + cond.etagDoesNotMatch === undefined || + !includesEtag(cond.etagDoesNotMatch, etag, "weak"); const maybeTruncate = cond.secondsGranularity ? truncateToSeconds : identity; const lastModified = maybeTruncate(lastModifiedRaw); diff --git a/packages/tre/test/plugins/r2/validator.spec.ts b/packages/tre/test/plugins/r2/validator.spec.ts index cea38aeeb..0549ab1e3 100644 --- a/packages/tre/test/plugins/r2/validator.spec.ts +++ b/packages/tre/test/plugins/r2/validator.spec.ts @@ -1,5 +1,4 @@ -import { R2Conditional } from "@cloudflare/workers-types/experimental"; -import { R2Object, _testR2Conditional } from "@miniflare/tre"; +import { R2Conditional, R2Object, _testR2Conditional } from "@miniflare/tre"; import test from "ava"; test("testR2Conditional: matches various conditions", (t) => { @@ -20,11 +19,11 @@ test("testR2Conditional: matches various conditions", (t) => { const usingMissing = (cond: R2Conditional) => _testR2Conditional(cond); // Check single conditions - t.true(using({ etagMatches: etag })); - t.false(using({ etagMatches: badEtag })); + t.true(using({ etagMatches: [{ type: "strong", value: etag }] })); + t.false(using({ etagMatches: [{ type: "strong", value: badEtag }] })); - t.true(using({ etagDoesNotMatch: badEtag })); - t.false(using({ etagDoesNotMatch: etag })); + t.true(using({ etagDoesNotMatch: [{ type: "strong", value: badEtag }] })); + t.false(using({ etagDoesNotMatch: [{ type: "strong", value: etag }] })); t.false(using({ uploadedBefore: pastDate })); t.true(using({ uploadedBefore: futureDate })); @@ -32,48 +31,85 @@ test("testR2Conditional: matches various conditions", (t) => { t.true(using({ uploadedAfter: pastDate })); t.false(using({ uploadedBefore: pastDate })); + // Check with weaker etags + t.false(using({ etagMatches: [{ type: "weak", value: etag }] })); + t.false(using({ etagDoesNotMatch: [{ type: "weak", value: etag }] })); + t.true(using({ etagDoesNotMatch: [{ type: "weak", value: badEtag }] })); + t.true(using({ etagMatches: [{ type: "wildcard" }] })); + t.false(using({ etagDoesNotMatch: [{ type: "wildcard" }] })); + // Check multiple conditions that evaluate to false - t.false(using({ etagMatches: etag, etagDoesNotMatch: etag })); - t.false(using({ etagMatches: etag, uploadedAfter: futureDate })); + t.false( + using({ + etagMatches: [{ type: "strong", value: etag }], + etagDoesNotMatch: [{ type: "strong", value: etag }], + }) + ); + t.false( + using({ + etagMatches: [{ type: "strong", value: etag }], + uploadedAfter: futureDate, + }) + ); t.false( using({ // `etagMatches` pass makes `uploadedBefore` pass, but `uploadedAfter` fails - etagMatches: etag, + etagMatches: [{ type: "strong", value: etag }], uploadedAfter: futureDate, uploadedBefore: pastDate, }) ); - t.false(using({ etagDoesNotMatch: badEtag, uploadedBefore: pastDate })); + t.false( + using({ + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedBefore: pastDate, + }) + ); t.false( using({ // `etagDoesNotMatch` pass makes `uploadedAfter` pass, but `uploadedBefore` fails - etagDoesNotMatch: badEtag, + etagDoesNotMatch: [{ type: "strong", value: badEtag }], uploadedAfter: futureDate, uploadedBefore: pastDate, }) ); t.false( using({ - etagMatches: badEtag, - etagDoesNotMatch: badEtag, + etagMatches: [{ type: "strong", value: badEtag }], + etagDoesNotMatch: [{ type: "strong", value: badEtag }], uploadedAfter: pastDate, uploadedBefore: futureDate, }) ); // Check multiple conditions that evaluate to true - t.true(using({ etagMatches: etag, etagDoesNotMatch: badEtag })); + t.true( + using({ + etagMatches: [{ type: "strong", value: etag }], + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + }) + ); // `etagMatches` pass makes `uploadedBefore` pass - t.true(using({ etagMatches: etag, uploadedBefore: pastDate })); + t.true( + using({ + etagMatches: [{ type: "strong", value: etag }], + uploadedBefore: pastDate, + }) + ); // `etagDoesNotMatch` pass makes `uploadedAfter` pass - t.true(using({ etagDoesNotMatch: badEtag, uploadedAfter: futureDate })); + t.true( + using({ + etagDoesNotMatch: [{ type: "strong", value: badEtag }], + uploadedAfter: futureDate, + }) + ); t.true( using({ // `etagMatches` pass makes `uploadedBefore` pass - etagMatches: etag, + etagMatches: [{ type: "strong", value: etag }], uploadedBefore: pastDate, // `etagDoesNotMatch` pass makes `uploadedAfter` pass - etagDoesNotMatch: badEtag, + etagDoesNotMatch: [{ type: "strong", value: badEtag }], uploadedAfter: futureDate, }) ); @@ -81,7 +117,7 @@ test("testR2Conditional: matches various conditions", (t) => { using({ uploadedBefore: futureDate, // `etagDoesNotMatch` pass makes `uploadedAfter` pass - etagDoesNotMatch: badEtag, + etagDoesNotMatch: [{ type: "strong", value: badEtag }], uploadedAfter: futureDate, }) ); @@ -89,20 +125,40 @@ test("testR2Conditional: matches various conditions", (t) => { using({ uploadedAfter: pastDate, // `etagMatches` pass makes `uploadedBefore` pass - etagMatches: etag, + etagMatches: [{ type: "strong", value: etag }], uploadedBefore: pastDate, }) ); // Check missing metadata fails with either `etagMatches` and `uploadedAfter` - t.false(usingMissing({ etagMatches: etag })); + t.false(usingMissing({ etagMatches: [{ type: "strong", value: etag }] })); t.false(usingMissing({ uploadedAfter: pastDate })); - t.false(usingMissing({ etagMatches: etag, uploadedAfter: pastDate })); - t.true(usingMissing({ etagDoesNotMatch: etag })); + t.false( + usingMissing({ + etagMatches: [{ type: "strong", value: etag }], + uploadedAfter: pastDate, + }) + ); + t.true(usingMissing({ etagDoesNotMatch: [{ type: "strong", value: etag }] })); t.true(usingMissing({ uploadedBefore: pastDate })); - t.true(usingMissing({ etagDoesNotMatch: etag, uploadedBefore: pastDate })); - t.false(usingMissing({ etagMatches: etag, uploadedBefore: pastDate })); - t.false(usingMissing({ etagDoesNotMatch: etag, uploadedAfter: pastDate })); + t.true( + usingMissing({ + etagDoesNotMatch: [{ type: "strong", value: etag }], + uploadedBefore: pastDate, + }) + ); + t.false( + usingMissing({ + etagMatches: [{ type: "strong", value: etag }], + uploadedBefore: pastDate, + }) + ); + t.false( + usingMissing({ + etagDoesNotMatch: [{ type: "strong", value: etag }], + uploadedAfter: pastDate, + }) + ); // Check with second granularity const justPastDate = new Date(uploadedDate.getTime() - 250); From 34f810ac05b77c0862cb003c668f88372034651c Mon Sep 17 00:00:00 2001 From: bcoll Date: Fri, 12 May 2023 17:51:03 +0100 Subject: [PATCH 2/2] Get listening port from `workerd`, closes #532 Wait for control messages using the new `--control-fd` flag for signalling when `workerd` is ready to receive requests, and the port it's listening on. This allows us to pass port `0` directly to workerd, and have it allocate a free port, preventing races. It also allows us to remove the repeated readiness probes. :tada: --- package-lock.json | 181 +++++------------- package.json | 2 - packages/tre/package.json | 3 +- packages/tre/src/index.ts | 168 ++++++---------- packages/tre/src/plugins/core/index.ts | 3 - packages/tre/src/runtime/index.ts | 47 ++++- packages/tre/src/shared/sync.ts | 8 + packages/tre/src/wait.ts | 60 ------ packages/tre/src/workers/core/constants.ts | 2 - packages/tre/src/workers/core/entry.worker.ts | 17 -- packages/tre/test/index.spec.ts | 5 - packages/tre/test/plugins/do/index.spec.ts | 5 +- .../tre/test/plugins/queues/index.spec.ts | 13 +- packages/tre/test/shared/sync.spec.ts | 32 ++++ packages/tre/test/test-shared/http.ts | 14 -- packages/tre/test/test-shared/miniflare.ts | 2 - 16 files changed, 197 insertions(+), 365 deletions(-) delete mode 100644 packages/tre/src/wait.ts diff --git a/package-lock.json b/package-lock.json index 066f873f6..71fb34107 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,6 @@ "packages/*" ], "devDependencies": { - "@ava/get-port": "^2.0.0", "@ava/typescript": "^4.0.0", "@microsoft/api-extractor": "^7.33.6", "@types/node": "^18.11.9", @@ -30,7 +29,6 @@ "eslint-plugin-es": "^4.1.0", "eslint-plugin-import": "^2.24.2", "eslint-plugin-prettier": "^3.4.1", - "esm": "^3.2.25", "expect-type": "^0.15.0", "patch-package": "^6.4.7", "prettier": "^2.3.2", @@ -42,34 +40,6 @@ "node": ">=16.13" } }, - "node_modules/@ava/cooperate": { - "version": "1.0.0", - "dev": true, - "license": "MIT", - "dependencies": { - "never": "^1.0.3" - }, - "engines": { - "node": ">=12.22 <13 || >=14.17 <15 || >=16.4 <17 || >=17" - }, - "peerDependencies": { - "ava": "*" - } - }, - "node_modules/@ava/get-port": { - "version": "2.0.0", - "dev": true, - "license": "MIT", - "dependencies": { - "@ava/cooperate": "^1.0.0" - }, - "engines": { - "node": ">=14.19 <15 || >=16.15 <17 || >=18" - }, - "peerDependencies": { - "ava": "*" - } - }, "node_modules/@ava/typescript": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@ava/typescript/-/typescript-4.0.0.tgz", @@ -84,9 +54,9 @@ } }, "node_modules/@cloudflare/workerd-darwin-64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-64/-/workerd-darwin-64-1.20230419.0.tgz", - "integrity": "sha512-d2yId8NOkbdEC81PV9fWvihFcysukjTZVkEeiBwne/8HXy80QDfp7nOCzBENUC5KexB2eJyEjMus9mL/QO9DFQ==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-64/-/workerd-darwin-64-1.20230512.0.tgz", + "integrity": "sha512-V80DswMTu0hiVue5BmjlC0cVufXLKk0KbkesbJ0IywHiuGk0f9uEOgwwL91ioOhPu+3Ss/ka5BNxwPXDxKkG3g==", "cpu": [ "x64" ], @@ -99,9 +69,9 @@ } }, "node_modules/@cloudflare/workerd-darwin-arm64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-arm64/-/workerd-darwin-arm64-1.20230419.0.tgz", - "integrity": "sha512-U3JdRPvMaHVjlAGslXb4Vlfk1iIGbzj1q5QU2ml6htQSuqZ2Ie5cPTPLsA+9LJPqOXcXYUgXUkY3AIDja2Mh9g==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-arm64/-/workerd-darwin-arm64-1.20230512.0.tgz", + "integrity": "sha512-HojEqgtCW8FCRQq/ENPsBVv1YoxJVp2kDrC27D7xfwOa2+LCmxh55c2cckxZuGTNAsBIqk6lczq4yQx9xcfSdg==", "cpu": [ "arm64" ], @@ -114,9 +84,9 @@ } }, "node_modules/@cloudflare/workerd-linux-64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-64/-/workerd-linux-64-1.20230419.0.tgz", - "integrity": "sha512-XssdB19TaiNh5tQMj+8gUafshVqlIkpleoekGEdzFzKotzPNkTn27E+DZ5HnavfSPMonjWTQYosDoPr5Hx3I0Q==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-64/-/workerd-linux-64-1.20230512.0.tgz", + "integrity": "sha512-zhu61wFAyjbO+MtiQjcKDv+HUXYnW3GhGCKW8xKUsCktaXKr/l2Vp/t3VFzF+M8CuFMML5xmE/1gopHB9pIUcA==", "cpu": [ "x64" ], @@ -129,9 +99,9 @@ } }, "node_modules/@cloudflare/workerd-linux-arm64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-arm64/-/workerd-linux-arm64-1.20230419.0.tgz", - "integrity": "sha512-fQ3wwGvQVWA8YtKsSio0VyWphoLUY3YSw6C7Gs0x6TuLBzO5XWN04IH9BDYlaQCtlBKQpVzzDC8dhIaKgMehLg==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-arm64/-/workerd-linux-arm64-1.20230512.0.tgz", + "integrity": "sha512-LvW/DFz35ISnkgagE6qra1CyFmak5sPJcOZ01fovtHIQdwtgUrU5Q+mTAoDZy+8yQnVIM8HCXJxe5gKxM9hnxQ==", "cpu": [ "arm64" ], @@ -144,9 +114,9 @@ } }, "node_modules/@cloudflare/workerd-windows-64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20230419.0.tgz", - "integrity": "sha512-lbPIvpdd3j0V1Y8jOgnDiYgGrKFzm6IEXPCvG/ZPnQfYT3oGb/nkZ2aSGVQUVZUgaRMbTWPegdIzTmn1OvRVMA==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20230512.0.tgz", + "integrity": "sha512-OgRmn5FjroPSha/2JgcM8AQg5NpTig8TqDXgdM61Y/7DCCCEuOuMsZSqU1IrYkPU7gtOFvZSQZLgFA4XxbePbA==", "cpu": [ "x64" ], @@ -2427,14 +2397,6 @@ "node": ">=8" } }, - "node_modules/esm": { - "version": "3.2.25", - "dev": true, - "license": "MIT", - "engines": { - "node": ">=6" - } - }, "node_modules/espree": { "version": "9.4.1", "dev": true, @@ -2780,16 +2742,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/get-port": { - "version": "5.1.1", - "license": "MIT", - "engines": { - "node": ">=8" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/get-source": { "version": "2.0.12", "license": "Unlicense", @@ -3717,14 +3669,6 @@ "dev": true, "license": "MIT" }, - "node_modules/never": { - "version": "1.1.0", - "dev": true, - "license": "ISC", - "engines": { - "node": ">=10.18.0 <11 || >=12.14.0 <13 || >=13.5.0" - } - }, "node_modules/nice-try": { "version": "1.0.5", "dev": true, @@ -5208,9 +5152,9 @@ } }, "node_modules/workerd": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/workerd/-/workerd-1.20230419.0.tgz", - "integrity": "sha512-A4/PhSgBqlne16cha1s3P3g2tXVKk0uC9pFyNYZTUc2i6f/uv/LKApTo0pPsNde+yBsPYKdK52uaBoGSs20CfA==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/workerd/-/workerd-1.20230512.0.tgz", + "integrity": "sha512-rueIsVxLTVlqWyaSVHlDKFZRLkDAMmUhxiKXE+guMR3fauwPPsuzs/VKWUqX2sqR2UKF+1JxrUtH9OvaIqoHhA==", "hasInstallScript": true, "bin": { "workerd": "bin/workerd" @@ -5219,11 +5163,11 @@ "node": ">=16" }, "optionalDependencies": { - "@cloudflare/workerd-darwin-64": "1.20230419.0", - "@cloudflare/workerd-darwin-arm64": "1.20230419.0", - "@cloudflare/workerd-linux-64": "1.20230419.0", - "@cloudflare/workerd-linux-arm64": "1.20230419.0", - "@cloudflare/workerd-windows-64": "1.20230419.0" + "@cloudflare/workerd-darwin-64": "1.20230512.0", + "@cloudflare/workerd-darwin-arm64": "1.20230512.0", + "@cloudflare/workerd-linux-64": "1.20230512.0", + "@cloudflare/workerd-linux-arm64": "1.20230512.0", + "@cloudflare/workerd-windows-64": "1.20230512.0" } }, "node_modules/wrap-ansi": { @@ -5483,14 +5427,13 @@ "better-sqlite3": "^8.1.0", "capnp-ts": "^0.7.0", "exit-hook": "^2.2.1", - "get-port": "^5.1.1", "glob-to-regexp": "^0.4.1", "http-cache-semantics": "^4.1.0", "kleur": "^4.1.5", "source-map-support": "0.5.21", "stoppable": "^1.1.0", "undici": "^5.13.0", - "workerd": "^1.20230419.0", + "workerd": "^1.20230512.0", "ws": "^8.11.0", "youch": "^3.2.2", "zod": "^3.20.6" @@ -5514,20 +5457,6 @@ } }, "dependencies": { - "@ava/cooperate": { - "version": "1.0.0", - "dev": true, - "requires": { - "never": "^1.0.3" - } - }, - "@ava/get-port": { - "version": "2.0.0", - "dev": true, - "requires": { - "@ava/cooperate": "^1.0.0" - } - }, "@ava/typescript": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@ava/typescript/-/typescript-4.0.0.tgz", @@ -5539,33 +5468,33 @@ } }, "@cloudflare/workerd-darwin-64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-64/-/workerd-darwin-64-1.20230419.0.tgz", - "integrity": "sha512-d2yId8NOkbdEC81PV9fWvihFcysukjTZVkEeiBwne/8HXy80QDfp7nOCzBENUC5KexB2eJyEjMus9mL/QO9DFQ==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-64/-/workerd-darwin-64-1.20230512.0.tgz", + "integrity": "sha512-V80DswMTu0hiVue5BmjlC0cVufXLKk0KbkesbJ0IywHiuGk0f9uEOgwwL91ioOhPu+3Ss/ka5BNxwPXDxKkG3g==", "optional": true }, "@cloudflare/workerd-darwin-arm64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-arm64/-/workerd-darwin-arm64-1.20230419.0.tgz", - "integrity": "sha512-U3JdRPvMaHVjlAGslXb4Vlfk1iIGbzj1q5QU2ml6htQSuqZ2Ie5cPTPLsA+9LJPqOXcXYUgXUkY3AIDja2Mh9g==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-darwin-arm64/-/workerd-darwin-arm64-1.20230512.0.tgz", + "integrity": "sha512-HojEqgtCW8FCRQq/ENPsBVv1YoxJVp2kDrC27D7xfwOa2+LCmxh55c2cckxZuGTNAsBIqk6lczq4yQx9xcfSdg==", "optional": true }, "@cloudflare/workerd-linux-64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-64/-/workerd-linux-64-1.20230419.0.tgz", - "integrity": "sha512-XssdB19TaiNh5tQMj+8gUafshVqlIkpleoekGEdzFzKotzPNkTn27E+DZ5HnavfSPMonjWTQYosDoPr5Hx3I0Q==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-64/-/workerd-linux-64-1.20230512.0.tgz", + "integrity": "sha512-zhu61wFAyjbO+MtiQjcKDv+HUXYnW3GhGCKW8xKUsCktaXKr/l2Vp/t3VFzF+M8CuFMML5xmE/1gopHB9pIUcA==", "optional": true }, "@cloudflare/workerd-linux-arm64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-arm64/-/workerd-linux-arm64-1.20230419.0.tgz", - "integrity": "sha512-fQ3wwGvQVWA8YtKsSio0VyWphoLUY3YSw6C7Gs0x6TuLBzO5XWN04IH9BDYlaQCtlBKQpVzzDC8dhIaKgMehLg==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-linux-arm64/-/workerd-linux-arm64-1.20230512.0.tgz", + "integrity": "sha512-LvW/DFz35ISnkgagE6qra1CyFmak5sPJcOZ01fovtHIQdwtgUrU5Q+mTAoDZy+8yQnVIM8HCXJxe5gKxM9hnxQ==", "optional": true }, "@cloudflare/workerd-windows-64": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20230419.0.tgz", - "integrity": "sha512-lbPIvpdd3j0V1Y8jOgnDiYgGrKFzm6IEXPCvG/ZPnQfYT3oGb/nkZ2aSGVQUVZUgaRMbTWPegdIzTmn1OvRVMA==", + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20230512.0.tgz", + "integrity": "sha512-OgRmn5FjroPSha/2JgcM8AQg5NpTig8TqDXgdM61Y/7DCCCEuOuMsZSqU1IrYkPU7gtOFvZSQZLgFA4XxbePbA==", "optional": true }, "@cloudflare/workers-types": { @@ -5829,7 +5758,6 @@ "capnp-ts": "^0.7.0", "devalue": "^4.3.0", "exit-hook": "^2.2.1", - "get-port": "^5.1.1", "glob-to-regexp": "^0.4.1", "http-cache-semantics": "^4.1.0", "kleur": "^4.1.5", @@ -5837,7 +5765,7 @@ "source-map-support": "0.5.21", "stoppable": "^1.1.0", "undici": "^5.13.0", - "workerd": "^1.20230419.0", + "workerd": "^1.20230512.0", "ws": "^8.11.0", "youch": "^3.2.2", "zod": "^3.20.6" @@ -6989,10 +6917,6 @@ "version": "3.3.0", "dev": true }, - "esm": { - "version": "3.2.25", - "dev": true - }, "espree": { "version": "9.4.1", "dev": true, @@ -7211,9 +7135,6 @@ "has-symbols": "^1.0.3" } }, - "get-port": { - "version": "5.1.1" - }, "get-source": { "version": "2.0.12", "requires": { @@ -7742,10 +7663,6 @@ "version": "1.4.0", "dev": true }, - "never": { - "version": "1.1.0", - "dev": true - }, "nice-try": { "version": "1.0.5", "dev": true @@ -8591,15 +8508,15 @@ "dev": true }, "workerd": { - "version": "1.20230419.0", - "resolved": "https://registry.npmjs.org/workerd/-/workerd-1.20230419.0.tgz", - "integrity": "sha512-A4/PhSgBqlne16cha1s3P3g2tXVKk0uC9pFyNYZTUc2i6f/uv/LKApTo0pPsNde+yBsPYKdK52uaBoGSs20CfA==", - "requires": { - "@cloudflare/workerd-darwin-64": "1.20230419.0", - "@cloudflare/workerd-darwin-arm64": "1.20230419.0", - "@cloudflare/workerd-linux-64": "1.20230419.0", - "@cloudflare/workerd-linux-arm64": "1.20230419.0", - "@cloudflare/workerd-windows-64": "1.20230419.0" + "version": "1.20230512.0", + "resolved": "https://registry.npmjs.org/workerd/-/workerd-1.20230512.0.tgz", + "integrity": "sha512-rueIsVxLTVlqWyaSVHlDKFZRLkDAMmUhxiKXE+guMR3fauwPPsuzs/VKWUqX2sqR2UKF+1JxrUtH9OvaIqoHhA==", + "requires": { + "@cloudflare/workerd-darwin-64": "1.20230512.0", + "@cloudflare/workerd-darwin-arm64": "1.20230512.0", + "@cloudflare/workerd-linux-64": "1.20230512.0", + "@cloudflare/workerd-linux-arm64": "1.20230512.0", + "@cloudflare/workerd-windows-64": "1.20230512.0" } }, "wrap-ansi": { diff --git a/package.json b/package.json index 2616fe336..88531ce72 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,6 @@ "types:bundle": "npm run types:build && node scripts/types.mjs" }, "devDependencies": { - "@ava/get-port": "^2.0.0", "@ava/typescript": "^4.0.0", "@microsoft/api-extractor": "^7.33.6", "@types/node": "^18.11.9", @@ -48,7 +47,6 @@ "eslint-plugin-es": "^4.1.0", "eslint-plugin-import": "^2.24.2", "eslint-plugin-prettier": "^3.4.1", - "esm": "^3.2.25", "expect-type": "^0.15.0", "patch-package": "^6.4.7", "prettier": "^2.3.2", diff --git a/packages/tre/package.json b/packages/tre/package.json index 5631f145d..714e0b547 100644 --- a/packages/tre/package.json +++ b/packages/tre/package.json @@ -32,14 +32,13 @@ "better-sqlite3": "^8.1.0", "capnp-ts": "^0.7.0", "exit-hook": "^2.2.1", - "get-port": "^5.1.1", "glob-to-regexp": "^0.4.1", "http-cache-semantics": "^4.1.0", "kleur": "^4.1.5", "source-map-support": "0.5.21", "stoppable": "^1.1.0", "undici": "^5.13.0", - "workerd": "^1.20230419.0", + "workerd": "^1.20230512.0", "ws": "^8.11.0", "youch": "^3.2.2", "zod": "^3.20.6" diff --git a/packages/tre/src/index.ts b/packages/tre/src/index.ts index 0d5b5a798..a72c7feba 100644 --- a/packages/tre/src/index.ts +++ b/packages/tre/src/index.ts @@ -9,7 +9,6 @@ import { Duplex } from "stream"; import { ReadableStream } from "stream/web"; import type { RequestInitCfProperties } from "@cloudflare/workers-types/experimental"; import exitHook from "exit-hook"; -import getPort from "get-port"; import stoppable from "stoppable"; import { WebSocketServer } from "ws"; import { z } from "zod"; @@ -70,9 +69,7 @@ import { defaultTimers, formatResponse, } from "./shared"; -import { anyAbortSignal } from "./shared/signal"; import { NewStorage } from "./storage2"; -import { waitForRequest } from "./wait"; import { CoreHeaders } from "./workers"; // ===== `Miniflare` User Options ===== @@ -307,11 +304,12 @@ function safeReadableStreamFrom(iterable: AsyncIterable) { export class Miniflare { readonly #gatewayFactories: PluginGatewayFactories; readonly #routers: PluginRouters; - #optionsVersion: number; #sharedOpts: PluginSharedOptions; #workerOpts: PluginWorkerOptions[]; #log: Log; readonly #timers: Timers; + readonly #host: string; + readonly #accessibleHost: string; #runtime?: Runtime; #removeRuntimeExitHook?: () => void; @@ -328,21 +326,9 @@ export class Miniflare { // in a queue, ensuring they're performed in calling order. readonly #runtimeMutex: Mutex; - // Additionally, store `Promise`s for the call to `#init()` and the last call - // to `setOptions()`. We need the `#init()` `Promise`, so we can propagate - // initialisation errors in `ready`. We would have no way of catching these - // otherwise. - // - // We store the last `setOptions()` `Promise` as well, so we can avoid - // disposing or resolving `ready` until all pending `setOptions()` have - // completed. Note we only need to store the latest one, as the mutex queue - // will ensure all previous calls complete before starting the latest. - // - // We could just wait on the mutex when disposing/resolving `ready`, but we - // use the presence of waiters on the mutex to avoid logging ready/updated - // messages to the console if there are future updates. + // Store `#init()` `Promise`, so we can propagate initialisation errors in + // `ready`. We would have no way of catching these otherwise. readonly #initPromise: Promise; - #lastUpdatePromise?: Promise; // Aborted when dispose() is called readonly #disposeController: AbortController; @@ -359,11 +345,13 @@ export class Miniflare { // Split and validate options const [sharedOpts, workerOpts] = validateOptions(opts); - this.#optionsVersion = 1; this.#sharedOpts = sharedOpts; this.#workerOpts = workerOpts; this.#log = this.#sharedOpts.core.log ?? new NoOpLog(); this.#timers = this.#sharedOpts.core.timers ?? defaultTimers; + this.#host = this.#sharedOpts.core.host ?? "127.0.0.1"; + this.#accessibleHost = + this.#host === "*" || this.#host === "0.0.0.0" ? "127.0.0.1" : this.#host; this.#initPlugins(); this.#liveReloadServer = new WebSocketServer({ noServer: true }); @@ -442,8 +430,7 @@ export class Miniflare { // Start loopback server (how the runtime accesses with Miniflare's storage) // using the same host as the main runtime server. This means we can use the // loopback server for live reload updates too. - const host = this.#sharedOpts.core.host ?? "127.0.0.1"; - this.#loopbackServer = await this.#startLoopbackServer(0, host); + this.#loopbackServer = await this.#startLoopbackServer(0, this.#host); const address = this.#loopbackServer.address(); // Note address would be string with unix socket assert(address !== null && typeof address === "object"); @@ -451,12 +438,10 @@ export class Miniflare { this.#loopbackPort = address.port; // Start runtime - let entryPort = this.#sharedOpts.core.port; - if (entryPort === 0) entryPort = await getPort(); - else if (entryPort === undefined) entryPort = await getPort({ port: 8787 }); + const port = this.#sharedOpts.core.port ?? 0; const opts: RuntimeOptions = { - entryHost: host, - entryPort, + entryHost: this.#host, + entryPort: port, loopbackPort: this.#loopbackPort, inspectorPort: this.#sharedOpts.core.inspectorPort, verbose: this.#sharedOpts.core.verbose, @@ -464,21 +449,8 @@ export class Miniflare { this.#runtime = new Runtime(opts); this.#removeRuntimeExitHook = exitHook(() => void this.#runtime?.dispose()); - const accessibleHost = - host === "*" || host === "0.0.0.0" ? "127.0.0.1" : host; - this.#runtimeEntryURL = new URL(`http://${accessibleHost}:${entryPort}`); - - const config = await this.#assembleConfig(); - assert(config !== undefined); - const configBuffer = serializeConfig(config); - await this.#runtime.updateConfig(configBuffer); - - // Wait for runtime to start - if ((await this.#waitForRuntime()) && !this.#runtimeMutex.hasWaiting) { - // Only log and trigger reload if there aren't pending updates - this.#log.info(`Ready on ${this.#runtimeEntryURL}`); - this.#handleReload(); - } + // Update config and wait for runtime to start + await this.#assembleAndUpdateConfig(/* initial */ true); } async #handleLoopbackCustomService( @@ -685,46 +657,7 @@ export class Miniflare { }); } - async #waitForRuntime() { - assert(this.#runtime !== undefined); - - // Setup controller aborted when runtime exits - const exitController = new AbortController(); - this.#runtime.exitPromise?.then(() => exitController.abort()); - - // Wait for the runtime to start by repeatedly sending probe HTTP requests - // until either: - // 1) The runtime responds with an OK response - // 2) The runtime exits - // 3) The Miniflare instance is disposed - const signal = anyAbortSignal( - exitController.signal, - this.#disposeController.signal - ); - const url = this.#runtimeEntryURL!; - await waitForRequest({ - hostname: url.hostname, - port: url.port, - headers: { [CoreHeaders.PROBE]: this.#optionsVersion.toString() }, - signal, - }); - - // If we stopped waiting because of reason 2), something's gone wrong - const disposeAborted = this.#disposeController.signal.aborted; - const exitAborted = exitController.signal.aborted; - if (!disposeAborted && exitAborted) { - throw new MiniflareCoreError( - "ERR_RUNTIME_FAILURE", - "The Workers runtime failed to start. " + - "There is likely additional logging output above." - ); - } - - return !(disposeAborted || exitAborted); - } - async #assembleConfig(): Promise { - const optionsVersion = this.#optionsVersion; const allWorkerOpts = this.#workerOpts; const sharedOpts = this.#sharedOpts; const loopbackPort = this.#loopbackPort; @@ -740,7 +673,6 @@ export class Miniflare { // Use Map to dedupe services by name const services = new Map(); const globalServices = getGlobalServices({ - optionsVersion, sharedOptions: sharedOpts.core, allWorkerRoutes, fallbackWorkerName: this.#workerOpts[0].core.name, @@ -819,16 +751,53 @@ export class Miniflare { return { services: Array.from(services.values()), sockets }; } - get ready(): Promise { + async #assembleAndUpdateConfig(initial = false) { + assert(this.#runtime !== undefined); + const config = await this.#assembleConfig(); + const configBuffer = serializeConfig(config); + const maybePort = await this.#runtime.updateConfig(configBuffer, { + signal: this.#disposeController.signal, + }); + if (this.#disposeController.signal.aborted) return; + if (maybePort === undefined) { + throw new MiniflareCoreError( + "ERR_RUNTIME_FAILURE", + "The Workers runtime failed to start. " + + "There is likely additional logging output above." + ); + } + // noinspection HttpUrlsUsage + this.#runtimeEntryURL = new URL( + `http://${this.#accessibleHost}:${maybePort}` + ); + + if (!this.#runtimeMutex.hasWaiting) { + // Only log and trigger reload if there aren't pending updates + const ready = initial ? "Ready" : "Updated and ready"; + this.#log.info(`${ready} on ${this.#runtimeEntryURL}`); + this.#handleReload(); + } + } + + async #waitForReady() { // If `#init()` threw, we'd like to propagate the error here, so `await` it. // Note we can't use `async`/`await` with getters. We'd also like to wait // for `setOptions` calls to complete before resolving. - // - // Safety of `!`: `#runtimeEntryURL` is assigned in `#init()`. - // `#initPromise` doesn't resolve until `#init()` returns. - return this.#initPromise - .then(() => this.#lastUpdatePromise) - .then(() => this.#runtimeEntryURL!); + await this.#initPromise; + // We'd also like to wait for `setOptions` calls to complete before, so wait + // for runtime mutex to drain (i.e. all options updates applied). + // (NOTE: can't just repeatedly wait on the mutex as use the presence of + // waiters on the mutex to avoid logging ready/updated messages to the + // console if there are future updates) + await this.#runtimeMutex.drained(); + // `#runtimeEntryURL` is assigned in `#assembleAndUpdateConfig()`, which is + // called by `#init()`, and `#initPromise` doesn't resolve until `#init()` + // returns. + assert(this.#runtimeEntryURL !== undefined); + return this.#runtimeEntryURL; + } + get ready(): Promise { + return this.#waitForReady(); } #checkDisposed() { @@ -844,36 +813,20 @@ export class Miniflare { // This function must be run with `#runtimeMutex` held // Split and validate options - // TODO: merge with previous config const [sharedOpts, workerOpts] = validateOptions(opts); this.#sharedOpts = sharedOpts; this.#workerOpts = workerOpts; this.#log = this.#sharedOpts.core.log ?? this.#log; - // Increment version, so we know when the runtime has processed updates - this.#optionsVersion++; - // Assemble and serialize config using new version - const config = await this.#assembleConfig(); - const configBuffer = serializeConfig(config); - // Send to runtime and wait for updates to process - assert(this.#runtime !== undefined); - await this.#runtime.updateConfig(configBuffer); - - if ((await this.#waitForRuntime()) && !this.#runtimeMutex.hasWaiting) { - // Only log and trigger reload if this was the last pending update - this.#log.info(`Updated and ready on ${this.#runtimeEntryURL}`); - this.#handleReload(); - } + await this.#assembleAndUpdateConfig(); } setOptions(opts: MiniflareOptions): Promise { this.#checkDisposed(); // Wait for initial initialisation and other setOptions to complete before // changing options - const promise = this.#runtimeMutex.runWith(() => this.#setOptions(opts)); - this.#lastUpdatePromise = promise; - return promise; + return this.#runtimeMutex.runWith(() => this.#setOptions(opts)); } dispatchFetch: DispatchFetch = async (input, init) => { @@ -923,8 +876,7 @@ export class Miniflare { async dispose(): Promise { this.#disposeController.abort(); try { - await this.#initPromise; - await this.#lastUpdatePromise; + await this.ready; } finally { // Remove exit hooks, we're cleaning up what they would've cleaned up now this.#removeTmpPathExitHook(); diff --git a/packages/tre/src/plugins/core/index.ts b/packages/tre/src/plugins/core/index.ts index 3a34db941..e14eccfe6 100644 --- a/packages/tre/src/plugins/core/index.ts +++ b/packages/tre/src/plugins/core/index.ts @@ -305,7 +305,6 @@ export const CORE_PLUGIN: Plugin< }; export interface GlobalServicesOptions { - optionsVersion: number; sharedOptions: z.infer; allWorkerRoutes: Map; fallbackWorkerName: string | undefined; @@ -313,7 +312,6 @@ export interface GlobalServicesOptions { log: Log; } export function getGlobalServices({ - optionsVersion, sharedOptions, allWorkerRoutes, fallbackWorkerName, @@ -327,7 +325,6 @@ export function getGlobalServices({ // Define core/shared services. const serviceEntryBindings: Worker_Binding[] = [ WORKER_BINDING_SERVICE_LOOPBACK, // For converting stack-traces to pretty-error pages - { name: CoreBindings.JSON_VERSION, json: optionsVersion.toString() }, { name: CoreBindings.JSON_ROUTES, json: JSON.stringify(routes) }, { name: CoreBindings.JSON_CF_BLOB, json: JSON.stringify(sharedOptions.cf) }, { name: CoreBindings.JSON_LOG_LEVEL, json: JSON.stringify(log.level) }, diff --git a/packages/tre/src/runtime/index.ts b/packages/tre/src/runtime/index.ts index 4b3a708b8..0dff60f4c 100644 --- a/packages/tre/src/runtime/index.ts +++ b/packages/tre/src/runtime/index.ts @@ -1,12 +1,44 @@ +import assert from "assert"; import childProcess from "child_process"; +import type { Abortable } from "events"; import rl from "readline"; +import { Readable } from "stream"; import { red } from "kleur/colors"; import workerdPath, { compatibilityDate as supportedCompatibilityDate, } from "workerd"; +import { z } from "zod"; import { SERVICE_LOOPBACK, SOCKET_ENTRY } from "../plugins"; import { Awaitable } from "../shared"; +const ControlMessageSchema = z.object({ + event: z.literal("listen"), + socket: z.string(), + port: z.number(), +}); + +async function waitForPort( + socket: string, + stream: Readable, + options?: Abortable +): Promise { + if (options?.signal?.aborted) return; + const lines = rl.createInterface(stream); + // Calling `close()` will end the async iterator below and return undefined + const abortListener = () => lines.close(); + options?.signal?.addEventListener("abort", abortListener, { once: true }); + try { + for await (const line of lines) { + const message = ControlMessageSchema.safeParse(JSON.parse(line)); + if (message.success && message.data.socket === socket) { + return message.data.port; + } + } + } finally { + options?.signal?.removeEventListener("abort", abortListener); + } +} + function waitForExit(process: childProcess.ChildProcess): Promise { return new Promise((resolve) => { process.once("exit", () => resolve()); @@ -53,6 +85,8 @@ export class Runtime { "--experimental", `--socket-addr=${SOCKET_ENTRY}=${this.opts.entryHost}:${this.opts.entryPort}`, `--external-addr=${SERVICE_LOOPBACK}=127.0.0.1:${this.opts.loopbackPort}`, + // Configure extra pipe for receiving control messages (e.g. when ready) + "--control-fd=3", // Read config from stdin "-", ]; @@ -68,23 +102,32 @@ export class Runtime { this.#args = args; } - async updateConfig(configBuffer: Buffer) { + async updateConfig( + configBuffer: Buffer, + options?: Abortable + ): Promise { // 1. Stop existing process (if any) and wait for exit await this.dispose(); // TODO: what happens if runtime crashes? // 2. Start new process const runtimeProcess = childProcess.spawn(this.#command, this.#args, { - stdio: "pipe", + stdio: ["pipe", "pipe", "pipe", "pipe"], env: process.env, }); this.#process = runtimeProcess; this.#processExitPromise = waitForExit(runtimeProcess); pipeOutput(runtimeProcess); + const controlPipe = runtimeProcess.stdio[3]; + assert(controlPipe instanceof Readable); + // 3. Write config runtimeProcess.stdin.write(configBuffer); runtimeProcess.stdin.end(); + + // 4. Wait for socket to start listening + return waitForPort(SOCKET_ENTRY, controlPipe, options); } get exitPromise(): Promise | undefined { diff --git a/packages/tre/src/shared/sync.ts b/packages/tre/src/shared/sync.ts index dc4e5b4e3..c2ff5324d 100644 --- a/packages/tre/src/shared/sync.ts +++ b/packages/tre/src/shared/sync.ts @@ -30,6 +30,7 @@ export class DeferredPromise extends Promise { export class Mutex { private locked = false; private resolveQueue: (() => void)[] = []; + private drainQueue: (() => void)[] = []; private lock(): Awaitable { if (!this.locked) { @@ -45,6 +46,8 @@ export class Mutex { this.resolveQueue.shift()?.(); } else { this.locked = false; + let resolve: (() => void) | undefined; + while ((resolve = this.drainQueue.shift()) !== undefined) resolve(); } } @@ -63,6 +66,11 @@ export class Mutex { this.unlock(); } } + + async drained(): Promise { + if (this.resolveQueue.length === 0) return; + return new Promise((resolve) => this.drainQueue.push(resolve)); + } } export class WaitGroup { diff --git a/packages/tre/src/wait.ts b/packages/tre/src/wait.ts deleted file mode 100644 index 2d2e4dcd5..000000000 --- a/packages/tre/src/wait.ts +++ /dev/null @@ -1,60 +0,0 @@ -import http from "http"; -import type { TimerOptions } from "timers"; -import { setTimeout } from "timers/promises"; - -function attemptDelay(attempts: number) { - if (attempts < 10) return 10; - if (attempts < 20) return 50; - if (attempts < 30) return 100; - return 1000; -} - -// Disable keep-alive for polling requests -const agent = new http.Agent({ keepAlive: false, maxSockets: 1 }); - -function request(options: http.RequestOptions) { - return new Promise((resolve, reject) => { - const req = http.request(options, (res) => { - resolve(res.statusCode ?? 0); - res.destroy(); - }); - req.on("error", (err) => reject(err)); - req.end(); - }); -} - -export async function waitForRequest(options: http.RequestOptions) { - options = { ...options, agent }; - - let attempts = 0; - const signal = options.signal; - const timeoutOptions: TimerOptions = { signal }; - - while (!signal?.aborted) { - try { - const code = await request(options); - if (code !== undefined && 200 <= code && code < 300) return; - } catch (e: any) { - const code = e.code; - if (code === "ABORT_ERR") return; - if ( - // Adapted from https://github.com/dwmkerr/wait-port/blob/0d58d29a6d6b8ea996de9c6829706bb3b0952ee8/lib/wait-port.js - code !== "ECONNREFUSED" && - code !== "ECONNTIMEOUT" && - code !== "ECONNRESET" && - code !== "ENOTFOUND" - ) { - throw e; - } - } - attempts++; - - if (signal?.aborted) return; - try { - await setTimeout(attemptDelay(attempts), undefined, timeoutOptions); - } catch (e: any) { - if (e.code === "ABORT_ERR") return; - throw e; - } - } -} diff --git a/packages/tre/src/workers/core/constants.ts b/packages/tre/src/workers/core/constants.ts index aeebc4436..e3c5d8595 100644 --- a/packages/tre/src/workers/core/constants.ts +++ b/packages/tre/src/workers/core/constants.ts @@ -1,5 +1,4 @@ export const CoreHeaders = { - PROBE: "MF-Probe", CUSTOM_SERVICE: "MF-Custom-Service", ORIGINAL_URL: "MF-Original-URL", ERROR_STACK: "MF-Experimental-Error-Stack", @@ -9,7 +8,6 @@ export const CoreHeaders = { export const CoreBindings = { SERVICE_LOOPBACK: "MINIFLARE_LOOPBACK", - JSON_VERSION: "MINIFLARE_VERSION", SERVICE_USER_ROUTE_PREFIX: "MINIFLARE_USER_ROUTE_", SERVICE_USER_FALLBACK: "MINIFLARE_USER_FALLBACK", TEXT_CUSTOM_SERVICE: "MINIFLARE_CUSTOM_SERVICE", diff --git a/packages/tre/src/workers/core/entry.worker.ts b/packages/tre/src/workers/core/entry.worker.ts index 4af1b8e93..dab719c78 100644 --- a/packages/tre/src/workers/core/entry.worker.ts +++ b/packages/tre/src/workers/core/entry.worker.ts @@ -9,7 +9,6 @@ import { WorkerRoute, matchRoutes } from "./routing"; type Env = { [CoreBindings.SERVICE_LOOPBACK]: Fetcher; - [CoreBindings.JSON_VERSION]: number; [CoreBindings.SERVICE_USER_FALLBACK]: Fetcher; [CoreBindings.TEXT_CUSTOM_SERVICE]: string; [CoreBindings.JSON_CF_BLOB]: IncomingRequestCfProperties; @@ -22,17 +21,6 @@ type Env = { | undefined; // Won't have a `Fetcher` for every possible `string` }; -function maybeCreateProbeResponse(request: Request, env: Env) { - const probe = request.headers.get(CoreHeaders.PROBE); - if (probe === null) return; - - const probeMin = parseInt(probe); - // Using `>=` for version check to handle multiple `setOptions` calls - // before reload complete. - const status = env[CoreBindings.JSON_VERSION] >= probeMin ? 204 : 412; - return new Response(null, { status }); -} - function getUserRequest( request: Request, env: Env @@ -172,12 +160,7 @@ async function handleQueue( export default >{ async fetch(request, env, ctx) { const startTime = Date.now(); - - const maybeProbeResponse = maybeCreateProbeResponse(request, env); - if (maybeProbeResponse !== undefined) return maybeProbeResponse; - request = getUserRequest(request, env); - const url = new URL(request.url); const service = getTargetService(request, url, env); if (service === undefined) { diff --git a/packages/tre/test/index.spec.ts b/packages/tre/test/index.spec.ts index 90d5ca98f..f168c7cd1 100644 --- a/packages/tre/test/index.spec.ts +++ b/packages/tre/test/index.spec.ts @@ -15,7 +15,6 @@ import { MessageEvent as StandardMessageEvent, WebSocketServer, } from "ws"; -import { getPort } from "./test-shared"; test("Miniflare: validates options", async (t) => { // Check empty workers array rejected @@ -57,7 +56,6 @@ test("Miniflare: validates options", async (t) => { test("Miniflare: routes to multiple workers with fallback", async (t) => { const opts: MiniflareOptions = { - port: await getPort(), workers: [ { name: "a", @@ -126,7 +124,6 @@ test("Miniflare: web socket kitchen sink", async (t) => { // Create Miniflare instance with WebSocket worker and custom service binding // fetching from WebSocket origin server const mf = new Miniflare({ - port: await getPort(), script: `addEventListener("fetch", (event) => { event.respondWith(CUSTOM.fetch(event.request)); })`, @@ -176,7 +173,6 @@ test("Miniflare: web socket kitchen sink", async (t) => { test("Miniflare: custom service binding to another Miniflare instance", async (t) => { const mfOther = new Miniflare({ - port: await getPort(), modules: true, script: `export default { async fetch(request) { @@ -189,7 +185,6 @@ test("Miniflare: custom service binding to another Miniflare instance", async (t t.teardown(() => mfOther.dispose()); const mf = new Miniflare({ - port: await getPort(), script: `addEventListener("fetch", (event) => { event.respondWith(CUSTOM.fetch(event.request)); })`, diff --git a/packages/tre/test/plugins/do/index.spec.ts b/packages/tre/test/plugins/do/index.spec.ts index ccb5868ca..5c83f5163 100644 --- a/packages/tre/test/plugins/do/index.spec.ts +++ b/packages/tre/test/plugins/do/index.spec.ts @@ -2,7 +2,7 @@ import fs from "fs/promises"; import path from "path"; import { Miniflare, MiniflareOptions } from "@miniflare/tre"; import test from "ava"; -import { getPort, useTmp } from "../../test-shared"; +import { useTmp } from "../../test-shared"; const COUNTER_SCRIPT = (responsePrefix = "") => `export class Counter { constructor(state) { @@ -26,7 +26,6 @@ export default { test("persists Durable Object data in-memory between options reloads", async (t) => { const opts: MiniflareOptions = { - port: await getPort(), modules: true, script: COUNTER_SCRIPT("Options #1: "), durableObjects: { COUNTER: "Counter" }, @@ -67,7 +66,6 @@ test("persists Durable Object data on file-system", async (t) => { const tmp = await useTmp(t); const opts: MiniflareOptions = { name: "worker", - port: await getPort(), modules: true, script: COUNTER_SCRIPT(), durableObjects: { COUNTER: "Counter" }, @@ -108,7 +106,6 @@ test("persists Durable Object data on file-system", async (t) => { test("multiple Workers access same Durable Object data", async (t) => { const tmp = await useTmp(t); const mf = new Miniflare({ - port: await getPort(), durableObjectsPersist: tmp, workers: [ { diff --git a/packages/tre/test/plugins/queues/index.spec.ts b/packages/tre/test/plugins/queues/index.spec.ts index c8641131d..dac0413bb 100644 --- a/packages/tre/test/plugins/queues/index.spec.ts +++ b/packages/tre/test/plugins/queues/index.spec.ts @@ -8,7 +8,7 @@ import { } from "@miniflare/tre"; import anyTest from "ava"; import { z } from "zod"; -import { LogEntry, TestLog, TestTimers, getPort } from "../../test-shared"; +import { LogEntry, TestLog, TestTimers } from "../../test-shared"; // Only run Queues tests if we're using a supported V8 version const test = _QUEUES_COMPATIBLE_V8_VERSION ? anyTest : anyTest.skip; @@ -21,10 +21,8 @@ const MessageArraySchema = z test("flushes partial and full batches", async (t) => { let batches: string[][] = []; - const port = await getPort(); const timers = new TestTimers(); const mf = new Miniflare({ - port, timers, verbose: true, @@ -157,13 +155,11 @@ test("flushes partial and full batches", async (t) => { }); test("sends all structured cloneable types", async (t) => { - const port = await getPort(); const timers = new TestTimers(); const errorPromise = new DeferredPromise(); const mf = new Miniflare({ - port, timers, verbose: true, @@ -286,11 +282,9 @@ test("retries messages", async (t) => { let errorAll = false; let retryMessages: string[] = []; - const port = await getPort(); const log = new TestLog(t); const timers = new TestTimers(); const mf = new Miniflare({ - port, log, timers, @@ -502,11 +496,9 @@ test("moves to dead letter queue", async (t) => { const batches: z.infer[] = []; let retryMessages: string[] = []; - const port = await getPort(); const log = new TestLog(t); const timers = new TestTimers(); const mf = new Miniflare({ - port, log, timers, verbose: true, @@ -614,7 +606,6 @@ test("moves to dead letter queue", async (t) => { // Check rejects queue as own dead letter queue const promise = mf.setOptions({ - port, log, timers, queueConsumers: { bad: { deadLetterQueue: "bad" } }, @@ -629,11 +620,9 @@ test("moves to dead letter queue", async (t) => { test("operations permit strange queue names", async (t) => { const promise = new DeferredPromise>(); - const port = await getPort(); const timers = new TestTimers(); const id = "my/ Queue"; const mf = new Miniflare({ - port, timers, verbose: true, queueProducers: { QUEUE: id }, diff --git a/packages/tre/test/shared/sync.spec.ts b/packages/tre/test/shared/sync.spec.ts index 9802c958f..17dd8c67e 100644 --- a/packages/tre/test/shared/sync.spec.ts +++ b/packages/tre/test/shared/sync.spec.ts @@ -40,6 +40,38 @@ test("Mutex: lock can be acquired synchronously", (t) => { mutex.runWith(() => (acquired = true)); t.true(acquired); }); +test("Mutex: maintains separate drain queue", async (t) => { + const mutex = new Mutex(); + const deferred1 = new DeferredPromise(); + void mutex.runWith(() => deferred1); + let drained = false; + mutex.drained().then(() => (drained = true)); + t.false(drained); + deferred1.resolve(); + await setTimeout(); + t.true(drained); + + // Check drains don't count as waiters + const deferred2 = new DeferredPromise(); + const deferred3 = new DeferredPromise(); + void mutex.runWith(async () => { + await deferred2; + t.true(mutex.hasWaiting); // next `runWith()` is a waiter + }); + void mutex.runWith(async () => { + await deferred3; + t.false(mutex.hasWaiting); // but `drain()` isn't + }); + drained = false; + mutex.drained().then(() => (drained = true)); + t.false(drained); + deferred2.resolve(); + await setTimeout(); + t.false(drained); + deferred3.resolve(); + await setTimeout(); + t.true(drained); +}); test("WaitGroup: waits for all tasks to complete", async (t) => { const group = new WaitGroup(); diff --git a/packages/tre/test/test-shared/http.ts b/packages/tre/test/test-shared/http.ts index 94631fbc8..4b3a1da51 100644 --- a/packages/tre/test/test-shared/http.ts +++ b/packages/tre/test/test-shared/http.ts @@ -1,23 +1,9 @@ import http from "http"; import { AddressInfo } from "net"; import { URL } from "url"; -import { isMainThread } from "worker_threads"; import { ExecutionContext } from "ava"; import NodeWebSocket, { WebSocketServer } from "ws"; -// Returns a free port, whilst preventing races between test files -export async function getPort(): Promise { - if (isMainThread) { - // eslint-disable-next-line es/no-dynamic-import - return (await import("get-port")).default(); - } else { - // `@ava/get-port` is ESM-only, but we bundle to CommonJS, so use a dynamic - // `import` here. - // eslint-disable-next-line es/no-dynamic-import - return (await import("@ava/get-port")).default(); - } -} - export async function useServer( t: ExecutionContext, listener: http.RequestListener, diff --git a/packages/tre/test/test-shared/miniflare.ts b/packages/tre/test/test-shared/miniflare.ts index d17a6aebc..12679b992 100644 --- a/packages/tre/test/test-shared/miniflare.ts +++ b/packages/tre/test/test-shared/miniflare.ts @@ -6,7 +6,6 @@ import type { } from "@cloudflare/workers-types/experimental"; import { Awaitable, Miniflare, MiniflareOptions, Timers } from "@miniflare/tre"; import anyTest, { TestFn } from "ava"; -import { getPort } from "./http"; import { TestLog } from "./log"; export type TestMiniflareHandler = ( @@ -135,7 +134,6 @@ export function miniflareTest< const opts: Partial = { ...scriptOpts, - port: await getPort(), log, timers, verbose: true,