diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts index 13c3f07bac..ec1968705f 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts @@ -1,14 +1,14 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; -import { BatchTriggerTaskRequestBody } from "@trigger.dev/core/v3"; +import { BatchTriggerTaskRequestBody, BatchTriggerTaskV2RequestBody } from "@trigger.dev/core/v3"; import { z } from "zod"; +import { fromZodError } from "zod-validation-error"; import { MAX_BATCH_TRIGGER_ITEMS } from "~/consts"; +import { env } from "~/env.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; -import { BatchTriggerTaskService } from "~/v3/services/batchTriggerTask.server"; +import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; -import { env } from "~/env.server"; -import { fromZodError } from "zod-validation-error"; const ParamsSchema = z.object({ taskId: z.string(), @@ -85,15 +85,17 @@ export async function action({ request, params }: ActionFunctionArgs) { ); } - const service = new BatchTriggerTaskService(); + const service = new BatchTriggerV3Service(); const traceContext = traceparent && isFromWorker // If the request is from a worker, we should pass the trace context ? { traceparent, tracestate } : undefined; + const v3Body = convertV1BodyToV2Body(body.data, taskId); + try { - const result = await service.call(taskId, authenticationResult.environment, body.data, { + const result = await service.call(authenticationResult.environment, v3Body, { idempotencyKey: idempotencyKey ?? undefined, triggerVersion: triggerVersion ?? undefined, traceContext, @@ -106,8 +108,8 @@ export async function action({ request, params }: ActionFunctionArgs) { return json( { - batchId: result.batch.friendlyId, - runs: result.runs, + batchId: result.id, + runs: result.runs.map((run) => run.id), }, { headers: { @@ -126,3 +128,29 @@ export async function action({ request, params }: ActionFunctionArgs) { return json({ error: "Something went wrong" }, { status: 500 }); } } + +// Strip from options: +// - dependentBatch +// - dependentAttempt +// - parentBatch +function convertV1BodyToV2Body( + body: BatchTriggerTaskRequestBody, + taskIdentifier: string +): BatchTriggerTaskV2RequestBody { + return { + items: body.items.map((item) => ({ + task: taskIdentifier, + payload: item.payload, + context: item.context, + options: item.options + ? { + ...item.options, + dependentBatch: undefined, + parentBatch: undefined, + dependentAttempt: undefined, + } + : undefined, + })), + dependentAttempt: body.dependentAttempt, + }; +} diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 83bc67b22b..4d6039c912 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -94,14 +94,17 @@ type RunItemData = { */ export class BatchTriggerV3Service extends BaseService { private _batchProcessingStrategy: BatchProcessingStrategy; + private _asyncBatchProcessSizeThreshold: number; constructor( batchProcessingStrategy?: BatchProcessingStrategy, + asyncBatchProcessSizeThreshold: number = ASYNC_BATCH_PROCESS_SIZE_THRESHOLD, protected readonly _prisma: PrismaClientOrTransaction = prisma ) { super(_prisma); this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel"; + this._asyncBatchProcessSizeThreshold = asyncBatchProcessSizeThreshold; } public async call( @@ -403,7 +406,7 @@ export class BatchTriggerV3Service extends BaseService { options: BatchTriggerTaskServiceOptions = {}, dependentAttempt?: TaskRunAttempt ) { - if (runs.length <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) { + if (runs.length <= this._asyncBatchProcessSizeThreshold) { const batch = await this._prisma.batchTaskRun.create({ data: { friendlyId: batchId, diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 70148e2530..3651c2a5f0 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -74,25 +74,25 @@ export const TriggerTaskRequestBody = z.object({ context: z.any(), options: z .object({ + concurrencyKey: z.string().optional(), + delay: z.string().or(z.coerce.date()).optional(), dependentAttempt: z.string().optional(), - parentAttempt: z.string().optional(), dependentBatch: z.string().optional(), - parentBatch: z.string().optional(), - lockToVersion: z.string().optional(), - queue: QueueOptions.optional(), - concurrencyKey: z.string().optional(), idempotencyKey: z.string().optional(), idempotencyKeyTTL: z.string().optional(), - test: z.boolean().optional(), - payloadType: z.string().optional(), - delay: z.string().or(z.coerce.date()).optional(), - ttl: z.string().or(z.number().nonnegative().int()).optional(), - tags: RunTags.optional(), + lockToVersion: z.string().optional(), + machine: MachinePresetName.optional(), maxAttempts: z.number().int().optional(), + maxDuration: z.number().optional(), metadata: z.any(), metadataType: z.string().optional(), - maxDuration: z.number().optional(), - machine: MachinePresetName.optional(), + parentAttempt: z.string().optional(), + parentBatch: z.string().optional(), + payloadType: z.string().optional(), + queue: QueueOptions.optional(), + tags: RunTags.optional(), + test: z.boolean().optional(), + ttl: z.string().or(z.number().nonnegative().int()).optional(), }) .optional(), }); @@ -118,22 +118,22 @@ export const BatchTriggerTaskItem = z.object({ context: z.any(), options: z .object({ - lockToVersion: z.string().optional(), - queue: QueueOptions.optional(), concurrencyKey: z.string().optional(), + delay: z.string().or(z.coerce.date()).optional(), idempotencyKey: z.string().optional(), idempotencyKeyTTL: z.string().optional(), - test: z.boolean().optional(), - payloadType: z.string().optional(), - delay: z.string().or(z.coerce.date()).optional(), - ttl: z.string().or(z.number().nonnegative().int()).optional(), - tags: RunTags.optional(), + lockToVersion: z.string().optional(), + machine: MachinePresetName.optional(), maxAttempts: z.number().int().optional(), + maxDuration: z.number().optional(), metadata: z.any(), metadataType: z.string().optional(), - maxDuration: z.number().optional(), parentAttempt: z.string().optional(), - machine: MachinePresetName.optional(), + payloadType: z.string().optional(), + queue: QueueOptions.optional(), + tags: RunTags.optional(), + test: z.boolean().optional(), + ttl: z.string().or(z.number().nonnegative().int()).optional(), }) .optional(), });