From 1ba00b149aa89480574e888b43e12fb4615d437c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Feb 2025 16:24:05 +0000 Subject: [PATCH 1/6] Added isPrismaRetriableError() --- internal-packages/database/src/transaction.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/internal-packages/database/src/transaction.ts b/internal-packages/database/src/transaction.ts index e4cc2aa4df..5751023ee0 100644 --- a/internal-packages/database/src/transaction.ts +++ b/internal-packages/database/src/transaction.ts @@ -13,12 +13,20 @@ function isTransactionClient(prisma: PrismaClientOrTransaction): prisma is Prism return !("$transaction" in prisma); } -function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError { +export function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError { return ( typeof error === "object" && error !== null && "code" in error && typeof error.code === "string" ); } +export function isPrismaRetriableError(error: unknown): boolean { + if (!isPrismaKnownError(error)) { + return false; + } + + return retryCodes.includes(error.code); +} + export type PrismaTransactionOptions = { /** The maximum amount of time (in ms) Prisma Client will wait to acquire a transaction from the database. The default value is 2000ms. 
*/ maxWait?: number; @@ -39,6 +47,9 @@ export type PrismaTransactionOptions = { maxRetries?: number; }; +//retry these Prisma error codes +const retryCodes = ["P2034", "P2028"]; + export async function $transaction( prisma: PrismaClientOrTransaction, fn: (prisma: PrismaTransactionClient) => Promise, @@ -55,7 +66,7 @@ export async function $transaction( } catch (error) { if (isPrismaKnownError(error)) { if ( - error.code === "P2034" && + retryCodes.includes(error.code) && typeof options?.maxRetries === "number" && attempt < options.maxRetries ) { From dea761f7c086e4d2a2121996b511036ece766fab Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Feb 2025 16:24:49 +0000 Subject: [PATCH 2/6] Retry completeBatchTaskRunItem if they fail because of a retriable Prisma error --- .../app/v3/legacyRunEngineWorker.server.ts | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts index 52a978fffc..a5ca5c8483 100644 --- a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts +++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts @@ -5,6 +5,8 @@ import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server"; +import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server"; +import { prisma } from "~/db.server"; function initializeWorker() { const redisOptions = { @@ -34,6 +36,19 @@ function initializeWorker() { maxAttempts: 3, }, }, + completeBatchTaskRunItem: { + schema: z.object({ + itemId: z.string(), + batchTaskRunId: z.string(), + scheduleResumeOnComplete: z.boolean(), + taskRunAttemptId: z.string().optional(), + attempt: z.number().optional(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 10, + }, + }, }, concurrency: { workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS, 
@@ -49,6 +64,16 @@ function initializeWorker() { await service.call(payload.runId); }, + completeBatchTaskRunItem: async ({ payload, attempt }) => { + await completeBatchTaskRunItemV3( + payload.itemId, + payload.batchTaskRunId, + prisma, + payload.scheduleResumeOnComplete, + payload.taskRunAttemptId, + attempt + ); + }, }, }); From 6c5b50229b5455da2aba1114aa133932dd14e2b0 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Feb 2025 16:27:33 +0000 Subject: [PATCH 3/6] Retry using Redis worker --- .../app/v3/services/batchTriggerV3.server.ts | 157 ++++++++++++------ 1 file changed, 105 insertions(+), 52 deletions(-) diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 9bcb9eab56..c5ef801613 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -7,6 +7,7 @@ import { } from "@trigger.dev/core/v3"; import { BatchTaskRun, + isPrismaRetriableError, isUniqueConstraintError, Prisma, TaskRunAttempt, @@ -20,6 +21,7 @@ import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; import { workerQueue } from "~/services/worker.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; +import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server"; import { marqs } from "../marqs/index.server"; import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server"; @@ -923,71 +925,122 @@ export async function completeBatchTaskRunItemV3( batchTaskRunId: string, tx: PrismaClientOrTransaction, scheduleResumeOnComplete = false, - taskRunAttemptId?: string + taskRunAttemptId?: string, + retryAttempt?: number ) { - await $transaction( - tx, - "completeBatchTaskRunItemV3", - async (tx, span) => { - span?.setAttribute("batch_id", batchTaskRunId); - - // Update the item to 
complete - const updated = await tx.batchTaskRunItem.updateMany({ - where: { - id: itemId, - status: "PENDING", - }, - data: { - status: "COMPLETED", - taskRunAttemptId, - }, - }); + const isRetry = retryAttempt !== undefined; + + if (isRetry) { + logger.debug("completeBatchTaskRunItemV3 retrying", { + itemId, + batchTaskRunId, + scheduleResumeOnComplete, + taskRunAttemptId, + retryAttempt, + }); + } - if (updated.count === 0) { - return; - } + try { + await $transaction( + tx, + "completeBatchTaskRunItemV3", + async (tx, span) => { + span?.setAttribute("batch_id", batchTaskRunId); - const updatedBatchRun = await tx.batchTaskRun.update({ - where: { - id: batchTaskRunId, - }, - data: { - completedCount: { - increment: 1, + // Update the item to complete + const updated = await tx.batchTaskRunItem.updateMany({ + where: { + id: itemId, + status: "PENDING", }, - }, - select: { - sealed: true, - status: true, - completedCount: true, - expectedCount: true, - dependentTaskAttemptId: true, - }, - }); + data: { + status: "COMPLETED", + taskRunAttemptId, + }, + }); - if ( - updatedBatchRun.status === "PENDING" && - updatedBatchRun.completedCount === updatedBatchRun.expectedCount && - updatedBatchRun.sealed - ) { - await tx.batchTaskRun.update({ + if (updated.count === 0) { + return; + } + + const updatedBatchRun = await tx.batchTaskRun.update({ where: { id: batchTaskRunId, }, data: { - status: "COMPLETED", - completedAt: new Date(), + completedCount: { + increment: 1, + }, + }, + select: { + sealed: true, + status: true, + completedCount: true, + expectedCount: true, + dependentTaskAttemptId: true, }, }); - // We only need to resume the batch if it has a dependent task attempt ID - if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { - await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); + if ( + updatedBatchRun.status === "PENDING" && + updatedBatchRun.completedCount === updatedBatchRun.expectedCount && + updatedBatchRun.sealed + ) { + await 
tx.batchTaskRun.update({ + where: { + id: batchTaskRunId, + }, + data: { + status: "COMPLETED", + completedAt: new Date(), + }, + }); + + // We only need to resume the batch if it has a dependent task attempt ID + if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { + await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); + } } + }, + { + timeout: 10000, } - }, - { - timeout: 10000, + ); + } catch (error) { + if (isPrismaRetriableError(error)) { + logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", { + itemId, + batchTaskRunId, + error, + retryAttempt, + isRetry, + }); + + if (isRetry) { + //throwing this error will cause the Redis worker to retry the job + throw error; + } else { + //schedule a retry + await legacyRunEngineWorker.enqueue({ + id: `completeBatchTaskRunItem:${itemId}`, + job: "completeBatchTaskRunItem", + payload: { + itemId, + batchTaskRunId, + scheduleResumeOnComplete, + taskRunAttemptId, + }, + availableAt: new Date(Date.now() + 2_000), + }); + } + } else { + logger.error("completeBatchTaskRunItemV3 failed with a non-retriable error", { + itemId, + batchTaskRunId, + error, + retryAttempt, + isRetry, + }); } - ); + } } From 2b79d623d080f4ddd5fb8bd82e20114af689621d Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Feb 2025 16:42:37 +0000 Subject: [PATCH 4/6] Handle more retriable errors. 
Add special condition for race condition error --- .../app/v3/services/batchTriggerV3.server.ts | 6 ++++-- internal-packages/database/src/transaction.ts | 21 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index c5ef801613..83bc67b22b 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -7,6 +7,7 @@ import { } from "@trigger.dev/core/v3"; import { BatchTaskRun, + isPrismaRaceConditionError, isPrismaRetriableError, isUniqueConstraintError, Prisma, @@ -1003,11 +1004,12 @@ export async function completeBatchTaskRunItemV3( } }, { - timeout: 10000, + timeout: 10_000, + maxWait: 4_000, } ); } catch (error) { - if (isPrismaRetriableError(error)) { + if (isPrismaRetriableError(error) || isPrismaRaceConditionError(error)) { logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", { itemId, batchTaskRunId, diff --git a/internal-packages/database/src/transaction.ts b/internal-packages/database/src/transaction.ts index 5751023ee0..beb9e26751 100644 --- a/internal-packages/database/src/transaction.ts +++ b/internal-packages/database/src/transaction.ts @@ -19,6 +19,13 @@ export function isPrismaKnownError(error: unknown): error is Prisma.PrismaClient ); } +/* +• P2024: Connection timeout errors +• P2028: Transaction timeout errors +• P2034: Transaction deadlock/conflict errors +*/ +const retryCodes = ["P2024", "P2028", "P2034"]; + export function isPrismaRetriableError(error: unknown): boolean { if (!isPrismaKnownError(error)) { return false; @@ -27,6 +34,17 @@ export function isPrismaRetriableError(error: unknown): boolean { return retryCodes.includes(error.code); } +/* +• P2025: Record not found errors (in race conditions) [not included for now] +*/ +export function isPrismaRaceConditionError(error: unknown): boolean { + if 
(!isPrismaKnownError(error)) { + return false; + } + + return error.code === "P2025"; +} + export type PrismaTransactionOptions = { /** The maximum amount of time (in ms) Prisma Client will wait to acquire a transaction from the database. The default value is 2000ms. */ maxWait?: number; @@ -47,9 +65,6 @@ export type PrismaTransactionOptions = { maxRetries?: number; }; -//retry these Prisma error codes -const retryCodes = ["P2034", "P2028"]; - export async function $transaction( prisma: PrismaClientOrTransaction, fn: (prisma: PrismaTransactionClient) => Promise, From 0055c051f1001f55521e9d4f47da587228203558 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Feb 2025 17:12:51 +0000 Subject: [PATCH 5/6] Added Postgres connection_timeout with default 20s --- apps/webapp/app/db.server.ts | 2 ++ apps/webapp/app/env.server.ts | 1 + 2 files changed, 3 insertions(+) diff --git a/apps/webapp/app/db.server.ts b/apps/webapp/app/db.server.ts index 445d7fbf49..969f3f0276 100644 --- a/apps/webapp/app/db.server.ts +++ b/apps/webapp/app/db.server.ts @@ -111,6 +111,7 @@ function getClient() { const databaseUrl = extendQueryParams(DATABASE_URL, { connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(), pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(), + connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(), }); console.log(`🔌 setting up prisma client to ${redactUrlSecrets(databaseUrl)}`); @@ -162,6 +163,7 @@ function getReplicaClient() { const replicaUrl = extendQueryParams(env.DATABASE_READ_REPLICA_URL, { connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(), pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(), + connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(), }); console.log(`🔌 setting up read replica connection to ${redactUrlSecrets(replicaUrl)}`); diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c41b4ea19f..4facbfffb2 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -13,6 
+13,7 @@ const EnvironmentSchema = z.object({ ), DATABASE_CONNECTION_LIMIT: z.coerce.number().int().default(10), DATABASE_POOL_TIMEOUT: z.coerce.number().int().default(60), + DATABASE_CONNECTION_TIMEOUT: z.coerce.number().int().default(20), DIRECT_URL: z .string() .refine( From b96ddfdd2ecdab31be6f1074972e06aaf20d687d Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 6 Feb 2025 18:09:11 +0000 Subject: [PATCH 6/6] Added a simple batchTriggerAndWait example --- references/hello-world/src/trigger/example.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 50d76069b0..3a19fd0946 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -68,3 +68,21 @@ export const maxDurationParentTask = task({ return result; }, }); + +export const batchTask = task({ + id: "batch", + run: async (payload: { count: number }, { ctx }) => { + logger.info("Starting batch task", { count: payload.count }); + + const items = Array.from({ length: payload.count }, (_, i) => ({ + payload: { message: `Batch item ${i + 1}` }, + })); + + const results = await childTask.batchTriggerAndWait(items); + + return { + batchCount: payload.count, + results, + }; + }, +});