From ae0c3730020d2b0de999068adb4e8dfcb470b79c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 11 Feb 2025 18:27:06 +0000 Subject: [PATCH] Batch queue runs that are waiting for deploy --- apps/webapp/app/env.server.ts | 3 + .../services/executeTasksWaitingForDeploy.ts | 77 +++++++++---------- 2 files changed, 38 insertions(+), 42 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 49726cfbdc..e888ff5b2e 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -442,6 +442,9 @@ const EnvironmentSchema = z.object({ .default(process.env.REDIS_TLS_DISABLED ?? "false"), LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_SIZE: z.coerce.number().int().default(100), + LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS: z.coerce.number().int().default(1_000), + COMMON_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts b/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts index cc11f93e4c..0f40ef290b 100644 --- a/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts +++ b/apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts @@ -3,6 +3,7 @@ import { workerQueue } from "~/services/worker.server"; import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; +import { env } from "~/env.server"; export class ExecuteTasksWaitingForDeployService extends BaseService { public async call(backgroundWorkerId: string) { @@ -17,7 +18,11 @@ export class ExecuteTasksWaitingForDeployService extends BaseService { organization: true, }, }, - tasks: true, + tasks: { + select: { + slug: true, + }, + }, }, }); @@ -26,6 +31,8 @@ export class ExecuteTasksWaitingForDeployService extends BaseService { return; } + const maxCount = env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_SIZE; + const runsWaitingForDeploy = await this._prisma.taskRun.findMany({ where: { runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId, @@ -36,8 +43,16 @@ export class ExecuteTasksWaitingForDeployService extends BaseService { }, }, orderBy: { - number: "asc", + createdAt: "asc", }, + select: { + id: true, + status: true, + taskIdentifier: true, + concurrencyKey: true, + queue: true, + }, + take: maxCount + 1, }); if (!runsWaitingForDeploy.length) { @@ -63,50 +78,28 @@ export class ExecuteTasksWaitingForDeployService extends BaseService { }); } - if (!marqs) { - return; - } - - const enqueues: Promise[] = []; - let i = 0; - for (const run of runsWaitingForDeploy) { - enqueues.push( - marqs.enqueueMessage( - backgroundWorker.runtimeEnvironment, - run.queue, - run.id, - { - type: "EXECUTE", - taskIdentifier: run.taskIdentifier, - projectId: backgroundWorker.runtimeEnvironment.projectId, - environmentId: backgroundWorker.runtimeEnvironment.id, - environmentType: backgroundWorker.runtimeEnvironment.type, - }, - run.concurrencyKey ?? undefined, - Date.now() + i * 5 // slight delay to help preserve order - ) + await marqs?.enqueueMessage( + backgroundWorker.runtimeEnvironment, + run.queue, + run.id, + { + type: "EXECUTE", + taskIdentifier: run.taskIdentifier, + projectId: backgroundWorker.runtimeEnvironment.projectId, + environmentId: backgroundWorker.runtimeEnvironment.id, + environmentType: backgroundWorker.runtimeEnvironment.type, + }, + run.concurrencyKey ?? undefined ); - - i++; } - const settled = await Promise.allSettled(enqueues); - - if (settled.some((s) => s.status === "rejected")) { - const rejectedRuns: { id: string; reason: any }[] = []; - - runsWaitingForDeploy.forEach((run, i) => { - if (settled[i].status === "rejected") { - const rejected = settled[i] as PromiseRejectedResult; - - rejectedRuns.push({ id: run.id, reason: rejected.reason }); - } - }); - - logger.error("Failed to requeue task runs for immediate execution", { - rejectedRuns, - }); + if (runsWaitingForDeploy.length > maxCount) { + await ExecuteTasksWaitingForDeployService.enqueue( + backgroundWorkerId, + this._prisma, + new Date(Date.now() + env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS) + ); } }