From d7f34c7b7cb595806a56ef0f25fa20a5a268d197 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 5 Sep 2025 12:58:50 +0100 Subject: [PATCH] fix(webapp): worker actions now catch service validation errors and respond properly This also stops all the unnecessary error logging when throwing ServiceValidationErrors --- .../app/routes/api.v1.tasks.$taskId.trigger.ts | 2 +- apps/webapp/app/runEngine/concerns/errors.ts | 6 ------ .../app/runEngine/concerns/payloads.server.ts | 7 ++----- .../webapp/app/runEngine/concerns/queues.server.ts | 14 +++++++------- .../app/runEngine/services/triggerTask.server.ts | 10 +++++----- .../runEngine/validators/triggerTaskValidator.ts | 8 ++++---- .../services/routeBuilders/apiBuilder.server.ts | 11 ++++++++++- internal-packages/run-engine/src/engine/index.ts | 2 +- internal-packages/run-engine/src/engine/locking.ts | 6 ++++++ internal-packages/run-engine/src/index.ts | 6 +++++- 10 files changed, 41 insertions(+), 31 deletions(-) delete mode 100644 apps/webapp/app/runEngine/concerns/errors.ts diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 11613427a9..129bf4c3cc 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -1,3 +1,4 @@ +import { EngineServiceValidationError } from "@internal/run-engine"; import { json } from "@remix-run/server-runtime"; import { generateJWT as internal_generateJWT, @@ -8,7 +9,6 @@ import { TaskRun } from "@trigger.dev/database"; import { z } from "zod"; import { prisma } from "~/db.server"; import { env } from "~/env.server"; -import { EngineServiceValidationError } from "~/runEngine/concerns/errors"; import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; diff --git a/apps/webapp/app/runEngine/concerns/errors.ts b/apps/webapp/app/runEngine/concerns/errors.ts deleted file mode 100644 index f1f56b86dd..0000000000 --- a/apps/webapp/app/runEngine/concerns/errors.ts +++ /dev/null @@ -1,6 +0,0 @@ -export class EngineServiceValidationError extends Error { - constructor(message: string, public status?: number) { - super(message); - this.name = "EngineServiceValidationError"; - } -} diff --git a/apps/webapp/app/runEngine/concerns/payloads.server.ts b/apps/webapp/app/runEngine/concerns/payloads.server.ts index 637a54d584..5ad84f8dc6 100644 --- a/apps/webapp/app/runEngine/concerns/payloads.server.ts +++ b/apps/webapp/app/runEngine/concerns/payloads.server.ts @@ -3,7 +3,7 @@ import { PayloadProcessor, TriggerTaskRequest } from "../types"; import { env } from "~/env.server"; import { startActiveSpan } from "~/v3/tracer.server"; import { uploadPacketToObjectStore } from "~/v3/r2.server"; -import { EngineServiceValidationError } from "./errors"; +import { ServiceValidationError } from "~/v3/services/common.server"; export class DefaultPayloadProcessor implements PayloadProcessor { async process(request: TriggerTaskRequest): Promise { @@ -36,10 +36,7 @@ export class DefaultPayloadProcessor implements PayloadProcessor { ); if (uploadError) { - throw new EngineServiceValidationError( - "Failed to upload large payload to object store", - 500 - ); // This is retryable + throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable } return { diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 0e213a58d2..3a853f9b25 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -13,8 +13,8 @@ import { import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server"; import type { RunEngine } from "~/v3/runEngine.server"; import { env } from "~/env.server"; -import { EngineServiceValidationError } from "./errors"; import { tryCatch } from "@trigger.dev/core/v3"; +import { ServiceValidationError } from "~/v3/services/common.server"; export class DefaultQueueManager implements QueueManager { constructor( @@ -45,7 +45,7 @@ export class DefaultQueueManager implements QueueManager { }); if (!specifiedQueue) { - throw new EngineServiceValidationError( + throw new ServiceValidationError( `Specified queue '${specifiedQueueName}' not found or not associated with locked version '${ lockedBackgroundWorker.version ?? "" }'.` @@ -68,7 +68,7 @@ export class DefaultQueueManager implements QueueManager { }); if (!lockedTask) { - throw new EngineServiceValidationError( + throw new ServiceValidationError( `Task '${request.taskId}' not found on locked version '${ lockedBackgroundWorker.version ?? "" }'.` @@ -83,7 +83,7 @@ export class DefaultQueueManager implements QueueManager { workerId: lockedBackgroundWorker.id, version: lockedBackgroundWorker.version, }); - throw new EngineServiceValidationError( + throw new ServiceValidationError( `Default queue configuration for task '${request.taskId}' missing on locked version '${ lockedBackgroundWorker.version ?? "" }'.` @@ -97,7 +97,7 @@ export class DefaultQueueManager implements QueueManager { // Task is not locked to a specific version, use regular logic if (request.body.options?.lockToVersion) { // This should only happen if the findFirst failed, indicating the version doesn't exist - throw new EngineServiceValidationError( + throw new ServiceValidationError( `Task locked to version '${request.body.options.lockToVersion}', but no worker found with that version.` ); } @@ -221,11 +221,11 @@ export class DefaultQueueManager implements QueueManager { ); if (error) { - throw new EngineServiceValidationError(error.message); + throw new ServiceValidationError(error.message); } if (!workerGroup) { - throw new EngineServiceValidationError("No worker group found"); + throw new ServiceValidationError("No worker group found"); } return workerGroup.masterQueue; diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 9a190883cd..0203ba0c76 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -31,7 +31,6 @@ import type { } from "../../v3/services/triggerTask.server"; import { getTaskEventStore } from "../../v3/taskEventStore.server"; import { clampMaxDuration } from "../../v3/utils/maxDuration"; -import { EngineServiceValidationError } from "../concerns/errors"; import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server"; import type { PayloadProcessor, @@ -41,6 +40,7 @@ import type { TriggerTaskRequest, TriggerTaskValidator, } from "../types"; +import { ServiceValidationError } from "~/v3/services/common.server"; export class RunEngineTriggerTaskService { private readonly queueConcern: QueueManager; @@ -157,7 +157,7 @@ export class RunEngineTriggerTaskService { const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay)); if (parseDelayError) { - throw new EngineServiceValidationError(`Invalid delay ${body.options?.delay}`); + throw new ServiceValidationError(`Invalid delay ${body.options?.delay}`); } const ttl = @@ -210,7 +210,7 @@ export class RunEngineTriggerTaskService { }); if (!queueSizeGuard.ok) { - throw new EngineServiceValidationError( + throw new ServiceValidationError( `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` ); } @@ -351,7 +351,7 @@ export class RunEngineTriggerTaskService { ); if (result?.error) { - throw new EngineServiceValidationError( + throw new ServiceValidationError( taskRunErrorToString(taskRunErrorEnhancer(result.error)) ); } @@ -365,7 +365,7 @@ export class RunEngineTriggerTaskService { } if (error instanceof RunOneTimeUseTokenError) { - throw new EngineServiceValidationError( + throw new ServiceValidationError( `Cannot trigger ${taskId} with a one-time use token as it has already been used.` ); } diff --git a/apps/webapp/app/runEngine/validators/triggerTaskValidator.ts b/apps/webapp/app/runEngine/validators/triggerTaskValidator.ts index 93eb22258c..ab6e46521a 100644 --- a/apps/webapp/app/runEngine/validators/triggerTaskValidator.ts +++ b/apps/webapp/app/runEngine/validators/triggerTaskValidator.ts @@ -3,7 +3,6 @@ import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; import { MAX_ATTEMPTS, OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { isFinalRunStatus } from "~/v3/taskStatus"; -import { EngineServiceValidationError } from "../concerns/errors"; import type { EntitlementValidationParams, EntitlementValidationResult, @@ -13,6 +12,7 @@ import type { TriggerTaskValidator, ValidationResult, } from "../types"; +import { ServiceValidationError } from "~/v3/services/common.server"; export class DefaultTriggerTaskValidator implements TriggerTaskValidator { validateTags(params: TagValidationParams): ValidationResult { @@ -29,7 +29,7 @@ export class DefaultTriggerTaskValidator implements TriggerTaskValidator { if (tags.length > MAX_TAGS_PER_RUN) { return { ok: false, - error: new EngineServiceValidationError( + error: new ServiceValidationError( `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${tags.length}.` ), }; @@ -65,7 +65,7 @@ export class DefaultTriggerTaskValidator implements TriggerTaskValidator { if (attempt > MAX_ATTEMPTS) { return { ok: false, - error: new EngineServiceValidationError( + error: new ServiceValidationError( `Failed to trigger ${taskId} after ${MAX_ATTEMPTS} attempts.` ), }; @@ -95,7 +95,7 @@ export class DefaultTriggerTaskValidator implements TriggerTaskValidator { return { ok: false, - error: new EngineServiceValidationError( + error: new ServiceValidationError( `Cannot trigger ${taskId} as the parent run has a status of ${parentRun.status}` ), }; diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts index 25783f8610..9fa2b8e5d0 100644 --- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts +++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts @@ -23,6 +23,8 @@ import { } from "~/v3/services/worker/workerGroupTokenService.server"; import { API_VERSIONS, getApiVersion } from "~/api/versions"; import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker"; +import { ServiceValidationError } from "~/v3/services/common.server"; +import { EngineServiceValidationError } from "@internal/run-engine"; type AnyZodSchema = z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion; @@ -1040,11 +1042,18 @@ export function createActionWorkerApiRoute< }); return result; } catch (error) { - console.error("Error in API route:", error); if (error instanceof Response) { return error; } + if (error instanceof EngineServiceValidationError) { + return json({ error: error.message }, { status: error.status ?? 422 }); + } + + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: error.status ?? 422 }); + } + logger.error("Error in action", { error: error instanceof Error diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e81aedfe8b..d513a16b78 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1,4 +1,3 @@ -import { BillingCache } from "./billingCache.js"; import { createRedisClient, Redis } from "@internal/redis"; import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; @@ -30,6 +29,7 @@ import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrat import { RunQueue } from "../run-queue/index.js"; import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js"; import { MinimalAuthenticatedEnvironment } from "../shared/index.js"; +import { BillingCache } from "./billingCache.js"; import { NotImplementedError, RunDuplicateIdempotencyKeyError } from "./errors.js"; import { EventBus, EventBusEvents } from "./eventBus.js"; import { RunLocker } from "./locking.js"; diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index 40548cd435..299969a65e 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -15,6 +15,7 @@ import { Attributes, Histogram, } from "@internal/tracing"; +import { ServiceValidationError } from "./errors.js"; const SemanticAttributes = { LOCK_TYPE: "run_engine.lock.type", @@ -174,6 +175,10 @@ export class RunLocker { ); if (error) { + if (error instanceof ServiceValidationError) { + throw error; + } + // Record failed lock acquisition const lockDuration = performance.now() - lockStartTime; this.lockDurationHistogram.record(lockDuration, { @@ -186,6 +191,7 @@ export class RunLocker { resources, duration: this.duration, }); + throw error; } diff --git a/internal-packages/run-engine/src/index.ts b/internal-packages/run-engine/src/index.ts index 845fb48e6e..86cacc6b13 100644 --- a/internal-packages/run-engine/src/index.ts +++ b/internal-packages/run-engine/src/index.ts @@ -1,4 +1,8 @@ export { RunEngine } from "./engine/index.js"; -export { RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./engine/errors.js"; +export { + RunDuplicateIdempotencyKeyError, + RunOneTimeUseTokenError, + ServiceValidationError as EngineServiceValidationError, +} from "./engine/errors.js"; export type { EventBusEventArgs, EventBusEvents } from "./engine/eventBus.js"; export type { AuthenticatedEnvironment } from "./shared/index.js";