Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/cute-vans-read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@effect/experimental": patch
"@effect/sql": patch
---

support idempotent offers to PersistedQueue
5 changes: 5 additions & 0 deletions .changeset/full-breads-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/workflow": minor
---

add Activity.idempotencyKey
41 changes: 32 additions & 9 deletions packages/experimental/src/PersistedQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@ export interface PersistedQueue<in out A, out R = never> {

/**
* Adds an element to the queue. Returns the id of the enqueued element.
*
* If an element with the same id already exists in the queue, it will not be
* added again.
*/
readonly offer: (value: A) => Effect.Effect<string, PersistedQueueError | ParseResult.ParseError, R>
readonly offer: (value: A, options?: {
readonly id: string | undefined
}) => Effect.Effect<string, PersistedQueueError | ParseResult.ParseError, R>

/**
* Takes an element from the queue.
* If the queue is empty, it will wait until an element is available.
*
* If the returned effect succeeds, the element is marked as processed,
* otherwise it will be retried according to the provided options.
* ake
*
* By default, max attempts is set to 10.
*/
readonly take: <XA, XE, XR>(
Expand Down Expand Up @@ -98,12 +103,20 @@ export const makeFactory = Effect.gen(function*() {

return Effect.succeed<PersistedQueue<A, R>>({
[TypeId]: TypeId,
offer: (value) =>
offer: (value, opts) =>
Effect.flatMap(
encodeUnknown(value),
(element) => {
const id = crypto.randomUUID()
return Effect.as(store.offer(options.name, id, element), id)
const id = opts?.id ?? crypto.randomUUID()
return Effect.as(
store.offer({
name: options.name,
id,
element,
isCustomId: opts?.id !== undefined
}),
id
)
}
),
take: (f, opts) =>
Expand Down Expand Up @@ -174,7 +187,14 @@ export class PersistedQueueError extends Schema.TaggedError<PersistedQueueError>
export class PersistedQueueStore extends Context.Tag("@effect/experimental/PersistedQueue/PersistedQueueStore")<
PersistedQueueStore,
{
readonly offer: (name: string, id: string, element: unknown) => Effect.Effect<void, PersistedQueueError>
readonly offer: (
options: {
readonly name: string
readonly id: string
readonly element: unknown
readonly isCustomId: boolean
}
) => Effect.Effect<void, PersistedQueueError>

readonly take: (options: {
readonly name: string
Expand Down Expand Up @@ -203,6 +223,7 @@ export const layerStoreMemory: Layer.Layer<
attempts: number
readonly element: unknown
}
const ids = new Set<string>()
const queues = new Map<string, {
latch: Effect.Latch
items: Set<Entry>
Expand All @@ -220,10 +241,12 @@ export const layerStoreMemory: Layer.Layer<
}

return PersistedQueueStore.of({
offer: (name, id, element) =>
offer: (options) =>
Effect.sync(() => {
const queue = getOrCreateQueue(name)
queue.items.add({ id, attempts: 0, element })
if (ids.has(options.id)) return
ids.add(options.id)
const queue = getOrCreateQueue(options.name)
queue.items.add({ id: options.id, attempts: 0, element: options.element })
queue.latch.unsafeOpen()
}),
take: Effect.fnUntraced(function*(options) {
Expand Down
34 changes: 32 additions & 2 deletions packages/experimental/src/PersistedQueue/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import { Redis } from "ioredis"
import * as PersistedQueue from "../PersistedQueue.js"

interface RedisWithQueue extends Redis {
offer(
keyQueue: string,
keyIds: string,
id: string,
payload: string
): Promise<void>
resetQueue(
keyQueue: string,
keyPending: string,
Expand Down Expand Up @@ -70,6 +76,22 @@ export const make = Effect.fnUntraced(function*(
(redis) => Effect.promise(() => redis.quit())
)

redis.defineCommand("offer", {
lua: `
local key_queue = KEYS[1]
local key_ids = KEYS[2]
local id = ARGV[1]
local payload = ARGV[2]

local result = redis.call("SADD", key_ids, id)
if result == 1 then
redis.call("RPUSH", key_queue, payload)
end
`,
numberOfKeys: 2,
readOnly: false
})

redis.defineCommand("resetQueue", {
lua: `
local key_queue = KEYS[1]
Expand Down Expand Up @@ -273,9 +295,17 @@ return payloads
)

return PersistedQueue.PersistedQueueStore.of({
offer: (name, id, element) =>
offer: ({ element, id, isCustomId, name }) =>
Effect.tryPromise({
try: () => redis.lpush(`${prefix}${name}`, JSON.stringify({ id, element, attempts: 0 })),
try: (): Promise<any> =>
isCustomId
? redis.offer(
`${prefix}${name}`,
`${prefix}${name}:ids`,
id,
JSON.stringify({ id, element, attempts: 0 })
)
: redis.lpush(`${prefix}${name}`, JSON.stringify({ id, element, attempts: 0 })),
catch: (cause) =>
new PersistedQueue.PersistedQueueError({
message: "Failed to offer element to persisted queue",
Expand Down
26 changes: 25 additions & 1 deletion packages/experimental/test/PersistedQueue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const layer = PersistedQueue.layer.pipe(
Layer.provide(RedisContainer.layer)
)

it.layer(layer, { timeout: "30 seconds" })("SqlPersistedQueue", (it) => {
it.layer(layer, { timeout: "30 seconds" })("PersistedQueue", (it) => {
it.effect("offer + take", () =>
Effect.gen(function*() {
const queue = yield* PersistedQueue.make({
Expand Down Expand Up @@ -81,6 +81,30 @@ it.layer(layer, { timeout: "30 seconds" })("SqlPersistedQueue", (it) => {
})
assert.strictEqual(value.n, 42n)
}))

it.effect("idempotent offer", () =>
Effect.gen(function*() {
const queue = yield* PersistedQueue.make({
name: "idempotent-offer",
schema: Item
})

yield* queue.offer({ n: 42n }, { id: "custom-id" })
yield* queue.offer({ n: 42n }, { id: "custom-id" })
yield* queue.take(Effect.fnUntraced(function*(value) {
assert.strictEqual(value.n, 42n)
}))
const fiber = yield* queue.take(Effect.fnUntraced(function*(value) {
assert.strictEqual(value.n, 42n)
})).pipe(Effect.fork)

yield* TestClock.adjust(1000)
yield* Effect.sleep(1000).pipe(
TestServices.provideLive
)

assert.isNull(fiber.unsafePoll())
}))
})

const Item = Schema.Struct({
Expand Down
68 changes: 46 additions & 22 deletions packages/sql/src/SqlPersistedQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export const make: (
pg: () =>
sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} (
sequence SERIAL PRIMARY KEY,
id UUID NOT NULL,
id VARCHAR(36) NOT NULL,
queue_name VARCHAR(100) NOT NULL,
element TEXT NOT NULL,
completed BOOLEAN NOT NULL,
Expand All @@ -94,7 +94,7 @@ export const make: (
sql`IF NOT EXISTS (SELECT * FROM sysobjects WHERE name=${tableNameSql} AND xtype='U')
CREATE TABLE ${tableNameSql} (
sequence INT IDENTITY(1,1) PRIMARY KEY,
id UNIQUEIDENTIFIER NOT NULL,
id NVARCHAR(36) NOT NULL,
queue_name NVARCHAR(100) NOT NULL,
element NVARCHAR(MAX) NOT NULL,
completed BIT NOT NULL,
Expand Down Expand Up @@ -122,6 +122,14 @@ export const make: (
)`
})

yield* sql.onDialectOrElse({
mssql: () =>
sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_id')
CREATE UNIQUE INDEX idx_${tableNameSql}_id ON ${tableNameSql} (id)`,
mysql: () => sql`CREATE UNIQUE INDEX ${sql(`idx_${tableName}_id`)} ON ${tableNameSql} (id)`.pipe(Effect.ignore),
orElse: () => sql`CREATE UNIQUE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_id`)} ON ${tableNameSql} (id)`
})

yield* sql.onDialectOrElse({
mssql: () =>
sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_take')
Expand Down Expand Up @@ -149,6 +157,34 @@ export const make: (
sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (sequence, acquired_by)`
})

const offer = sql.onDialectOrElse({
pg: () => (id: string, name: string, element: string) =>
sql`
INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow})
ON CONFLICT (id) DO NOTHING
`,
mysql: () => (id: string, name: string, element: string) =>
sql`
INSERT IGNORE INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow})
`,
mssql: () => (id: string, name: string, element: string) =>
sql`
IF NOT EXISTS (SELECT 1 FROM ${tableNameSql} WHERE id = ${id})
BEGIN
INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
VALUES (${id}, ${name}, ${element}, 0, 0, ${sqlNow}, ${sqlNow})
END
`,
// sqlite
orElse: () => (id: string, name: string, element: string) =>
sql`
INSERT OR IGNORE INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
VALUES (${id}, ${name}, ${element}, FALSE, 0, ${sqlNow}, ${sqlNow})
`
})

const wrapString = sql.onDialectOrElse({
mssql: () => (s: string) => `N'${s}'`,
orElse: () => (s: string) => `'${s}'`
Expand All @@ -159,10 +195,6 @@ export const make: (
sqlite: () => sql.literal("1"),
orElse: () => sql.literal("TRUE")
})
const sqlFalse = sql.onDialectOrElse({
sqlite: () => sql.literal("0"),
orElse: () => sql.literal("FALSE")
})

const workerIdSql = stringLiteral(workerId)
const elementIds = new Set<number>()
Expand Down Expand Up @@ -361,22 +393,14 @@ export const make: (
})

return PersistedQueue.PersistedQueueStore.of({
offer: (name, id, element) =>
Effect.suspend(() =>
sql`
INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at)
VALUES (${id}, ${name}, ${JSON.stringify(element)}, ${sqlFalse}, 0, ${sqlNow}, ${sqlNow})
`
).pipe(
Effect.catchAllCause((cause) =>
Effect.fail(
new PersistedQueue.PersistedQueueError({
message: "Failed to offer element to persisted queue",
cause: Cause.squash(cause)
})
)
)
),
offer: ({ element, id, name }) =>
Effect.catchAllCause(Effect.suspend(() => offer(id, name, JSON.stringify(element))), (cause) =>
Effect.fail(
new PersistedQueue.PersistedQueueError({
message: "Failed to offer element to persisted queue",
cause: Cause.squash(cause)
})
)),
take: ({ maxAttempts, name }) =>
Effect.uninterruptibleMask((restore) =>
RcMap.get(mailboxes, new QueueKey({ name, maxAttempts })).pipe(
Expand Down
24 changes: 24 additions & 0 deletions packages/sql/test/SqlPersistedQueueTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,30 @@ export const suite = <E>(client: Layer.Layer<SqlClient.SqlClient, E>) => {
})
assert.strictEqual(value.n, 42n)
}))

it.effect("idempotent offer", () =>
Effect.gen(function*() {
const queue = yield* PersistedQueue.make({
name: "idempotent-offer",
schema: Item
})

yield* queue.offer({ n: 42n }, { id: "custom-id" })
yield* queue.offer({ n: 42n }, { id: "custom-id" })
yield* queue.take(Effect.fnUntraced(function*(value) {
assert.strictEqual(value.n, 42n)
}))
const fiber = yield* queue.take(Effect.fnUntraced(function*(value) {
assert.strictEqual(value.n, 42n)
})).pipe(Effect.fork)

yield* TestClock.adjust(1000)
yield* Effect.sleep(1000).pipe(
TestServices.provideLive
)

assert.isNull(fiber.unsafePoll())
}))
})
}

Expand Down
24 changes: 16 additions & 8 deletions packages/workflow/src/Activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,24 @@ export class CurrentAttempt extends Context.Reference<CurrentAttempt>()("@effect

/**
* @since 1.0.0
* @category Execution ID
* @category Idempotency
*/
export const executionIdWithAttempt: Effect.Effect<
string,
never,
WorkflowInstance
> = Effect.gen(function*() {
export const idempotencyKey: (
name: string,
options?: {
readonly includeAttempt?: boolean | undefined
} | undefined
) => Effect.Effect<string, never, WorkflowInstance> = Effect.fnUntraced(function*(name: string, options?: {
readonly includeAttempt?: boolean | undefined
}) {
const instance = yield* InstanceTag
const attempt = yield* CurrentAttempt
return yield* makeHashDigest(`${instance.executionId}-${attempt}`)
let key = `${instance.executionId}`
if (options?.includeAttempt) {
const attempt = yield* CurrentAttempt
key += `-${attempt}`
}
key += `-${name}`
return yield* makeHashDigest(key)
})

/**
Expand Down
Loading