Skip to content

Retry batch item completion #1675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/webapp/app/db.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ function getClient() {
const databaseUrl = extendQueryParams(DATABASE_URL, {
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(),
});

console.log(`🔌 setting up prisma client to ${redactUrlSecrets(databaseUrl)}`);
Expand Down Expand Up @@ -162,6 +163,7 @@ function getReplicaClient() {
const replicaUrl = extendQueryParams(env.DATABASE_READ_REPLICA_URL, {
connection_limit: env.DATABASE_CONNECTION_LIMIT.toString(),
pool_timeout: env.DATABASE_POOL_TIMEOUT.toString(),
connection_timeout: env.DATABASE_CONNECTION_TIMEOUT.toString(),
});

console.log(`🔌 setting up read replica connection to ${redactUrlSecrets(replicaUrl)}`);
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const EnvironmentSchema = z.object({
),
DATABASE_CONNECTION_LIMIT: z.coerce.number().int().default(10),
DATABASE_POOL_TIMEOUT: z.coerce.number().int().default(60),
DATABASE_CONNECTION_TIMEOUT: z.coerce.number().int().default(20),
DIRECT_URL: z
.string()
.refine(
Expand Down
25 changes: 25 additions & 0 deletions apps/webapp/app/v3/legacyRunEngineWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server";
import { prisma } from "~/db.server";

function initializeWorker() {
const redisOptions = {
Expand Down Expand Up @@ -34,6 +36,19 @@ function initializeWorker() {
maxAttempts: 3,
},
},
completeBatchTaskRunItem: {
schema: z.object({
itemId: z.string(),
batchTaskRunId: z.string(),
scheduleResumeOnComplete: z.boolean(),
taskRunAttemptId: z.string().optional(),
attempt: z.number().optional(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 10,
},
},
},
concurrency: {
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
Expand All @@ -49,6 +64,16 @@ function initializeWorker() {

await service.call(payload.runId);
},
completeBatchTaskRunItem: async ({ payload, attempt }) => {
await completeBatchTaskRunItemV3(
payload.itemId,
payload.batchTaskRunId,
prisma,
payload.scheduleResumeOnComplete,
payload.taskRunAttemptId,
attempt
);
},
},
});

Expand Down
159 changes: 107 additions & 52 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
} from "@trigger.dev/core/v3";
import {
BatchTaskRun,
isPrismaRaceConditionError,
isPrismaRetriableError,
isUniqueConstraintError,
Prisma,
TaskRunAttempt,
Expand All @@ -20,6 +22,7 @@ import { logger } from "~/services/logger.server";
import { getEntitlement } from "~/services/platform.v3.server";
import { workerQueue } from "~/services/worker.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
import { marqs } from "../marqs/index.server";
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server";
Expand Down Expand Up @@ -923,71 +926,123 @@ export async function completeBatchTaskRunItemV3(
batchTaskRunId: string,
tx: PrismaClientOrTransaction,
scheduleResumeOnComplete = false,
taskRunAttemptId?: string
taskRunAttemptId?: string,
retryAttempt?: number
) {
await $transaction(
tx,
"completeBatchTaskRunItemV3",
async (tx, span) => {
span?.setAttribute("batch_id", batchTaskRunId);

// Update the item to complete
const updated = await tx.batchTaskRunItem.updateMany({
where: {
id: itemId,
status: "PENDING",
},
data: {
status: "COMPLETED",
taskRunAttemptId,
},
});
const isRetry = retryAttempt !== undefined;

if (isRetry) {
logger.debug("completeBatchTaskRunItemV3 retrying", {
itemId,
batchTaskRunId,
scheduleResumeOnComplete,
taskRunAttemptId,
retryAttempt,
});
}

if (updated.count === 0) {
return;
}
try {
await $transaction(
tx,
"completeBatchTaskRunItemV3",
async (tx, span) => {
span?.setAttribute("batch_id", batchTaskRunId);

const updatedBatchRun = await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
completedCount: {
increment: 1,
// Update the item to complete
const updated = await tx.batchTaskRunItem.updateMany({
where: {
id: itemId,
status: "PENDING",
},
},
select: {
sealed: true,
status: true,
completedCount: true,
expectedCount: true,
dependentTaskAttemptId: true,
},
});
data: {
status: "COMPLETED",
taskRunAttemptId,
},
});

if (
updatedBatchRun.status === "PENDING" &&
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
updatedBatchRun.sealed
) {
await tx.batchTaskRun.update({
if (updated.count === 0) {
return;
}

const updatedBatchRun = await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
status: "COMPLETED",
completedAt: new Date(),
completedCount: {
increment: 1,
},
},
select: {
sealed: true,
status: true,
completedCount: true,
expectedCount: true,
dependentTaskAttemptId: true,
},
});

// We only need to resume the batch if it has a dependent task attempt ID
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
if (
updatedBatchRun.status === "PENDING" &&
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
updatedBatchRun.sealed
) {
await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});

// We only need to resume the batch if it has a dependent task attempt ID
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
}
}
},
{
timeout: 10_000,
maxWait: 4_000,
}
},
{
timeout: 10000,
);
} catch (error) {
if (isPrismaRetriableError(error) || isPrismaRaceConditionError(error)) {
logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", {
itemId,
batchTaskRunId,
error,
retryAttempt,
isRetry,
});

if (isRetry) {
//throwing this error will cause the Redis worker to retry the job
throw error;
} else {
//schedule a retry
await legacyRunEngineWorker.enqueue({
id: `completeBatchTaskRunItem:${itemId}`,
job: "completeBatchTaskRunItem",
payload: {
itemId,
batchTaskRunId,
scheduleResumeOnComplete,
taskRunAttemptId,
},
availableAt: new Date(Date.now() + 2_000),
});
}
} else {
logger.error("completeBatchTaskRunItemV3 failed with a non-retriable error", {
itemId,
batchTaskRunId,
error,
retryAttempt,
isRetry,
});
}
);
}
}
30 changes: 28 additions & 2 deletions internal-packages/database/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,38 @@ function isTransactionClient(prisma: PrismaClientOrTransaction): prisma is Prism
return !("$transaction" in prisma);
}

function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError {
export function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError {
return (
typeof error === "object" && error !== null && "code" in error && typeof error.code === "string"
);
}

/*
• P2024: Connection timeout errors
• P2028: Transaction timeout errors
• P2034: Transaction deadlock/conflict errors
*/
const retryCodes = ["P2024", "P2028", "P2034"];

export function isPrismaRetriableError(error: unknown): boolean {
if (!isPrismaKnownError(error)) {
return false;
}

return retryCodes.includes(error.code);
}

/*
• P2025: Record not found errors (in race conditions) [not included for now]
*/
export function isPrismaRaceConditionError(error: unknown): boolean {
if (!isPrismaKnownError(error)) {
return false;
}

return error.code === "P2025";
}

export type PrismaTransactionOptions = {
/** The maximum amount of time (in ms) Prisma Client will wait to acquire a transaction from the database. The default value is 2000ms. */
maxWait?: number;
Expand Down Expand Up @@ -55,7 +81,7 @@ export async function $transaction<R>(
} catch (error) {
if (isPrismaKnownError(error)) {
if (
error.code === "P2034" &&
retryCodes.includes(error.code) &&
typeof options?.maxRetries === "number" &&
attempt < options.maxRetries
) {
Expand Down
18 changes: 18 additions & 0 deletions references/hello-world/src/trigger/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,21 @@ export const maxDurationParentTask = task({
return result;
},
});

export const batchTask = task({
id: "batch",
run: async (payload: { count: number }, { ctx }) => {
logger.info("Starting batch task", { count: payload.count });

const items = Array.from({ length: payload.count }, (_, i) => ({
payload: { message: `Batch item ${i + 1}` },
}));

const results = await childTask.batchTriggerAndWait(items);

return {
batchCount: payload.count,
results,
};
},
});