diff --git a/.changeset/cute-vans-read.md b/.changeset/cute-vans-read.md new file mode 100644 index 00000000000..b0c150f3ce2 --- /dev/null +++ b/.changeset/cute-vans-read.md @@ -0,0 +1,6 @@ +--- +"@effect/experimental": patch +"@effect/sql": patch +--- + +support idempotent offers to PersistedQueue diff --git a/.changeset/full-breads-rule.md b/.changeset/full-breads-rule.md new file mode 100644 index 00000000000..4cfdffe2843 --- /dev/null +++ b/.changeset/full-breads-rule.md @@ -0,0 +1,5 @@ +--- +"@effect/workflow": minor +--- + +add Activity.idempotencyKey diff --git a/packages/experimental/src/PersistedQueue.ts b/packages/experimental/src/PersistedQueue.ts index 7ab80160464..f70ba7074f4 100644 --- a/packages/experimental/src/PersistedQueue.ts +++ b/packages/experimental/src/PersistedQueue.ts @@ -31,8 +31,13 @@ export interface PersistedQueue { /** * Adds an element to the queue. Returns the id of the enqueued element. + * + * If an element with the same id already exists in the queue, it will not be + * added again. */ - readonly offer: (value: A) => Effect.Effect + readonly offer: (value: A, options?: { + readonly id: string | undefined + }) => Effect.Effect /** * Takes an element from the queue. @@ -40,7 +45,7 @@ export interface PersistedQueue { * * If the returned effect succeeds, the element is marked as processed, * otherwise it will be retried according to the provided options. - * ake + * * By default, max attempts is set to 10. */ readonly take: ( @@ -98,12 +103,20 @@ export const makeFactory = Effect.gen(function*() { return Effect.succeed>({ [TypeId]: TypeId, - offer: (value) => + offer: (value, opts) => Effect.flatMap( encodeUnknown(value), (element) => { - const id = crypto.randomUUID() - return Effect.as(store.offer(options.name, id, element), id) + const id = opts?.id ?? crypto.randomUUID() + return Effect.as( + store.offer({ + name: options.name, + id, + element, + isCustomId: opts?.id !== undefined + }), + id + ) } ), take: (f, opts) => @@ -174,7 +187,14 @@ export class PersistedQueueError extends Schema.TaggedError export class PersistedQueueStore extends Context.Tag("@effect/experimental/PersistedQueue/PersistedQueueStore")< PersistedQueueStore, { - readonly offer: (name: string, id: string, element: unknown) => Effect.Effect + readonly offer: ( + options: { + readonly name: string + readonly id: string + readonly element: unknown + readonly isCustomId: boolean + } + ) => Effect.Effect readonly take: (options: { readonly name: string @@ -203,6 +223,7 @@ export const layerStoreMemory: Layer.Layer< attempts: number readonly element: unknown } + const ids = new Set() const queues = new Map @@ -220,10 +241,12 @@ export const layerStoreMemory: Layer.Layer< } return PersistedQueueStore.of({ - offer: (name, id, element) => + offer: (options) => Effect.sync(() => { - const queue = getOrCreateQueue(name) - queue.items.add({ id, attempts: 0, element }) + if (ids.has(options.id)) return + ids.add(options.id) + const queue = getOrCreateQueue(options.name) + queue.items.add({ id: options.id, attempts: 0, element: options.element }) queue.latch.unsafeOpen() }), take: Effect.fnUntraced(function*(options) { diff --git a/packages/experimental/src/PersistedQueue/Redis.ts b/packages/experimental/src/PersistedQueue/Redis.ts index 8b91e17433d..678dc0f0662 100644 --- a/packages/experimental/src/PersistedQueue/Redis.ts +++ b/packages/experimental/src/PersistedQueue/Redis.ts @@ -17,6 +17,12 @@ import { Redis } from "ioredis" import * as PersistedQueue from "../PersistedQueue.js" interface RedisWithQueue extends Redis { + offer( + keyQueue: string, + keyIds: string, + id: string, + payload: string + ): Promise resetQueue( keyQueue: string, keyPending: string, @@ -70,6 +76,22 @@ export const make = Effect.fnUntraced(function*( (redis) => Effect.promise(() => redis.quit()) ) + redis.defineCommand("offer", { + lua: ` +local key_queue = KEYS[1] +local key_ids = KEYS[2] +local id = ARGV[1] +local payload = ARGV[2] + +local result = redis.call("SADD", key_ids, id) +if result == 1 then + redis.call("RPUSH", key_queue, payload) +end +`, + numberOfKeys: 2, + readOnly: false + }) + redis.defineCommand("resetQueue", { lua: ` local key_queue = KEYS[1] @@ -273,9 +295,17 @@ return payloads ) return PersistedQueue.PersistedQueueStore.of({ - offer: (name, id, element) => + offer: ({ element, id, isCustomId, name }) => Effect.tryPromise({ - try: () => redis.lpush(`${prefix}${name}`, JSON.stringify({ id, element, attempts: 0 })), + try: (): Promise => + isCustomId + ? redis.offer( + `${prefix}${name}`, + `${prefix}${name}:ids`, + id, + JSON.stringify({ id, element, attempts: 0 }) + ) + : redis.lpush(`${prefix}${name}`, JSON.stringify({ id, element, attempts: 0 })), catch: (cause) => new PersistedQueue.PersistedQueueError({ message: "Failed to offer element to persisted queue", diff --git a/packages/experimental/test/PersistedQueue.test.ts b/packages/experimental/test/PersistedQueue.test.ts index 512a290b608..0fd65fcd77a 100644 --- a/packages/experimental/test/PersistedQueue.test.ts +++ b/packages/experimental/test/PersistedQueue.test.ts @@ -15,7 +15,7 @@ const layer = PersistedQueue.layer.pipe( Layer.provide(RedisContainer.layer) ) -it.layer(layer, { timeout: "30 seconds" })("SqlPersistedQueue", (it) => { +it.layer(layer, { timeout: "30 seconds" })("PersistedQueue", (it) => { it.effect("offer + take", () => Effect.gen(function*() { const queue = yield* PersistedQueue.make({ @@ -81,6 +81,30 @@ it.layer(layer, { timeout: "30 seconds" })("SqlPersistedQueue", (it) => { }) assert.strictEqual(value.n, 42n) })) + + it.effect("idempotent offer", () => + Effect.gen(function*() { + const queue = yield* PersistedQueue.make({ + name: "idempotent-offer", + schema: Item + }) + + yield* queue.offer({ n: 42n }, { id: "custom-id" }) + yield* queue.offer({ n: 42n }, { id: "custom-id" }) + yield* queue.take(Effect.fnUntraced(function*(value) { + assert.strictEqual(value.n, 42n) + })) + const fiber = yield* queue.take(Effect.fnUntraced(function*(value) { + assert.strictEqual(value.n, 42n) + })).pipe(Effect.fork) + + yield* TestClock.adjust(1000) + yield* Effect.sleep(1000).pipe( + TestServices.provideLive + ) + + assert.isNull(fiber.unsafePoll()) + })) }) const Item = Schema.Struct({ diff --git a/packages/sql/src/SqlPersistedQueue.ts b/packages/sql/src/SqlPersistedQueue.ts index b5895485e26..b3cea756cd0 100644 --- a/packages/sql/src/SqlPersistedQueue.ts +++ b/packages/sql/src/SqlPersistedQueue.ts @@ -79,7 +79,7 @@ export const make: ( pg: () => sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( sequence SERIAL PRIMARY KEY, - id UUID NOT NULL, + id VARCHAR(36) NOT NULL, queue_name VARCHAR(100) NOT NULL, element TEXT NOT NULL, completed BOOLEAN NOT NULL, @@ -94,7 +94,7 @@ export const make: ( sql`IF NOT EXISTS (SELECT * FROM sysobjects WHERE name=${tableNameSql} AND xtype='U') CREATE TABLE ${tableNameSql} ( sequence INT IDENTITY(1,1) PRIMARY KEY, - id UNIQUEIDENTIFIER NOT NULL, + id NVARCHAR(36) NOT NULL, queue_name NVARCHAR(100) NOT NULL, element NVARCHAR(MAX) NOT NULL, completed BIT NOT NULL, @@ -122,6 +122,14 @@ export const make: ( )` }) + yield* sql.onDialectOrElse({ + mssql: () => + sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_id') + CREATE UNIQUE INDEX idx_${tableNameSql}_id ON ${tableNameSql} (id)`, + mysql: () => sql`CREATE UNIQUE INDEX ${sql(`idx_${tableName}_id`)} ON ${tableNameSql} (id)`.pipe(Effect.ignore), + orElse: () => sql`CREATE UNIQUE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_id`)} ON ${tableNameSql} (id)` + }) + yield* sql.onDialectOrElse({ mssql: () => sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_take') @@ -149,6 +157,34 @@ export const make: ( sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (sequence, acquired_by)` }) + const offer = sql.onDialectOrElse({ + pg: () => (id: string, name: string, element: string) => + sql` + INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) + VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow}) + ON CONFLICT (id) DO NOTHING + `, + mysql: () => (id: string, name: string, element: string) => + sql` + INSERT IGNORE INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) + VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow}) + `, + mssql: () => (id: string, name: string, element: string) => + sql` + IF NOT EXISTS (SELECT 1 FROM ${tableNameSql} WHERE id = ${id}) + BEGIN + INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) + VALUES (${id}, ${name}, ${element}, 0, 0, ${sqlNow}, ${sqlNow}) + END + `, + // sqlite + orElse: () => (id: string, name: string, element: string) => + sql` + INSERT OR IGNORE INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) + VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow}) + ` + }) + const wrapString = sql.onDialectOrElse({ mssql: () => (s: string) => `N'${s}'`, orElse: () => (s: string) => `'${s}'` @@ -159,10 +195,6 @@ export const make: ( sqlite: () => sql.literal("1"), orElse: () => sql.literal("TRUE") }) - const sqlFalse = sql.onDialectOrElse({ - sqlite: () => sql.literal("0"), - orElse: () => sql.literal("FALSE") - }) const workerIdSql = stringLiteral(workerId) const elementIds = new Set() @@ -361,22 +393,14 @@ export const make: ( }) return PersistedQueue.PersistedQueueStore.of({ - offer: (name, id, element) => - Effect.suspend(() => - sql` - INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) - VALUES (${id}, ${name}, ${JSON.stringify(element)}, ${sqlFalse}, 0, ${sqlNow}, ${sqlNow}) - ` - ).pipe( - Effect.catchAllCause((cause) => - Effect.fail( - new PersistedQueue.PersistedQueueError({ - message: "Failed to offer element to persisted queue", - cause: Cause.squash(cause) - }) - ) - ) - ), + offer: ({ element, id, name }) => + Effect.catchAllCause(Effect.suspend(() => offer(id, name, JSON.stringify(element))), (cause) => + Effect.fail( + new PersistedQueue.PersistedQueueError({ + message: "Failed to offer element to persisted queue", + cause: Cause.squash(cause) + }) + )), take: ({ maxAttempts, name }) => Effect.uninterruptibleMask((restore) => RcMap.get(mailboxes, new QueueKey({ name, maxAttempts })).pipe( diff --git a/packages/sql/test/SqlPersistedQueueTest.ts b/packages/sql/test/SqlPersistedQueueTest.ts index 823efceef3f..94cdc9cd862 100644 --- a/packages/sql/test/SqlPersistedQueueTest.ts +++ b/packages/sql/test/SqlPersistedQueueTest.ts @@ -76,6 +76,30 @@ export const suite = (client: Layer.Layer) => { }) assert.strictEqual(value.n, 42n) })) + + it.effect("idempotent offer", () => + Effect.gen(function*() { + const queue = yield* PersistedQueue.make({ + name: "idempotent-offer", + schema: Item + }) + + yield* queue.offer({ n: 42n }, { id: "custom-id" }) + yield* queue.offer({ n: 42n }, { id: "custom-id" }) + yield* queue.take(Effect.fnUntraced(function*(value) { + assert.strictEqual(value.n, 42n) + })) + const fiber = yield* queue.take(Effect.fnUntraced(function*(value) { + assert.strictEqual(value.n, 42n) + })).pipe(Effect.fork) + + yield* TestClock.adjust(1000) + yield* Effect.sleep(1000).pipe( + TestServices.provideLive + ) + + assert.isNull(fiber.unsafePoll()) + })) }) } diff --git a/packages/workflow/src/Activity.ts b/packages/workflow/src/Activity.ts index f93820582f4..60146d4d7d0 100644 --- a/packages/workflow/src/Activity.ts +++ b/packages/workflow/src/Activity.ts @@ -178,16 +178,24 @@ export class CurrentAttempt extends Context.Reference()("@effect /** * @since 1.0.0 - * @category Execution ID + * @category Idempotency */ -export const executionIdWithAttempt: Effect.Effect< - string, - never, - WorkflowInstance -> = Effect.gen(function*() { +export const idempotencyKey: ( + name: string, + options?: { + readonly includeAttempt?: boolean | undefined + } | undefined +) => Effect.Effect = Effect.fnUntraced(function*(name: string, options?: { + readonly includeAttempt?: boolean | undefined +}) { const instance = yield* InstanceTag - const attempt = yield* CurrentAttempt - return yield* makeHashDigest(`${instance.executionId}-${attempt}`) + let key = `${instance.executionId}` + if (options?.includeAttempt) { + const attempt = yield* CurrentAttempt + key += `-${attempt}` + } + key += `-${name}` + return yield* makeHashDigest(key) }) /** diff --git a/packages/workflow/src/DurableQueue.ts b/packages/workflow/src/DurableQueue.ts index d02f91f6150..f7a76e951bd 100644 --- a/packages/workflow/src/DurableQueue.ts +++ b/packages/workflow/src/DurableQueue.ts @@ -9,7 +9,6 @@ import * as Schedule from "effect/Schedule" import * as Schema from "effect/Schema" import * as Activity from "./Activity.js" import * as DurableDeferred from "./DurableDeferred.js" -import { makeHashDigest } from "./internal/crypto.js" import type * as WorkflowEngine from "./WorkflowEngine.js" /** @@ -179,42 +178,41 @@ export const process: < >(self: DurableQueue, payload: Payload["Type"], options?: { readonly retrySchedule?: Schedule.Schedule | undefined }) { - const key = yield* makeHashDigest(self.idempotencyKey(payload)) + const queueName = `DurableQueue/${self.name}` + const queue = yield* PersistedQueue.make({ + name: queueName, + schema: getQueueSchema(self.payloadSchema) + }) + const id = yield* Activity.idempotencyKey(`${queueName}/${self.idempotencyKey(payload)}`) - const deferred = DurableDeferred.make(`${self.deferred.name}/${key}`, { + const deferred = DurableDeferred.make(`${self.deferred.name}/${id}`, { success: self.deferred.successSchema, error: self.deferred.errorSchema }) + const token = yield* DurableDeferred.token(deferred) - yield* Activity.make({ - name: `DurableQueue/${self.name}/${key}`, - execute: Effect.gen(function*() { - const span = yield* Effect.orDie(Effect.currentSpan) - const queue = yield* PersistedQueue.make({ - name: `DurableQueue/${self.name}`, - schema: getQueueSchema(self.payloadSchema) + yield* Effect.useSpan(`DurableQueue/${self.name}/process`, { + captureStackTrace: false, + attributes: { id } + }, (span) => + queue.offer({ + token, + payload, + traceId: span.traceId, + spanId: span.spanId, + sampled: span.sampled + } as any, { id }).pipe( + Effect.tapErrorCause(Effect.logWarning), + Effect.catchTag("ParseError", Effect.die), + Effect.retry(options?.retrySchedule ?? defaultRetrySchedule), + Effect.orDie, + Effect.annotateLogs({ + package: "@effect/workflow", + module: "DurableQueue", + fiber: "process", + queueName: self.name }) - const token = yield* DurableDeferred.token(deferred) - yield* queue.offer({ - token, - payload, - traceId: span.traceId, - spanId: span.spanId, - sampled: span.sampled - } as any).pipe( - Effect.tapErrorCause(Effect.logWarning), - Effect.catchTag("ParseError", Effect.die), - Effect.retry(options?.retrySchedule ?? defaultRetrySchedule), - Effect.orDie, - Effect.annotateLogs({ - package: "@effect/workflow", - module: "DurableQueue", - fiber: "process", - queueName: self.name - }) - ) - }) - }) + )) return yield* DurableDeferred.await(deferred) })