Skip to content

Commit 726ad2c

Browse files
authored
propagate tracing info with DurableQueue (#5823)
1 parent 2519056 commit 726ad2c

File tree

1 file changed

+30
-3
lines changed

1 file changed

+30
-3
lines changed

packages/workflow/src/DurableQueue.ts

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* @since 1.0.0
33
*/
44
import * as PersistedQueue from "@effect/experimental/PersistedQueue"
5+
import * as Context from "effect/Context"
56
import * as Effect from "effect/Effect"
67
import * as Layer from "effect/Layer"
78
import * as Schedule from "effect/Schedule"
@@ -130,11 +131,17 @@ const getQueueSchema = <Payload extends Schema.Schema.Any>(
130131
): Schema.Struct<{
131132
token: typeof Schema.String
132133
payload: Payload
134+
traceId: typeof Schema.String
135+
spanId: typeof Schema.String
136+
sampled: typeof Schema.Boolean
133137
}> => {
134138
let schema = queueSchemas.get(payload)
135139
if (!schema) {
136140
schema = Schema.Struct({
137141
token: Schema.String,
142+
traceId: Schema.String,
143+
spanId: Schema.String,
144+
sampled: Schema.Boolean,
138145
payload
139146
})
140147
queueSchemas.set(payload, schema)
@@ -182,14 +189,18 @@ export const process: <
182189
yield* Activity.make({
183190
name: `DurableQueue/${self.name}/${key}`,
184191
execute: Effect.gen(function*() {
192+
const span = yield* Effect.orDie(Effect.currentSpan)
185193
const queue = yield* PersistedQueue.make({
186194
name: `DurableQueue/${self.name}`,
187195
schema: getQueueSchema(self.payloadSchema)
188196
})
189197
const token = yield* DurableDeferred.token(deferred)
190198
yield* queue.offer({
191199
token,
192-
payload
200+
payload,
201+
traceId: span.traceId,
202+
spanId: span.spanId,
203+
sampled: span.sampled
193204
} as any).pipe(
194205
Effect.tapErrorCause(Effect.logWarning),
195206
Effect.catchTag("ParseError", Effect.die),
@@ -253,7 +264,13 @@ export const makeWorker: <
253264
const concurrency = options?.concurrency ?? 1
254265

255266
const worker = queue.take((item_) => {
256-
const item = item_ as any as { token: DurableDeferred.Token; payload: Payload["Type"] }
267+
const item = item_ as any as {
268+
token: DurableDeferred.Token
269+
payload: Payload["Type"]
270+
traceId: string
271+
spanId: string
272+
sampled: boolean
273+
}
257274
return f(item.payload).pipe(
258275
Effect.exit,
259276
Effect.flatMap((exit) =>
@@ -262,7 +279,17 @@ export const makeWorker: <
262279
exit
263280
})
264281
),
265-
Effect.asVoid
282+
Effect.asVoid,
283+
Effect.withSpan(`DurableQueue/${self.name}/worker`, {
284+
captureStackTrace: false,
285+
parent: {
286+
_tag: "ExternalSpan",
287+
traceId: item.traceId,
288+
spanId: item.spanId,
289+
sampled: item.sampled,
290+
context: Context.empty()
291+
}
292+
})
266293
)
267294
}).pipe(
268295
Effect.catchAllCause(Effect.logWarning),

0 commit comments

Comments
 (0)