Skip to content

Commit 137d2da

Browse files
Support Tail Workers in local dev (#9083)
* Tail Workers support in local ded * tests * fix snapshots * Create wild-boxes-invite.md * Apply suggestions from code review Co-authored-by: Carmen Popoviciu <cpopoviciu@cloudflare.com> * fix snapshots * tails -> tailConsumers * Update packages/wrangler/e2e/__snapshots__/pages-dev.test.ts.snap Co-authored-by: Carmen Popoviciu <cpopoviciu@cloudflare.com> * fix snapshots * fix snapshots --------- Co-authored-by: Carmen Popoviciu <cpopoviciu@cloudflare.com>
1 parent 478d79d commit 137d2da

File tree

26 files changed

+520
-199
lines changed

26 files changed

+520
-199
lines changed

.changeset/wild-boxes-invite.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"miniflare": minor
3+
"wrangler": minor
4+
---
5+
6+
Support Tail Workers in local dev

fixtures/worker-app/src/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,7 @@ export default {
101101
ctx.waitUntil(Promise.resolve(event.scheduledTime));
102102
ctx.waitUntil(Promise.resolve(event.cron));
103103
},
104+
tail(events) {
105+
console.log("tails", { events });
106+
},
104107
};

packages/miniflare/src/plugins/core/index.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ const CoreOptionsSchemaInput = z.intersection(
155155
Router Worker)
156156
*/
157157
hasAssetsAndIsVitest: z.boolean().optional(),
158+
159+
tails: z.array(ServiceDesignatorSchema).optional(),
158160
})
159161
);
160162
export const CoreOptionsSchema = CoreOptionsSchemaInput.transform((value) => {
@@ -590,6 +592,7 @@ export const CORE_PLUGIN: Plugin<
590592

591593
const services: Service[] = [];
592594
const extensions: Extension[] = [];
595+
593596
if (isWrappedBinding) {
594597
const stringName = JSON.stringify(name);
595598
function invalidWrapped(reason: string): never {
@@ -700,6 +703,19 @@ export const CORE_PLUGIN: Plugin<
700703
sharedOptions.unsafeModuleFallbackService !== undefined
701704
? `localhost:${loopbackPort}`
702705
: undefined,
706+
tails:
707+
options.tails === undefined
708+
? undefined
709+
: options.tails.map<ServiceDesignator>((service) => {
710+
return getCustomServiceDesignator(
711+
/* referrer */ options.name,
712+
workerIndex,
713+
CustomServiceKind.UNKNOWN,
714+
name,
715+
service,
716+
options.hasAssetsAndIsVitest
717+
);
718+
}),
703719
},
704720
});
705721
}
@@ -716,6 +732,18 @@ export const CORE_PLUGIN: Plugin<
716732
if (maybeService !== undefined) services.push(maybeService);
717733
}
718734
}
735+
736+
if (options.tails !== undefined) {
737+
for (const service of options.tails) {
738+
const maybeService = maybeGetCustomServiceService(
739+
workerIndex,
740+
CustomServiceKind.UNKNOWN,
741+
name,
742+
service
743+
);
744+
if (maybeService !== undefined) services.push(maybeService);
745+
}
746+
}
719747
if (options.outboundService !== undefined) {
720748
const maybeService = maybeGetCustomServiceService(
721749
workerIndex,

packages/miniflare/src/runtime/config/workerd.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export type Worker = (
6363
durableObjectUniqueKeyModifier?: string;
6464
durableObjectStorage?: Worker_DurableObjectStorage;
6565
moduleFallback?: string;
66+
tails?: ServiceDesignator[];
6667
};
6768

6869
export type Worker_DurableObjectStorage =

packages/miniflare/test/index.spec.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,58 @@ test("Miniflare: service binding to named entrypoint that implements a method re
921921
t.deepEqual(rpcTarget.id, "test-id");
922922
});
923923

924+
test("Miniflare: tail consumer called", async (t) => {
925+
const mf = new Miniflare({
926+
workers: [
927+
{
928+
name: "a",
929+
tails: ["b"],
930+
compatibilityDate: "2025-04-28",
931+
modules: true,
932+
script: `
933+
934+
export default {
935+
async fetch(request, env) {
936+
if(request.url.includes("b")) { return env.B.fetch(request)}
937+
console.log("log event")
938+
939+
return new Response("hello from a");
940+
}
941+
}
942+
`,
943+
serviceBindings: {
944+
B: "b",
945+
},
946+
},
947+
{
948+
name: "b",
949+
modules: true,
950+
compatibilityDate: "2025-04-28",
951+
952+
script: `
953+
let event;
954+
export default {
955+
fetch() {return Response.json(event)},
956+
tail(e) {event = e }
957+
};
958+
`,
959+
},
960+
],
961+
});
962+
t.teardown(() => mf.dispose());
963+
964+
const res = await mf.dispatchFetch("http://placeholder");
965+
t.deepEqual(await res.text(), "hello from a");
966+
t.deepEqual(
967+
(
968+
(await (await mf.dispatchFetch("http://placeholder/b")).json()) as {
969+
logs: { message: string[] }[];
970+
}[]
971+
)[0].logs[0].message,
972+
["log event"]
973+
);
974+
});
975+
924976
test("Miniflare: custom outbound service", async (t) => {
925977
const mf = new Miniflare({
926978
workers: [

packages/wrangler/e2e/__snapshots__/pages-dev.test.ts.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
exports[`Pages 'wrangler pages dev' > should merge (with override) \`wrangler.toml\` configuration with configuration provided via the command line, with command line args taking precedence 1`] = `
44
"✨ Compiled Worker successfully
55
Your Worker and resources are simulated locally via Miniflare. For more information, see: https://developers.cloudflare.com/workers/testing/local-development.
6-
Your worker has access to the following bindings:
6+
Your Worker has access to the following bindings:
77
- Durable Objects:
88
- DO_BINDING_1_TOML: NEW_DO_1 (defined in NEW_DO_SCRIPT_1 [not connected])
99
- DO_BINDING_2_TOML: DO_2_TOML (defined in DO_SCRIPT_2_TOML [not connected])
@@ -30,7 +30,7 @@ Your worker has access to the following bindings:
3030
- VAR1: "(hidden)"
3131
- VAR2: "VAR_2_TOML"
3232
- VAR3: "(hidden)"
33-
Service bindings & durable object bindings connect to other \`wrangler dev\` processes running locally, with their connection status indicated by [connected] or [not connected]. For more details, refer to https://developers.cloudflare.com/workers/runtime-apis/bindings/service-bindings/#local-development
33+
Service bindings, Durable Object bindings, and Tail consumers connect to other \`wrangler dev\` processes running locally, with their connection status indicated by [connected] or [not connected]. For more details, refer to https://developers.cloudflare.com/workers/runtime-apis/bindings/service-bindings/#local-development
3434
▲ [WARNING] Using Workers AI always accesses your Cloudflare account in order to run AI models, and so will incur usage charges even in local development.
3535
"
3636
`;

packages/wrangler/e2e/deployments.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ Uploaded 2 of 3 assets
373373
Uploaded 3 of 3 assets
374374
✨ Success! Uploaded 3 files (TIMINGS)
375375
Total Upload: xx KiB / gzip: xx KiB
376-
Your worker has access to the following bindings:
376+
Your Worker has access to the following bindings:
377377
- Assets:
378378
- Binding: ASSETS
379379
Uploaded tmp-e2e-worker-00000000-0000-0000-0000-000000000000 (TIMINGS)
@@ -540,7 +540,7 @@ Uploaded 2 of 3 assets
540540
Uploaded 3 of 3 assets
541541
✨ Success! Uploaded 3 files (TIMINGS)
542542
Total Upload: xx KiB / gzip: xx KiB
543-
Your worker has access to the following bindings:
543+
Your Worker has access to the following bindings:
544544
- Assets:
545545
- Binding: ASSETS
546546
Uploaded tmp-e2e-worker-00000000-0000-0000-0000-000000000000 (TIMINGS)
@@ -662,7 +662,7 @@ Current Version ID: 00000000-0000-0000-0000-000000000000`);
662662
);
663663
normalizedStdout = normalize(output.stdout);
664664
expect(normalizedStdout).toEqual(`Total Upload: xx KiB / gzip: xx KiB
665-
Your worker has access to the following bindings:
665+
Your Worker has access to the following bindings:
666666
- Dispatch Namespaces:
667667
- DISPATCH: tmp-e2e-dispatch-00000000-0000-0000-0000-000000000000
668668
Uploaded tmp-e2e-worker-00000000-0000-0000-0000-000000000000 (TIMINGS)
@@ -762,7 +762,7 @@ Uploaded 2 of 3 assets
762762
Uploaded 3 of 3 assets
763763
✨ Success! Uploaded 3 files (TIMINGS)
764764
Total Upload: xx KiB / gzip: xx KiB
765-
Your worker has access to the following bindings:
765+
Your Worker has access to the following bindings:
766766
- Assets:
767767
- Binding: ASSETS
768768
Uploaded tmp-e2e-worker-00000000-0000-0000-0000-000000000000 (TIMINGS)
@@ -775,7 +775,7 @@ Current Version ID: 00000000-0000-0000-0000-000000000000`);
775775
);
776776
normalizedStdout = normalize(output.stdout);
777777
expect(normalizedStdout).toEqual(`Total Upload: xx KiB / gzip: xx KiB
778-
Your worker has access to the following bindings:
778+
Your Worker has access to the following bindings:
779779
- Dispatch Namespaces:
780780
- DISPATCH: tmp-e2e-dispatch-00000000-0000-0000-0000-000000000000
781781
Uploaded tmp-e2e-worker-00000000-0000-0000-0000-000000000000 (TIMINGS)
@@ -873,7 +873,7 @@ Uploaded 2 of 3 assets
873873
Uploaded 3 of 3 assets
874874
✨ Success! Uploaded 3 files (TIMINGS)
875875
Total Upload: xx KiB / gzip: xx KiB
876-
Your worker has access to the following bindings:
876+
Your Worker has access to the following bindings:
877877
- Assets:
878878
- Binding: ASSETS
879879
Uploaded tmp-e2e-worker-00000000-0000-0000-0000-000000000000 (TIMINGS)
@@ -886,7 +886,7 @@ Current Version ID: 00000000-0000-0000-0000-000000000000`);
886886
);
887887
normalizedStdout = normalize(output.stdout);
888888
expect(normalizedStdout).toEqual(`Total Upload: xx KiB / gzip: xx KiB
889-
Your worker has access to the following bindings:
889+
Your Worker has access to the following bindings:
890890
- Dispatch Namespaces:
891891
- DISPATCH: tmp-e2e-dispatch-00000000-0000-0000-0000-000000000000
892892
Uploaded tmp-e2e-worker-00000000-0000-0000-0000-000000000000 (TIMINGS)

packages/wrangler/e2e/dev-registry.test.ts

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ describe.each([{ cmd: "wrangler dev" }])("dev registry $cmd", ({ cmd }) => {
302302
);
303303

304304
expect(normalizeOutput(workerA.currentOutput)).toContain(
305-
"bindings connect to other `wrangler dev` processes running locally"
305+
"connect to other `wrangler dev` processes running locally"
306306
);
307307
});
308308

@@ -375,6 +375,69 @@ describe.each([{ cmd: "wrangler dev" }])("dev registry $cmd", ({ cmd }) => {
375375
);
376376
});
377377

378+
describe("Tail consumers", () => {
379+
beforeEach(async () => {
380+
await baseSeed(a, {
381+
"wrangler.toml": dedent`
382+
name = "${workerName}"
383+
main = "src/index.ts"
384+
compatibility_date = "2025-04-28"
385+
386+
[[tail_consumers]]
387+
service = "${workerName2}"
388+
`,
389+
"src/index.ts": dedent/* javascript */ `
390+
export default {
391+
async fetch(req, env) {
392+
console.log("log something")
393+
return new Response("hello from a")
394+
},
395+
};
396+
`,
397+
});
398+
399+
b = await makeRoot();
400+
await baseSeed(b, {
401+
"wrangler.toml": dedent`
402+
name = "${workerName2}"
403+
main = "src/index.ts"
404+
compatibility_date = "2025-04-28"
405+
`,
406+
"src/index.ts": dedent/* javascript */ `
407+
export default {
408+
async tail(event) {
409+
console.log("received tail event", event)
410+
},
411+
};
412+
`,
413+
});
414+
});
415+
416+
it("can fetch a without b running", async () => {
417+
const workerA = helper.runLongLived(cmd, { cwd: a });
418+
const { url } = await workerA.waitForReady(5_000);
419+
420+
await expect(fetchText(`${url}`)).resolves.toBe("hello from a");
421+
});
422+
423+
it("tail event sent to b", async () => {
424+
const workerA = helper.runLongLived(cmd, { cwd: a });
425+
const { url } = await workerA.waitForReady(5_000);
426+
427+
const workerB = helper.runLongLived(cmd, { cwd: b });
428+
429+
await workerA.readUntil(/connected/);
430+
431+
await expect(fetchText(`${url}`)).resolves.toBe("hello from a");
432+
433+
await vi.waitFor(
434+
async () =>
435+
expect(workerB.currentOutput).includes("received tail event"),
436+
{ interval: 1000, timeout: 10_000 }
437+
);
438+
});
439+
});
440+
378441
describe("durable objects", () => {
379442
beforeEach(async () => {
380443
await baseSeed(a, {
@@ -526,7 +589,7 @@ describe.each([{ cmd: "wrangler dev" }])("dev registry $cmd", ({ cmd }) => {
526589
);
527590

528591
expect(normalizeOutput(workerA.currentOutput)).toContain(
529-
"bindings connect to other `wrangler dev` processes running locally"
592+
"connect to other `wrangler dev` processes running locally"
530593
);
531594
});
532595

packages/wrangler/e2e/multiworker-dev.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,69 @@ describe("multiworker", () => {
389389
});
390390
});
391391

392+
describe("Tail consumers", () => {
393+
beforeEach(async () => {
394+
await baseSeed(a, {
395+
"wrangler.toml": dedent`
396+
name = "${workerName}"
397+
main = "src/index.ts"
398+
compatibility_date = "2025-04-28"
399+
400+
[[tail_consumers]]
401+
service = "${workerName2}"
402+
`,
403+
"src/index.ts": dedent/* javascript */ `
404+
export default {
405+
async fetch(req, env) {
406+
console.log("log something")
407+
return new Response("hello from a")
408+
},
409+
};
410+
`,
411+
});
412+
413+
b = await makeRoot();
414+
await baseSeed(b, {
415+
"wrangler.toml": dedent`
416+
name = "${workerName2}"
417+
main = "src/index.ts"
418+
compatibility_date = "2025-04-28"
419+
`,
420+
"src/index.ts": dedent/* javascript */ `
421+
export default {
422+
async tail(event) {
423+
console.log("received tail event", event)
424+
},
425+
};
426+
`,
427+
});
428+
});
429+
430+
it("can fetch a without b running", async () => {
431+
const worker = helper.runLongLived(`wrangler dev`, { cwd: a });
432+
433+
const { url } = await worker.waitForReady(5_000);
434+
435+
await expect(fetchText(`${url}`)).resolves.toBe("hello from a");
436+
});
437+
438+
it("tail event sent to b", async () => {
439+
const worker = helper.runLongLived(
440+
`wrangler dev -c wrangler.toml -c ${b}/wrangler.toml`,
441+
{ cwd: a }
442+
);
443+
const { url } = await worker.waitForReady(5_000);
444+
445+
await expect(fetchText(`${url}`)).resolves.toBe("hello from a");
446+
447+
await vi.waitFor(
448+
async () =>
449+
expect(worker.currentOutput).includes("received tail event"),
450+
{ interval: 1000, timeout: 10_000 }
451+
);
452+
});
453+
});
454+
392455
describe("pages", () => {
393456
beforeEach(async () => {
394457
await baseSeed(a, {

packages/wrangler/e2e/pages-dev.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ describe.sequential.each([{ cmd: "wrangler pages dev" }])(
119119
);
120120
await worker.waitForReady();
121121
expect(normalizeOutput(worker.currentOutput)).toContain(
122-
dedent`Your worker has access to the following bindings:
122+
dedent`Your Worker has access to the following bindings:
123123
- Durable Objects:
124124
- TEST_DO: TestDurableObject (defined in a [not connected])
125125
- KV Namespaces:
@@ -325,7 +325,7 @@ describe.sequential.each([{ cmd: "wrangler pages dev" }])(
325325

326326
expect(text).toBe("⚡️ Pages ⚡️ supports wrangler.toml");
327327
expect(normalizeOutput(worker.currentOutput)).toContain(
328-
dedent`Your worker has access to the following bindings:
328+
dedent`Your Worker has access to the following bindings:
329329
- KV Namespaces:
330330
- KV_BINDING_TOML: KV_ID_TOML [simulated locally]
331331
- Vars:

0 commit comments

Comments
 (0)