From b0c60a05707a0c26d009037e30a19713492ba409 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Feb 2025 14:00:03 +0000 Subject: [PATCH] Move run ttl and delays from graphile to redis worker --- apps/webapp/app/v3/commonWorker.server.ts | 36 +++++++++++++++++-- .../services/createTaskRunAttempt.server.ts | 2 +- .../v3/services/enqueueDelayedRun.server.ts | 28 ++++++++++++++- .../v3/services/expireEnqueuedRun.server.ts | 25 +++++++------ .../app/v3/services/finalizeTaskRun.server.ts | 2 +- .../v3/services/rescheduleTaskRun.server.ts | 31 +++++++--------- .../app/v3/services/triggerTask.server.ts | 9 ++--- 7 files changed, 91 insertions(+), 42 deletions(-) diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts index 5e43d3c90a..18014bfa55 100644 --- a/apps/webapp/app/v3/commonWorker.server.ts +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -7,6 +7,8 @@ import { singleton } from "~/utils/singleton"; import { DeliverAlertService } from "./services/alerts/deliverAlert.server"; import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server"; import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server"; +import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server"; +import { EnqueueDelayedRunService } from "./services/enqueueDelayedRun.server"; function initializeWorker() { const redisOptions = { @@ -52,6 +54,24 @@ function initializeWorker() { maxAttempts: 3, }, }, + "v3.expireRun": { + schema: z.object({ + runId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 6, + }, + }, + "v3.enqueueDelayedRun": { + schema: z.object({ + runId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 6, + }, + }, }, concurrency: { workers: env.COMMON_WORKER_CONCURRENCY_WORKERS, @@ -65,16 +85,26 @@ function initializeWorker() { "v3.deliverAlert": async ({ payload }) => { const service = new DeliverAlertService(); - return await service.call(payload.alertId); + await service.call(payload.alertId); }, "v3.performDeploymentAlerts": async ({ payload }) => { const service = new PerformDeploymentAlertsService(); - return await service.call(payload.deploymentId); + await service.call(payload.deploymentId); }, "v3.performTaskRunAlerts": async ({ payload }) => { const service = new PerformTaskRunAlertsService(); - return await service.call(payload.runId); + await service.call(payload.runId); + }, + "v3.expireRun": async ({ payload }) => { + const service = new ExpireEnqueuedRunService(); + + await service.call(payload.runId); + }, + "v3.enqueueDelayedRun": async ({ payload }) => { + const service = new EnqueueDelayedRunService(); + + await service.call(payload.runId); }, }, }); diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index c84bc413e3..a2d6436fa7 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -156,7 +156,7 @@ export class CreateTaskRunAttemptService extends BaseService { }); if (taskRun.ttl) { - await ExpireEnqueuedRunService.dequeue(taskRun.id, tx); + await ExpireEnqueuedRunService.ack(taskRun.id, tx); } } diff --git a/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts b/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts index 55db573cc2..6306df4765 100644 --- a/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts +++ b/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts @@ -4,8 +4,34 @@ import { logger } from "~/services/logger.server"; import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; +import { commonWorker } from "../commonWorker.server"; +import { workerQueue } from "~/services/worker.server"; export class EnqueueDelayedRunService extends BaseService { + public static async enqueue(runId: string, runAt?: Date) { + await commonWorker.enqueue({ + job: "v3.enqueueDelayedRun", + payload: { runId }, + availableAt: runAt, + id: `v3.enqueueDelayed:${runId}`, + }); + } + + public static async reschedule(runId: string, runAt?: Date) { + // We have to do this for now because it's possible that the workerQueue + // was used when the run was first delayed, and EnqueueDelayedRunService.reschedule + // is called from RescheduleTaskRunService, which allows the runAt to be changed + // so if we don't dequeue the old job, we might end up with multiple jobs + await workerQueue.dequeue(`v3.enqueueDelayedRun.${runId}`); + + await commonWorker.enqueue({ + job: "v3.enqueueDelayedRun", + payload: { runId }, + availableAt: runAt, + id: `v3.enqueueDelayed:${runId}`, + }); + } + public async call(runId: string) { const run = await this._prisma.taskRun.findFirst({ where: { @@ -52,7 +78,7 @@ export class EnqueueDelayedRunService extends BaseService { const expireAt = parseNaturalLanguageDuration(run.ttl); if (expireAt) { - await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx); + await ExpireEnqueuedRunService.enqueue(run.id, expireAt); } } }); diff --git a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts index 4834976101..a55063c124 100644 --- a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts +++ b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts @@ -1,21 +1,24 @@ +import { PrismaClientOrTransaction } from "~/db.server"; import { logger } from "~/services/logger.server"; -import { BaseService } from "./baseService.server"; +import { commonWorker } from "../commonWorker.server"; import { eventRepository } from "../eventRepository.server"; +import { BaseService } from "./baseService.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; -import { workerQueue } from "~/services/worker.server"; -import { PrismaClientOrTransaction } from "~/db.server"; export class ExpireEnqueuedRunService extends BaseService { - public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) { - return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx }); + public static async ack(runId: string, tx?: PrismaClientOrTransaction) { + // We don't "dequeue" from the workerQueue here because it would be redundant and if this service + // is called for a run that has already started, nothing happens + await commonWorker.ack(`v3.expireRun:${runId}`); } - public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) { - return await workerQueue.enqueue( - "v3.expireRun", - { runId }, - { runAt, jobKey: `v3.expireRun:${runId}`, tx } - ); + public static async enqueue(runId: string, runAt?: Date) { + return await commonWorker.enqueue({ + job: "v3.expireRun", + payload: { runId }, + availableAt: runAt, + id: `v3.expireRun:${runId}`, + }); } public async call(runId: string) { diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index 81df8461ec..d69d11c7d4 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -101,7 +101,7 @@ export class FinalizeTaskRunService extends BaseService { }); if (run.ttl) { - await ExpireEnqueuedRunService.dequeue(run.id); + await ExpireEnqueuedRunService.ack(run.id); } if (attemptStatus || error) { diff --git a/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts b/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts index 2d10ba6633..4a26bca94e 100644 --- a/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts +++ b/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts @@ -1,9 +1,8 @@ +import { RescheduleRunRequestBody } from "@trigger.dev/core/v3"; import { TaskRun } from "@trigger.dev/database"; import { BaseService, ServiceValidationError } from "./baseService.server"; -import { RescheduleRunRequestBody } from "@trigger.dev/core/v3"; +import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server"; import { parseDelay } from "./triggerTask.server"; -import { $transaction } from "~/db.server"; -import { workerQueue } from "~/services/worker.server"; export class RescheduleTaskRunService extends BaseService { public async call(taskRun: TaskRun, body: RescheduleRunRequestBody) { @@ -17,23 +16,17 @@ export class RescheduleTaskRunService extends BaseService { throw new ServiceValidationError(`Invalid delay: ${body.delay}`); } - return await $transaction(this._prisma, "reschedule run", async (tx) => { - const updatedRun = await tx.taskRun.update({ - where: { - id: taskRun.id, - }, - data: { - delayUntil: delay, - }, - }); + const updatedRun = await this._prisma.taskRun.update({ + where: { + id: taskRun.id, + }, + data: { + delayUntil: delay, + }, + }); - await workerQueue.enqueue( - "v3.enqueueDelayedRun", - { runId: taskRun.id }, - { tx, runAt: delay, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` } - ); + await EnqueueDelayedRunService.reschedule(taskRun.id, delay); - return updatedRun; - }); + return updatedRun; } } diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 7bbb558dbb..47c96a0f19 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -28,6 +28,7 @@ import { clampMaxDuration } from "../utils/maxDuration"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { Prisma, TaskRun } from "@trigger.dev/database"; import { sanitizeQueueName } from "~/models/taskQueue.server"; +import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -515,18 +516,14 @@ export class TriggerTaskService extends BaseService { } if (taskRun.delayUntil) { - await workerQueue.enqueue( - "v3.enqueueDelayedRun", - { runId: taskRun.id }, - { tx, runAt: delayUntil, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` } - ); + await EnqueueDelayedRunService.enqueue(taskRun.id, taskRun.delayUntil); } if (!taskRun.delayUntil && taskRun.ttl) { const expireAt = parseNaturalLanguageDuration(taskRun.ttl); if (expireAt) { - await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx); + await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt); } }