Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions packages/workflow/src/DurableQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* @since 1.0.0
*/
import * as PersistedQueue from "@effect/experimental/PersistedQueue"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Schedule from "effect/Schedule"
Expand Down Expand Up @@ -130,11 +131,17 @@ const getQueueSchema = <Payload extends Schema.Schema.Any>(
): Schema.Struct<{
token: typeof Schema.String
payload: Payload
traceId: typeof Schema.String
spanId: typeof Schema.String
sampled: typeof Schema.Boolean
}> => {
let schema = queueSchemas.get(payload)
if (!schema) {
schema = Schema.Struct({
token: Schema.String,
traceId: Schema.String,
spanId: Schema.String,
sampled: Schema.Boolean,
payload
})
queueSchemas.set(payload, schema)
Expand Down Expand Up @@ -182,14 +189,18 @@ export const process: <
yield* Activity.make({
name: `DurableQueue/${self.name}/${key}`,
execute: Effect.gen(function*() {
const span = yield* Effect.orDie(Effect.currentSpan)
const queue = yield* PersistedQueue.make({
name: `DurableQueue/${self.name}`,
schema: getQueueSchema(self.payloadSchema)
})
const token = yield* DurableDeferred.token(deferred)
yield* queue.offer({
token,
payload
payload,
traceId: span.traceId,
spanId: span.spanId,
sampled: span.sampled
} as any).pipe(
Effect.tapErrorCause(Effect.logWarning),
Effect.catchTag("ParseError", Effect.die),
Expand Down Expand Up @@ -253,7 +264,13 @@ export const makeWorker: <
const concurrency = options?.concurrency ?? 1

const worker = queue.take((item_) => {
const item = item_ as any as { token: DurableDeferred.Token; payload: Payload["Type"] }
const item = item_ as any as {
token: DurableDeferred.Token
payload: Payload["Type"]
traceId: string
spanId: string
sampled: boolean
}
return f(item.payload).pipe(
Effect.exit,
Effect.flatMap((exit) =>
Expand All @@ -262,7 +279,17 @@ export const makeWorker: <
exit
})
),
Effect.asVoid
Effect.asVoid,
Effect.withSpan(`DurableQueue/${self.name}/worker`, {
captureStackTrace: false,
parent: {
_tag: "ExternalSpan",
traceId: item.traceId,
spanId: item.spanId,
sampled: item.sampled,
context: Context.empty()
}
})
)
}).pipe(
Effect.catchAllCause(Effect.logWarning),
Expand Down