Skip to content

Commit 811852a

Browse files
authored
support idempotent offers to PersistedQueue (#5837)
1 parent d719788 commit 811852a

File tree

9 files changed

+215
-73
lines changed

9 files changed

+215
-73
lines changed

.changeset/cute-vans-read.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@effect/experimental": patch
3+
"@effect/sql": patch
4+
---
5+
6+
support idempotent offers to PersistedQueue

.changeset/full-breads-rule.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/workflow": minor
3+
---
4+
5+
add Activity.idempotencyKey

packages/experimental/src/PersistedQueue.ts

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,21 @@ export interface PersistedQueue<in out A, out R = never> {
3131

3232
/**
3333
* Adds an element to the queue. Returns the id of the enqueued element.
34+
*
35+
* If an element with the same id already exists in the queue, it will not be
36+
* added again.
3437
*/
35-
readonly offer: (value: A) => Effect.Effect<string, PersistedQueueError | ParseResult.ParseError, R>
38+
readonly offer: (value: A, options?: {
39+
readonly id: string | undefined
40+
}) => Effect.Effect<string, PersistedQueueError | ParseResult.ParseError, R>
3641

3742
/**
3843
* Takes an element from the queue.
3944
* If the queue is empty, it will wait until an element is available.
4045
*
4146
* If the returned effect succeeds, the element is marked as processed,
4247
* otherwise it will be retried according to the provided options.
43-
* ake
48+
*
4449
* By default, max attempts is set to 10.
4550
*/
4651
readonly take: <XA, XE, XR>(
@@ -98,12 +103,20 @@ export const makeFactory = Effect.gen(function*() {
98103

99104
return Effect.succeed<PersistedQueue<A, R>>({
100105
[TypeId]: TypeId,
101-
offer: (value) =>
106+
offer: (value, opts) =>
102107
Effect.flatMap(
103108
encodeUnknown(value),
104109
(element) => {
105-
const id = crypto.randomUUID()
106-
return Effect.as(store.offer(options.name, id, element), id)
110+
const id = opts?.id ?? crypto.randomUUID()
111+
return Effect.as(
112+
store.offer({
113+
name: options.name,
114+
id,
115+
element,
116+
isCustomId: opts?.id !== undefined
117+
}),
118+
id
119+
)
107120
}
108121
),
109122
take: (f, opts) =>
@@ -174,7 +187,14 @@ export class PersistedQueueError extends Schema.TaggedError<PersistedQueueError>
174187
export class PersistedQueueStore extends Context.Tag("@effect/experimental/PersistedQueue/PersistedQueueStore")<
175188
PersistedQueueStore,
176189
{
177-
readonly offer: (name: string, id: string, element: unknown) => Effect.Effect<void, PersistedQueueError>
190+
readonly offer: (
191+
options: {
192+
readonly name: string
193+
readonly id: string
194+
readonly element: unknown
195+
readonly isCustomId: boolean
196+
}
197+
) => Effect.Effect<void, PersistedQueueError>
178198

179199
readonly take: (options: {
180200
readonly name: string
@@ -203,6 +223,7 @@ export const layerStoreMemory: Layer.Layer<
203223
attempts: number
204224
readonly element: unknown
205225
}
226+
const ids = new Set<string>()
206227
const queues = new Map<string, {
207228
latch: Effect.Latch
208229
items: Set<Entry>
@@ -220,10 +241,12 @@ export const layerStoreMemory: Layer.Layer<
220241
}
221242

222243
return PersistedQueueStore.of({
223-
offer: (name, id, element) =>
244+
offer: (options) =>
224245
Effect.sync(() => {
225-
const queue = getOrCreateQueue(name)
226-
queue.items.add({ id, attempts: 0, element })
246+
if (ids.has(options.id)) return
247+
ids.add(options.id)
248+
const queue = getOrCreateQueue(options.name)
249+
queue.items.add({ id: options.id, attempts: 0, element: options.element })
227250
queue.latch.unsafeOpen()
228251
}),
229252
take: Effect.fnUntraced(function*(options) {

packages/experimental/src/PersistedQueue/Redis.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ import { Redis } from "ioredis"
1717
import * as PersistedQueue from "../PersistedQueue.js"
1818

1919
interface RedisWithQueue extends Redis {
20+
offer(
21+
keyQueue: string,
22+
keyIds: string,
23+
id: string,
24+
payload: string
25+
): Promise<void>
2026
resetQueue(
2127
keyQueue: string,
2228
keyPending: string,
@@ -70,6 +76,22 @@ export const make = Effect.fnUntraced(function*(
7076
(redis) => Effect.promise(() => redis.quit())
7177
)
7278

79+
redis.defineCommand("offer", {
80+
lua: `
81+
local key_queue = KEYS[1]
82+
local key_ids = KEYS[2]
83+
local id = ARGV[1]
84+
local payload = ARGV[2]
85+
86+
local result = redis.call("SADD", key_ids, id)
87+
if result == 1 then
88+
redis.call("RPUSH", key_queue, payload)
89+
end
90+
`,
91+
numberOfKeys: 2,
92+
readOnly: false
93+
})
94+
7395
redis.defineCommand("resetQueue", {
7496
lua: `
7597
local key_queue = KEYS[1]
@@ -273,9 +295,17 @@ return payloads
273295
)
274296

275297
return PersistedQueue.PersistedQueueStore.of({
276-
offer: (name, id, element) =>
298+
offer: ({ element, id, isCustomId, name }) =>
277299
Effect.tryPromise({
278-
try: () => redis.lpush(`${prefix}${name}`, JSON.stringify({ id, element, attempts: 0 })),
300+
try: (): Promise<any> =>
301+
isCustomId
302+
? redis.offer(
303+
`${prefix}${name}`,
304+
`${prefix}${name}:ids`,
305+
id,
306+
JSON.stringify({ id, element, attempts: 0 })
307+
)
308+
: redis.lpush(`${prefix}${name}`, JSON.stringify({ id, element, attempts: 0 })),
279309
catch: (cause) =>
280310
new PersistedQueue.PersistedQueueError({
281311
message: "Failed to offer element to persisted queue",

packages/experimental/test/PersistedQueue.test.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const layer = PersistedQueue.layer.pipe(
1515
Layer.provide(RedisContainer.layer)
1616
)
1717

18-
it.layer(layer, { timeout: "30 seconds" })("SqlPersistedQueue", (it) => {
18+
it.layer(layer, { timeout: "30 seconds" })("PersistedQueue", (it) => {
1919
it.effect("offer + take", () =>
2020
Effect.gen(function*() {
2121
const queue = yield* PersistedQueue.make({
@@ -81,6 +81,30 @@ it.layer(layer, { timeout: "30 seconds" })("SqlPersistedQueue", (it) => {
8181
})
8282
assert.strictEqual(value.n, 42n)
8383
}))
84+
85+
it.effect("idempotent offer", () =>
86+
Effect.gen(function*() {
87+
const queue = yield* PersistedQueue.make({
88+
name: "idempotent-offer",
89+
schema: Item
90+
})
91+
92+
yield* queue.offer({ n: 42n }, { id: "custom-id" })
93+
yield* queue.offer({ n: 42n }, { id: "custom-id" })
94+
yield* queue.take(Effect.fnUntraced(function*(value) {
95+
assert.strictEqual(value.n, 42n)
96+
}))
97+
const fiber = yield* queue.take(Effect.fnUntraced(function*(value) {
98+
assert.strictEqual(value.n, 42n)
99+
})).pipe(Effect.fork)
100+
101+
yield* TestClock.adjust(1000)
102+
yield* Effect.sleep(1000).pipe(
103+
TestServices.provideLive
104+
)
105+
106+
assert.isNull(fiber.unsafePoll())
107+
}))
84108
})
85109

86110
const Item = Schema.Struct({

packages/sql/src/SqlPersistedQueue.ts

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ export const make: (
7979
pg: () =>
8080
sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} (
8181
sequence SERIAL PRIMARY KEY,
82-
id UUID NOT NULL,
82+
id VARCHAR(36) NOT NULL,
8383
queue_name VARCHAR(100) NOT NULL,
8484
element TEXT NOT NULL,
8585
completed BOOLEAN NOT NULL,
@@ -94,7 +94,7 @@ export const make: (
9494
sql`IF NOT EXISTS (SELECT * FROM sysobjects WHERE name=${tableNameSql} AND xtype='U')
9595
CREATE TABLE ${tableNameSql} (
9696
sequence INT IDENTITY(1,1) PRIMARY KEY,
97-
id UNIQUEIDENTIFIER NOT NULL,
97+
id NVARCHAR(36) NOT NULL,
9898
queue_name NVARCHAR(100) NOT NULL,
9999
element NVARCHAR(MAX) NOT NULL,
100100
completed BIT NOT NULL,
@@ -122,6 +122,14 @@ export const make: (
122122
)`
123123
})
124124

125+
yield* sql.onDialectOrElse({
126+
mssql: () =>
127+
sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_id')
128+
CREATE UNIQUE INDEX idx_${tableNameSql}_id ON ${tableNameSql} (id)`,
129+
mysql: () => sql`CREATE UNIQUE INDEX ${sql(`idx_${tableName}_id`)} ON ${tableNameSql} (id)`.pipe(Effect.ignore),
130+
orElse: () => sql`CREATE UNIQUE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_id`)} ON ${tableNameSql} (id)`
131+
})
132+
125133
yield* sql.onDialectOrElse({
126134
mssql: () =>
127135
sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_take')
@@ -149,6 +157,34 @@ export const make: (
149157
sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (sequence, acquired_by)`
150158
})
151159

160+
const offer = sql.onDialectOrElse({
161+
pg: () => (id: string, name: string, element: string) =>
162+
sql`
163+
INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
164+
VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow})
165+
ON CONFLICT (id) DO NOTHING
166+
`,
167+
mysql: () => (id: string, name: string, element: string) =>
168+
sql`
169+
INSERT IGNORE INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
170+
VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow})
171+
`,
172+
mssql: () => (id: string, name: string, element: string) =>
173+
sql`
174+
IF NOT EXISTS (SELECT 1 FROM ${tableNameSql} WHERE id = ${id})
175+
BEGIN
176+
INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
177+
VALUES (${id}, ${name}, ${element}, 0, 0, ${sqlNow}, ${sqlNow})
178+
END
179+
`,
180+
// sqlite
181+
orElse: () => (id: string, name: string, element: string) =>
182+
sql`
183+
INSERT OR IGNORE INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
184+
VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow})
185+
`
186+
})
187+
152188
const wrapString = sql.onDialectOrElse({
153189
mssql: () => (s: string) => `N'${s}'`,
154190
orElse: () => (s: string) => `'${s}'`
@@ -159,10 +195,6 @@ export const make: (
159195
sqlite: () => sql.literal("1"),
160196
orElse: () => sql.literal("TRUE")
161197
})
162-
const sqlFalse = sql.onDialectOrElse({
163-
sqlite: () => sql.literal("0"),
164-
orElse: () => sql.literal("FALSE")
165-
})
166198

167199
const workerIdSql = stringLiteral(workerId)
168200
const elementIds = new Set<number>()
@@ -361,22 +393,14 @@ export const make: (
361393
})
362394

363395
return PersistedQueue.PersistedQueueStore.of({
364-
offer: (name, id, element) =>
365-
Effect.suspend(() =>
366-
sql`
367-
INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
368-
VALUES (${id}, ${name}, ${JSON.stringify(element)}, ${sqlFalse}, 0, ${sqlNow}, ${sqlNow})
369-
`
370-
).pipe(
371-
Effect.catchAllCause((cause) =>
372-
Effect.fail(
373-
new PersistedQueue.PersistedQueueError({
374-
message: "Failed to offer element to persisted queue",
375-
cause: Cause.squash(cause)
376-
})
377-
)
378-
)
379-
),
396+
offer: ({ element, id, name }) =>
397+
Effect.catchAllCause(Effect.suspend(() => offer(id, name, JSON.stringify(element))), (cause) =>
398+
Effect.fail(
399+
new PersistedQueue.PersistedQueueError({
400+
message: "Failed to offer element to persisted queue",
401+
cause: Cause.squash(cause)
402+
})
403+
)),
380404
take: ({ maxAttempts, name }) =>
381405
Effect.uninterruptibleMask((restore) =>
382406
RcMap.get(mailboxes, new QueueKey({ name, maxAttempts })).pipe(

packages/sql/test/SqlPersistedQueueTest.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,30 @@ export const suite = <E>(client: Layer.Layer<SqlClient.SqlClient, E>) => {
7676
})
7777
assert.strictEqual(value.n, 42n)
7878
}))
79+
80+
it.effect("idempotent offer", () =>
81+
Effect.gen(function*() {
82+
const queue = yield* PersistedQueue.make({
83+
name: "idempotent-offer",
84+
schema: Item
85+
})
86+
87+
yield* queue.offer({ n: 42n }, { id: "custom-id" })
88+
yield* queue.offer({ n: 42n }, { id: "custom-id" })
89+
yield* queue.take(Effect.fnUntraced(function*(value) {
90+
assert.strictEqual(value.n, 42n)
91+
}))
92+
const fiber = yield* queue.take(Effect.fnUntraced(function*(value) {
93+
assert.strictEqual(value.n, 42n)
94+
})).pipe(Effect.fork)
95+
96+
yield* TestClock.adjust(1000)
97+
yield* Effect.sleep(1000).pipe(
98+
TestServices.provideLive
99+
)
100+
101+
assert.isNull(fiber.unsafePoll())
102+
}))
79103
})
80104
}
81105

packages/workflow/src/Activity.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,16 +178,24 @@ export class CurrentAttempt extends Context.Reference<CurrentAttempt>()("@effect
178178

179179
/**
180180
* @since 1.0.0
181-
* @category Execution ID
181+
* @category Idempotency
182182
*/
183-
export const executionIdWithAttempt: Effect.Effect<
184-
string,
185-
never,
186-
WorkflowInstance
187-
> = Effect.gen(function*() {
183+
export const idempotencyKey: (
184+
name: string,
185+
options?: {
186+
readonly includeAttempt?: boolean | undefined
187+
} | undefined
188+
) => Effect.Effect<string, never, WorkflowInstance> = Effect.fnUntraced(function*(name: string, options?: {
189+
readonly includeAttempt?: boolean | undefined
190+
}) {
188191
const instance = yield* InstanceTag
189-
const attempt = yield* CurrentAttempt
190-
return yield* makeHashDigest(`${instance.executionId}-${attempt}`)
192+
let key = `${instance.executionId}`
193+
if (options?.includeAttempt) {
194+
const attempt = yield* CurrentAttempt
195+
key += `-${attempt}`
196+
}
197+
key += `-${name}`
198+
return yield* makeHashDigest(key)
191199
})
192200

193201
/**

0 commit comments

Comments
 (0)