From 5c19ae5620347b35da7342f580b2f8029094669a Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 11 Feb 2025 18:11:18 +0000 Subject: [PATCH 1/5] WIP fix for ResumeAttemptService selecting the wrong attempt (which has no error or output) --- .../app/v3/services/resumeAttempt.server.ts | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index 7f927e7517..d4f11b3eb1 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -27,6 +27,8 @@ export class ResumeAttemptService extends BaseService { take: 1, select: { id: true, + number: true, + status: true, }, } satisfies Prisma.TaskRunInclude["attempts"]; @@ -130,7 +132,20 @@ export class ResumeAttemptService extends BaseService { return; } - completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id); + //find the best attempt for each batch item + //it should be the most recent one in a final state + const finalAttempts = dependentBatchItems.map((item) => { + return item.taskRun.attempts + .filter((a) => FINAL_ATTEMPT_STATUSES.includes(a.status)) + .sort((a, b) => b.number - a.number)[0]; + }); + + completedAttemptIds = finalAttempts.map((a) => a.id); + + if (completedAttemptIds.length !== dependentBatchItems.length) { + this._logger.error("[ResumeAttemptService] not all batch items have attempts"); + return; + } } else { this._logger.error("No batch dependency"); return; From b7b3bb07352fc1af0e3530f911cc90fc69d61158 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 12 Feb 2025 13:20:34 +0000 Subject: [PATCH 2/5] =?UTF-8?q?Don=E2=80=99t=20create=20an=20attempt=20if?= =?UTF-8?q?=20the=20run=20is=20already=20in=20a=20final=20status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/webapp/app/v3/services/createTaskRunAttempt.server.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index a2d6436fa7..327844f201 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -11,6 +11,7 @@ import { BaseService, ServiceValidationError } from "./baseService.server"; import { CrashTaskRunService } from "./crashTaskRun.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; import { findQueueInEnvironment } from "~/models/taskQueue.server"; +import { FINAL_RUN_STATUSES } from "../taskStatus"; export class CreateTaskRunAttemptService extends BaseService { public async call({ @@ -91,11 +92,17 @@ export class CreateTaskRunAttemptService extends BaseService { span.setAttribute("taskRunId", taskRun.id); span.setAttribute("taskRunFriendlyId", taskRun.friendlyId); + span.setAttribute("taskRunStatus", taskRun.status); if (taskRun.status === "CANCELED") { throw new ServiceValidationError("Task run is cancelled", 400); } + // If the run is finalized, it's pointless to create another attempt + if (FINAL_RUN_STATUSES.includes(taskRun.status)) { + throw new ServiceValidationError("Task run is already finished", 400); + } + const lockedBy = taskRun.lockedBy; if (!lockedBy) { From ecf8317b624037ddfa854878a6d8c0b8258bd869 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 12 Feb 2025 13:21:21 +0000 Subject: [PATCH 3/5] =?UTF-8?q?Don=E2=80=99t=20get=20all=20the=20columns?= =?UTF-8?q?=20for=20the=20query.=20Improved=20the=20logging.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/v3/services/resumeAttempt.server.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index d4f11b3eb1..9632f88d81 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -39,9 +39,9 @@ export class ResumeAttemptService extends BaseService { include: { taskRun: true, dependencies: { - include: { + select: { taskRun: { - include: { + select: { attempts: latestAttemptSelect, }, }, @@ -52,11 +52,11 @@ export class ResumeAttemptService extends BaseService { take: 1, }, batchDependencies: { - include: { + select: { items: { - include: { + select: { taskRun: { - include: { + select: { attempts: latestAttemptSelect, }, }, @@ -143,7 +143,13 @@ export class ResumeAttemptService extends BaseService { completedAttemptIds = finalAttempts.map((a) => a.id); if (completedAttemptIds.length !== dependentBatchItems.length) { - this._logger.error("[ResumeAttemptService] not all batch items have attempts"); + this._logger.error("[ResumeAttemptService] not all batch items have attempts", { + runId: attempt.taskRunId, + completedAttemptIds, + finalAttempts, + dependentBatchItems, + }); + return; } } else { From eacc3b1e063b1591704cb543e78aa30baa201a08 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 12 Feb 2025 13:42:51 +0000 Subject: [PATCH 4/5] Added a log to the batch example --- references/hello-world/src/trigger/example.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 3a19fd0946..7c35f8fc84 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -80,6 +80,8 @@ export const batchTask = task({ const results = await childTask.batchTriggerAndWait(items); + logger.info("Batch task complete", { results }); + return { batchCount: payload.count, results, From 3b2b2bef82a6562e77297a14b3e2cd4ad76ddd47 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 12 Feb 2025 13:54:34 +0000 Subject: [PATCH 5/5] Filter out the undefined values --- apps/webapp/app/v3/services/resumeAttempt.server.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index 9632f88d81..3dcc1cf6e4 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -134,11 +134,14 @@ export class ResumeAttemptService extends BaseService { //find the best attempt for each batch item //it should be the most recent one in a final state - const finalAttempts = dependentBatchItems.map((item) => { - return item.taskRun.attempts - .filter((a) => FINAL_ATTEMPT_STATUSES.includes(a.status)) - .sort((a, b) => b.number - a.number)[0]; - }); + const finalAttempts = dependentBatchItems + .map((item) => { + return item.taskRun.attempts + .filter((a) => FINAL_ATTEMPT_STATUSES.includes(a.status)) + .sort((a, b) => b.number - a.number) + .at(0); + }) + .filter(Boolean); completedAttemptIds = finalAttempts.map((a) => a.id);