diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index a2d6436fa7..327844f201 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -11,6 +11,7 @@ import { BaseService, ServiceValidationError } from "./baseService.server"; import { CrashTaskRunService } from "./crashTaskRun.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; import { findQueueInEnvironment } from "~/models/taskQueue.server"; +import { FINAL_RUN_STATUSES } from "../taskStatus"; export class CreateTaskRunAttemptService extends BaseService { public async call({ @@ -91,11 +92,17 @@ export class CreateTaskRunAttemptService extends BaseService { span.setAttribute("taskRunId", taskRun.id); span.setAttribute("taskRunFriendlyId", taskRun.friendlyId); + span.setAttribute("taskRunStatus", taskRun.status); if (taskRun.status === "CANCELED") { throw new ServiceValidationError("Task run is cancelled", 400); } + // If the run is finalized, it's pointless to create another attempt + if (FINAL_RUN_STATUSES.includes(taskRun.status)) { + throw new ServiceValidationError("Task run is already finished", 400); + } + const lockedBy = taskRun.lockedBy; if (!lockedBy) { diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index 7f927e7517..3dcc1cf6e4 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -27,6 +27,8 @@ export class ResumeAttemptService extends BaseService { take: 1, select: { id: true, + number: true, + status: true, }, } satisfies Prisma.TaskRunInclude["attempts"]; @@ -37,9 +39,9 @@ export class ResumeAttemptService extends BaseService { include: { taskRun: true, dependencies: { - include: { + select: { taskRun: { - include: { + select: { attempts: latestAttemptSelect, }, }, @@ -50,11 +52,11 @@ export class ResumeAttemptService extends BaseService { take: 1, }, batchDependencies: { - include: { + select: { items: { - include: { + select: { taskRun: { - include: { + select: { attempts: latestAttemptSelect, }, }, @@ -130,7 +132,29 @@ export class ResumeAttemptService extends BaseService { return; } - completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id); + //find the best attempt for each batch item + //it should be the most recent one in a final state + const finalAttempts = dependentBatchItems + .map((item) => { + return item.taskRun.attempts + .filter((a) => FINAL_ATTEMPT_STATUSES.includes(a.status)) + .sort((a, b) => b.number - a.number) + .at(0); + }) + .filter(Boolean); + + completedAttemptIds = finalAttempts.map((a) => a.id); + + if (completedAttemptIds.length !== dependentBatchItems.length) { + this._logger.error("[ResumeAttemptService] not all batch items have attempts", { + runId: attempt.taskRunId, + completedAttemptIds, + finalAttempts, + dependentBatchItems, + }); + + return; + } } else { this._logger.error("No batch dependency"); return; diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 3a19fd0946..7c35f8fc84 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -80,6 +80,8 @@ export const batchTask = task({ const results = await childTask.batchTriggerAndWait(items); + logger.info("Batch task complete", { results }); + return { batchCount: payload.count, results,