Skip to content

Move run ttl and delays from graphile to redis worker #1672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server";
import { EnqueueDelayedRunService } from "./services/enqueueDelayedRun.server";

function initializeWorker() {
const redisOptions = {
Expand Down Expand Up @@ -52,6 +54,24 @@ function initializeWorker() {
maxAttempts: 3,
},
},
"v3.expireRun": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 6,
},
},
"v3.enqueueDelayedRun": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 6,
},
},
},
concurrency: {
workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
Expand All @@ -65,16 +85,26 @@ function initializeWorker() {
"v3.deliverAlert": async ({ payload }) => {
const service = new DeliverAlertService();

return await service.call(payload.alertId);
await service.call(payload.alertId);
},
"v3.performDeploymentAlerts": async ({ payload }) => {
const service = new PerformDeploymentAlertsService();

return await service.call(payload.deploymentId);
await service.call(payload.deploymentId);
},
"v3.performTaskRunAlerts": async ({ payload }) => {
const service = new PerformTaskRunAlertsService();
return await service.call(payload.runId);
await service.call(payload.runId);
},
"v3.expireRun": async ({ payload }) => {
const service = new ExpireEnqueuedRunService();

await service.call(payload.runId);
},
"v3.enqueueDelayedRun": async ({ payload }) => {
const service = new EnqueueDelayedRunService();

await service.call(payload.runId);
},
},
});
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class CreateTaskRunAttemptService extends BaseService {
});

if (taskRun.ttl) {
await ExpireEnqueuedRunService.dequeue(taskRun.id, tx);
await ExpireEnqueuedRunService.ack(taskRun.id, tx);
}
}

Expand Down
28 changes: 27 additions & 1 deletion apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,34 @@ import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { commonWorker } from "../commonWorker.server";
import { workerQueue } from "~/services/worker.server";

export class EnqueueDelayedRunService extends BaseService {
public static async enqueue(runId: string, runAt?: Date) {
await commonWorker.enqueue({
job: "v3.enqueueDelayedRun",
payload: { runId },
availableAt: runAt,
id: `v3.enqueueDelayed:${runId}`,
});
}

public static async reschedule(runId: string, runAt?: Date) {
// We have to do this for now because it's possible that the workerQueue
// was used when the run was first delayed, and EnqueueDelayedRunService.reschedule
// is called from RescheduleTaskRunService, which allows the runAt to be changed
// so if we don't dequeue the old job, we might end up with multiple jobs
await workerQueue.dequeue(`v3.enqueueDelayedRun.${runId}`);

await commonWorker.enqueue({
job: "v3.enqueueDelayedRun",
payload: { runId },
availableAt: runAt,
id: `v3.enqueueDelayed:${runId}`,
});
}

public async call(runId: string) {
const run = await this._prisma.taskRun.findFirst({
where: {
Expand Down Expand Up @@ -52,7 +78,7 @@ export class EnqueueDelayedRunService extends BaseService {
const expireAt = parseNaturalLanguageDuration(run.ttl);

if (expireAt) {
await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx);
await ExpireEnqueuedRunService.enqueue(run.id, expireAt);
}
}
});
Expand Down
25 changes: 14 additions & 11 deletions apps/webapp/app/v3/services/expireEnqueuedRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
import { PrismaClientOrTransaction } from "~/db.server";
import { logger } from "~/services/logger.server";
import { BaseService } from "./baseService.server";
import { commonWorker } from "../commonWorker.server";
import { eventRepository } from "../eventRepository.server";
import { BaseService } from "./baseService.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { workerQueue } from "~/services/worker.server";
import { PrismaClientOrTransaction } from "~/db.server";

export class ExpireEnqueuedRunService extends BaseService {
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {
return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx });
public static async ack(runId: string, tx?: PrismaClientOrTransaction) {
// We don't "dequeue" from the workerQueue here because it would be redundant and if this service
// is called for a run that has already started, nothing happens
await commonWorker.ack(`v3.expireRun:${runId}`);
}

public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {
return await workerQueue.enqueue(
"v3.expireRun",
{ runId },
{ runAt, jobKey: `v3.expireRun:${runId}`, tx }
);
public static async enqueue(runId: string, runAt?: Date) {
return await commonWorker.enqueue({
job: "v3.expireRun",
payload: { runId },
availableAt: runAt,
id: `v3.expireRun:${runId}`,
});
}

public async call(runId: string) {
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/finalizeTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class FinalizeTaskRunService extends BaseService {
});

if (run.ttl) {
await ExpireEnqueuedRunService.dequeue(run.id);
await ExpireEnqueuedRunService.ack(run.id);
}

if (attemptStatus || error) {
Expand Down
31 changes: 12 additions & 19 deletions apps/webapp/app/v3/services/rescheduleTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
import { TaskRun } from "@trigger.dev/database";
import { BaseService, ServiceValidationError } from "./baseService.server";
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";
import { parseDelay } from "./triggerTask.server";
import { $transaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";

export class RescheduleTaskRunService extends BaseService {
public async call(taskRun: TaskRun, body: RescheduleRunRequestBody) {
Expand All @@ -17,23 +16,17 @@ export class RescheduleTaskRunService extends BaseService {
throw new ServiceValidationError(`Invalid delay: ${body.delay}`);
}

return await $transaction(this._prisma, "reschedule run", async (tx) => {
const updatedRun = await tx.taskRun.update({
where: {
id: taskRun.id,
},
data: {
delayUntil: delay,
},
});
const updatedRun = await this._prisma.taskRun.update({
where: {
id: taskRun.id,
},
data: {
delayUntil: delay,
},
});

await workerQueue.enqueue(
"v3.enqueueDelayedRun",
{ runId: taskRun.id },
{ tx, runAt: delay, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
);
await EnqueueDelayedRunService.reschedule(taskRun.id, delay);

return updatedRun;
});
return updatedRun;
}
}
9 changes: 3 additions & 6 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { clampMaxDuration } from "../utils/maxDuration";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { Prisma, TaskRun } from "@trigger.dev/database";
import { sanitizeQueueName } from "~/models/taskQueue.server";
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -515,18 +516,14 @@ export class TriggerTaskService extends BaseService {
}

if (taskRun.delayUntil) {
await workerQueue.enqueue(
"v3.enqueueDelayedRun",
{ runId: taskRun.id },
{ tx, runAt: delayUntil, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
);
await EnqueueDelayedRunService.enqueue(taskRun.id, taskRun.delayUntil);
}

if (!taskRun.delayUntil && taskRun.ttl) {
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);

if (expireAt) {
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx);
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt);
}
}

Expand Down