From 1f2b6099e16eb7fb870eaea30355ae6b8e2d522b Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Fri, 28 Nov 2025 11:12:12 +1300 Subject: [PATCH] propagate tracing info with DurableQueue --- packages/workflow/src/DurableQueue.ts | 33 ++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/packages/workflow/src/DurableQueue.ts b/packages/workflow/src/DurableQueue.ts index dc3e522cec0..d02f91f6150 100644 --- a/packages/workflow/src/DurableQueue.ts +++ b/packages/workflow/src/DurableQueue.ts @@ -2,6 +2,7 @@ * @since 1.0.0 */ import * as PersistedQueue from "@effect/experimental/PersistedQueue" +import * as Context from "effect/Context" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" import * as Schedule from "effect/Schedule" @@ -130,11 +131,17 @@ const getQueueSchema = ( ): Schema.Struct<{ token: typeof Schema.String payload: Payload + traceId: typeof Schema.String + spanId: typeof Schema.String + sampled: typeof Schema.Boolean }> => { let schema = queueSchemas.get(payload) if (!schema) { schema = Schema.Struct({ token: Schema.String, + traceId: Schema.String, + spanId: Schema.String, + sampled: Schema.Boolean, payload }) queueSchemas.set(payload, schema) @@ -182,6 +189,7 @@ export const process: < yield* Activity.make({ name: `DurableQueue/${self.name}/${key}`, execute: Effect.gen(function*() { + const span = yield* Effect.orDie(Effect.currentSpan) const queue = yield* PersistedQueue.make({ name: `DurableQueue/${self.name}`, schema: getQueueSchema(self.payloadSchema) @@ -189,7 +197,10 @@ export const process: < const token = yield* DurableDeferred.token(deferred) yield* queue.offer({ token, - payload + payload, + traceId: span.traceId, + spanId: span.spanId, + sampled: span.sampled } as any).pipe( Effect.tapErrorCause(Effect.logWarning), Effect.catchTag("ParseError", Effect.die), @@ -253,7 +264,13 @@ export const makeWorker: < const concurrency = options?.concurrency ?? 1 const worker = queue.take((item_) => { - const item = item_ as any as { token: DurableDeferred.Token; payload: Payload["Type"] } + const item = item_ as any as { + token: DurableDeferred.Token + payload: Payload["Type"] + traceId: string + spanId: string + sampled: boolean + } return f(item.payload).pipe( Effect.exit, Effect.flatMap((exit) => @@ -262,7 +279,17 @@ export const makeWorker: < exit }) ), - Effect.asVoid + Effect.asVoid, + Effect.withSpan(`DurableQueue/${self.name}/worker`, { + captureStackTrace: false, + parent: { + _tag: "ExternalSpan", + traceId: item.traceId, + spanId: item.spanId, + sampled: item.sampled, + context: Context.empty() + } + }) ) }).pipe( Effect.catchAllCause(Effect.logWarning),