diff --git a/apps/webapp/app/assets/icons/TaskCachedIcon.tsx b/apps/webapp/app/assets/icons/TaskCachedIcon.tsx new file mode 100644 index 0000000000..650f9be396 --- /dev/null +++ b/apps/webapp/app/assets/icons/TaskCachedIcon.tsx @@ -0,0 +1,49 @@ +export function TaskCachedIcon({ className }: { className?: string }) { + return ( + + + + + + + + + + + + + + + + + + + ); +} diff --git a/apps/webapp/app/components/runs/v3/BatchFilters.tsx b/apps/webapp/app/components/runs/v3/BatchFilters.tsx index f7ab261d4e..69c342fe67 100644 --- a/apps/webapp/app/components/runs/v3/BatchFilters.tsx +++ b/apps/webapp/app/components/runs/v3/BatchFilters.tsx @@ -359,8 +359,8 @@ function BatchIdDropdown({ if (batchId) { if (!batchId.startsWith("batch_")) { error = "Batch IDs start with 'batch_'"; - } else if (batchId.length !== 27) { - error = "Batch IDs are 27 characters long"; + } else if (batchId.length !== 27 && batchId.length !== 31) { + error = "Batch IDs are 27/32 characters long"; } } diff --git a/apps/webapp/app/components/runs/v3/RunFilters.tsx b/apps/webapp/app/components/runs/v3/RunFilters.tsx index 6cc75a6129..0937ccf945 100644 --- a/apps/webapp/app/components/runs/v3/RunFilters.tsx +++ b/apps/webapp/app/components/runs/v3/RunFilters.tsx @@ -763,8 +763,8 @@ function RunIdDropdown({ if (runId) { if (!runId.startsWith("run_")) { error = "Run IDs start with 'run_'"; - } else if (runId.length !== 25) { - error = "Run IDs are 25 characters long"; + } else if (runId.length !== 25 && runId.length !== 29) { + error = "Run IDs are 25/30 characters long"; } } diff --git a/apps/webapp/app/components/runs/v3/RunIcon.tsx b/apps/webapp/app/components/runs/v3/RunIcon.tsx index 41e442bf72..84c386706d 100644 --- a/apps/webapp/app/components/runs/v3/RunIcon.tsx +++ b/apps/webapp/app/components/runs/v3/RunIcon.tsx @@ -7,6 +7,7 @@ import { } from "@heroicons/react/20/solid"; import { AttemptIcon } from "~/assets/icons/AttemptIcon"; import { TaskIcon } from "~/assets/icons/TaskIcon"; +import { TaskCachedIcon } from "~/assets/icons/TaskCachedIcon"; import { NamedIcon } from "~/components/primitives/NamedIcon"; import { cn } from "~/utils/cn"; @@ -41,6 +42,8 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) { switch (name) { case "task": return ; + case "task-cached": + return ; case "scheduled": return ; case "attempt": diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index c8a9b50529..ae902d1317 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -2,6 +2,7 @@ import { MachinePresetName, parsePacket, prettyPrintPacket, + SemanticInternalAttributes, TaskRunError, } from "@trigger.dev/core/v3"; import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; @@ -39,7 +40,22 @@ export class SpanPresenter extends BasePresenter { throw new Error("Project not found"); } - const run = await this.getRun(spanId); + const parentRun = await this._prisma.taskRun.findFirst({ + select: { + traceId: true, + }, + where: { + friendlyId: runFriendlyId, + }, + }); + + if (!parentRun) { + return; + } + + const { traceId } = parentRun; + + const run = await this.getRun(traceId, spanId); if (run) { return { type: "run" as const, @@ -48,7 +64,7 @@ export class SpanPresenter extends BasePresenter { } //get the run - const span = await this.getSpan(runFriendlyId, spanId); + const span = await this.getSpan(traceId, spanId); if (!span) { throw new Error("Span not found"); @@ -60,10 +76,17 @@ export class SpanPresenter extends BasePresenter { }; } - async getRun(spanId: string) { + async getRun(traceId: string, spanId: string) { + const span = await eventRepository.getSpan(spanId, traceId); + + if (!span) { + return; + } + const run = await this._replica.taskRun.findFirst({ select: { id: true, + spanId: true, traceId: true, //metadata number: true, @@ -92,6 +115,7 @@ export class SpanPresenter extends BasePresenter { //status + duration status: true, startedAt: true, + firstAttemptStartedAt: true, createdAt: true, updatedAt: true, queuedAt: true, @@ -99,6 +123,7 @@ export class SpanPresenter extends BasePresenter { logsDeletedAt: true, //idempotency idempotencyKey: true, + idempotencyKeyExpiresAt: true, //delayed delayUntil: true, //ttl @@ -161,9 +186,13 @@ export class SpanPresenter extends BasePresenter { }, }, }, - where: { - spanId, - }, + where: span.originalRun + ? { + friendlyId: span.originalRun, + } + : { + spanId, + }, }); if (!run) { @@ -238,8 +267,6 @@ export class SpanPresenter extends BasePresenter { } } - const span = await eventRepository.getSpan(spanId, run.traceId); - const metadata = run.metadata ? await prettyPrintPacket(run.metadata, run.metadataType, { filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"], @@ -296,6 +323,7 @@ export class SpanPresenter extends BasePresenter { status: run.status, createdAt: run.createdAt, startedAt: run.startedAt, + firstAttemptStartedAt: run.firstAttemptStartedAt, updatedAt: run.updatedAt, delayUntil: run.delayUntil, expiredAt: run.expiredAt, @@ -307,6 +335,8 @@ export class SpanPresenter extends BasePresenter { sdkVersion: run.lockedToVersion?.sdkVersion, isTest: run.isTest, environmentId: run.runtimeEnvironment.id, + idempotencyKey: run.idempotencyKey, + idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt, schedule: run.schedule ? { friendlyId: run.schedule.friendlyId, @@ -349,24 +379,13 @@ export class SpanPresenter extends BasePresenter { engine: run.engine, masterQueue: run.masterQueue, secondaryMasterQueue: run.secondaryMasterQueue, + spanId: run.spanId, + isCached: !!span.originalRun, }; } - async getSpan(runFriendlyId: string, spanId: string) { - const run = await this._prisma.taskRun.findFirst({ - select: { - traceId: true, - }, - where: { - friendlyId: runFriendlyId, - }, - }); - - if (!run) { - return; - } - - const span = await eventRepository.getSpan(spanId, run.traceId); + async getSpan(traceId: string, spanId: string) { + const span = await eventRepository.getSpan(spanId, traceId); if (!span) { return; diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 96eec3ba31..2bb709080f 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -108,6 +108,7 @@ const { action, loader } = createActionApiRoute( return json( { id: run.friendlyId, + isCached: run.isCached, }, { headers: $responseHeaders, diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 591d04f0ce..c989e950be 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -9,13 +9,11 @@ import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; -import { determineEngineVersion } from "~/v3/engineVersion.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy, BatchTriggerV2Service, } from "~/v3/services/batchTriggerV2.server"; -import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; @@ -88,15 +86,7 @@ const { action, loader } = createActionApiRoute( resolveIdempotencyKeyTTL(idempotencyKeyTTL) ?? new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); - const version = await determineEngineVersion({ - environment: authentication.environment, - version: engineVersion ?? undefined, - }); - - const service = - version === "V1" - ? new BatchTriggerV2Service(batchProcessingStrategy ?? undefined) - : new BatchTriggerV3Service(batchProcessingStrategy ?? undefined); + const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined); try { const batch = await service.call(authentication.environment, body, { diff --git a/apps/webapp/app/routes/api.v2.batches.$batchId.ts b/apps/webapp/app/routes/api.v2.batches.$batchId.ts new file mode 100644 index 0000000000..150978331e --- /dev/null +++ b/apps/webapp/app/routes/api.v2.batches.$batchId.ts @@ -0,0 +1,40 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; + +const ParamsSchema = z.object({ + batchId: z.string(), +}); + +export const loader = createLoaderApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + findResource: (params, auth) => { + return $replica.batchTaskRun.findFirst({ + where: { + friendlyId: params.batchId, + runtimeEnvironmentId: auth.environment.id, + }, + }); + }, + authorization: { + action: "read", + resource: (batch) => ({ batch: batch.friendlyId }), + superScopes: ["read:runs", "read:all", "admin"], + }, + }, + async ({ resource: batch }) => { + return json({ + id: batch.friendlyId, + status: batch.status, + idempotencyKey: batch.idempotencyKey ?? undefined, + createdAt: batch.createdAt, + updatedAt: batch.updatedAt, + runCount: batch.runCount, + runs: batch.runIds, + }); + } +); diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts new file mode 100644 index 0000000000..72f271e9cf --- /dev/null +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -0,0 +1,154 @@ +import { json } from "@remix-run/server-runtime"; +import { + BatchTriggerTaskV3RequestBody, + BatchTriggerTaskV3Response, + generateJWT, +} from "@trigger.dev/core/v3"; +import { env } from "~/env.server"; +import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; +import { logger } from "~/services/logger.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; +import { + BatchProcessingStrategy, + BatchTriggerV3Service, +} from "~/v3/services/batchTriggerV3.server"; +import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; +import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; + +const { action, loader } = createActionApiRoute( + { + headers: HeadersSchema.extend({ + "batch-processing-strategy": BatchProcessingStrategy.nullish(), + }), + body: BatchTriggerTaskV3RequestBody, + allowJWT: true, + maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE, + authorization: { + action: "batchTrigger", + resource: (_, __, ___, body) => ({ + tasks: Array.from(new Set(body.items.map((i) => i.task))), + }), + superScopes: ["write:tasks", "admin"], + }, + corsStrategy: "all", + }, + async ({ body, headers, params, authentication }) => { + if (!body.items.length) { + return json({ error: "Batch cannot be triggered with no items" }, { status: 400 }); + } + + // Check the there are fewer than MAX_BATCH_V2_TRIGGER_ITEMS items + if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) { + return json( + { + error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`, + }, + { status: 400 } + ); + } + + const { + "trigger-version": triggerVersion, + "x-trigger-span-parent-as-link": spanParentAsLink, + "x-trigger-worker": isFromWorker, + "x-trigger-client": triggerClient, + "x-trigger-engine-version": engineVersion, + "batch-processing-strategy": batchProcessingStrategy, + traceparent, + tracestate, + } = headers; + + const oneTimeUseToken = await getOneTimeUseToken(authentication); + + logger.debug("Batch trigger request", { + triggerVersion, + spanParentAsLink, + isFromWorker, + triggerClient, + traceparent, + tracestate, + batchProcessingStrategy, + }); + + const traceContext = + traceparent && isFromWorker // If the request is from a worker, we should pass the trace context + ? { traceparent, tracestate } + : undefined; + + const service = new BatchTriggerV3Service(batchProcessingStrategy ?? undefined); + + try { + const batch = await service.call(authentication.environment, body, { + triggerVersion: triggerVersion ?? undefined, + traceContext, + spanParentAsLink: spanParentAsLink === 1, + oneTimeUseToken, + }); + + const $responseHeaders = await responseHeaders( + batch, + authentication.environment, + triggerClient + ); + + return json(batch, { status: 202, headers: $responseHeaders }); + } catch (error) { + logger.error("Batch trigger error", { + error: { + message: (error as Error).message, + stack: (error as Error).stack, + }, + }); + + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: 422 }); + } else if (error instanceof OutOfEntitlementError) { + return json({ error: error.message }, { status: 422 }); + } else if (error instanceof Error) { + return json( + { error: error.message }, + { status: 500, headers: { "x-should-retry": "false" } } + ); + } + + return json({ error: "Something went wrong" }, { status: 500 }); + } + } +); + +async function responseHeaders( + batch: BatchTriggerTaskV3Response, + environment: AuthenticatedEnvironment, + triggerClient?: string | null +): Promise> { + const claimsHeader = JSON.stringify({ + sub: environment.id, + pub: true, + }); + + if (triggerClient === "browser") { + const claims = { + sub: environment.id, + pub: true, + scopes: [`read:batch:${batch.id}`], + }; + + const jwt = await generateJWT({ + secretKey: environment.apiKey, + payload: claims, + expirationTime: "1h", + }); + + return { + "x-trigger-jwt-claims": claimsHeader, + "x-trigger-jwt": jwt, + }; + } + + return { + "x-trigger-jwt-claims": claimsHeader, + }; +} + +export { action, loader }; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx index 1594e82cfd..f1fdd77ea2 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx @@ -44,7 +44,6 @@ import { RunTag } from "~/components/runs/v3/RunTag"; import { SpanEvents } from "~/components/runs/v3/SpanEvents"; import { SpanTitle } from "~/components/runs/v3/SpanTitle"; import { TaskRunAttemptStatusCombo } from "~/components/runs/v3/TaskRunAttemptStatus"; -import { TaskRunsTable } from "~/components/runs/v3/TaskRunsTable"; import { TaskRunStatusCombo } from "~/components/runs/v3/TaskRunStatus"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; @@ -58,7 +57,6 @@ import { cn } from "~/utils/cn"; import { formatCurrencyAccurate } from "~/utils/numberFormatter"; import { v3BatchPath, - v3BatchRunsPath, v3RunDownloadLogsPath, v3RunPath, v3RunSpanPath, @@ -427,12 +425,15 @@ function RunBody({
- {run.taskIdentifier} + + {run.taskIdentifier} + {run.isCached ? " (cached)" : null} +
{runParam && closePanel && ( @@ -602,6 +603,22 @@ function RunBody({ )} + + Idempotency + +
{run.idempotencyKey ? run.idempotencyKey : "–"}
+ {run.idempotencyKey && ( +
+ Expires:{" "} + {run.idempotencyKeyExpiresAt ? ( + + ) : ( + "–" + )} +
+ )} +
+
Version @@ -804,12 +821,17 @@ function RunBody({
{run.friendlyId !== runParam && ( - Focus on run + {run.isCached ? "Jump to original run" : "Focus on run"} )}
diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 5cf7b5313d..42ef5cc28f 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -1,4 +1,4 @@ -import { Attributes, Link, TraceFlags } from "@opentelemetry/api"; +import { Attributes, AttributeValue, Link, TraceFlags } from "@opentelemetry/api"; import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base"; import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions"; import { @@ -603,6 +603,11 @@ export class EventRepository { spanEvent.environmentType === "DEVELOPMENT" ); + const originalRun = rehydrateAttribute( + spanEvent.properties, + SemanticInternalAttributes.ORIGINAL_RUN_ID + ); + return { ...spanEvent, ...span.data, @@ -612,6 +617,7 @@ export class EventRepository { events: spanEvents, show, links, + originalRun, }; }); } @@ -754,7 +760,10 @@ export class EventRepository { }); } - public async recordEvent(message: string, options: TraceEventOptions & { duration?: number }) { + public async recordEvent( + message: string, + options: TraceEventOptions & { duration?: number; parentId?: string } + ) { const propagatedContext = extractContextFromCarrier(options.context ?? {}); const startTime = options.startTime ?? getNowInNanoseconds(); @@ -763,7 +772,7 @@ export class EventRepository { (options.endTime ? calculateDurationFromStart(startTime, options.endTime) : 100); const traceId = propagatedContext?.traceparent?.traceId ?? this.generateTraceId(); - const parentId = propagatedContext?.traceparent?.spanId; + const parentId = options.parentId ?? propagatedContext?.traceparent?.spanId; const tracestate = propagatedContext?.tracestate; const spanId = options.spanIdSeed ? this.#generateDeterministicSpanId(traceId, options.spanIdSeed) @@ -847,7 +856,7 @@ export class EventRepository { public async traceEvent( message: string, - options: TraceEventOptions & { incomplete?: boolean }, + options: TraceEventOptions & { incomplete?: boolean; isError?: boolean }, callback: ( e: EventBuilder, traceContext: Record, @@ -944,6 +953,7 @@ export class EventRepository { tracestate, duration: options.incomplete ? 0 : duration, isPartial: options.incomplete, + isError: options.isError, message: message, serviceName: "api server", serviceNamespace: "trigger.dev", @@ -1223,7 +1233,7 @@ function excludePartialEventsWithCorrespondingFullEvent(batch: CreatableEvent[]) ); } -function extractContextFromCarrier(carrier: Record) { +export function extractContextFromCarrier(carrier: Record) { const traceparent = carrier["traceparent"]; const tracestate = carrier["tracestate"]; @@ -1550,3 +1560,26 @@ function rehydrateShow(properties: Prisma.JsonValue): { actions?: boolean } | un return; } + +function rehydrateAttribute( + properties: Prisma.JsonValue, + key: string +): T | undefined { + if (properties === null || properties === undefined) { + return; + } + + if (typeof properties !== "object") { + return; + } + + if (Array.isArray(properties)) { + return; + } + + const value = properties[key]; + + if (!value) return; + + return value as T; +} diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index bb1e239be0..0a42a7351c 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -138,6 +138,29 @@ export function registerRunEngineEventBusHandlers() { } }); + engine.eventBus.on("cachedRunCompleted", async ({ time, spanId, hasError }) => { + try { + const completedEvent = await eventRepository.completeEvent(spanId, { + endTime: time, + attributes: { + isError: hasError, + }, + }); + + if (!completedEvent) { + logger.error("[cachedRunCompleted] Failed to complete event for unknown reason", { + spanId, + }); + return; + } + } catch (error) { + logger.error("[cachedRunCompleted] Failed to complete event for unknown reason", { + error: error instanceof Error ? error.message : error, + spanId, + }); + } + }); + engine.eventBus.on("runExpired", async ({ time, run }) => { try { const completedEvent = await eventRepository.completeEvent(run.spanId, { diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 66c259f83a..b0bf3529fb 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -1,6 +1,7 @@ import { BatchTriggerTaskV2RequestBody, - BatchTriggerTaskV2Response, + BatchTriggerTaskV3RequestBody, + BatchTriggerTaskV3Response, IOPacket, packetRequiresOffloading, parsePacket, @@ -18,7 +19,6 @@ import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2. import { startActiveSpan } from "../tracer.server"; import { ServiceValidationError, WithRunEngine } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; -import { guardQueueSizeLimitsForEnv } from "./triggerTaskV2.server"; const PROCESSING_BATCH_SIZE = 50; const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20; @@ -40,8 +40,6 @@ export const BatchProcessingOptions = z.object({ export type BatchProcessingOptions = z.infer; export type BatchTriggerTaskServiceOptions = { - idempotencyKey?: string; - idempotencyKeyExpiresAt?: Date; triggerVersion?: string; traceContext?: Record; spanParentAsLink?: boolean; @@ -65,59 +63,14 @@ export class BatchTriggerV3Service extends WithRunEngine { public async call( environment: AuthenticatedEnvironment, - body: BatchTriggerTaskV2RequestBody, + body: BatchTriggerTaskV3RequestBody, options: BatchTriggerTaskServiceOptions = {} - ): Promise { + ): Promise { try { - return await this.traceWithEnv( + return await this.traceWithEnv( "call()", environment, async (span) => { - const existingBatch = options.idempotencyKey - ? await this._prisma.batchTaskRun.findUnique({ - where: { - runtimeEnvironmentId_idempotencyKey: { - runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, - }, - }, - }) - : undefined; - - if (existingBatch) { - if ( - existingBatch.idempotencyKeyExpiresAt && - existingBatch.idempotencyKeyExpiresAt < new Date() - ) { - logger.debug("[BatchTriggerV3][call] Idempotency key has expired", { - idempotencyKey: options.idempotencyKey, - batch: { - id: existingBatch.id, - friendlyId: existingBatch.friendlyId, - runCount: existingBatch.runCount, - idempotencyKeyExpiresAt: existingBatch.idempotencyKeyExpiresAt, - idempotencyKey: existingBatch.idempotencyKey, - }, - }); - - // Update the existing batch to remove the idempotency key - await this._prisma.batchTaskRun.update({ - where: { id: existingBatch.id }, - data: { idempotencyKey: null }, - }); - - // Don't return, just continue with the batch trigger - } else { - span.setAttribute("batchId", existingBatch.friendlyId); - - return this.#respondWithExistingBatch( - existingBatch, - environment, - body.resumeParentOnCompletion ? body.parentRunId : undefined - ); - } - } - const { id, friendlyId } = BatchId.generate(); span.setAttribute("batchId", friendlyId); @@ -129,160 +82,6 @@ export class BatchTriggerV3Service extends WithRunEngine { } } - const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean); - - const cachedRuns = - idempotencyKeys.length > 0 - ? await this._prisma.taskRun.findMany({ - where: { - runtimeEnvironmentId: environment.id, - idempotencyKey: { - in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean), - }, - }, - select: { - friendlyId: true, - idempotencyKey: true, - idempotencyKeyExpiresAt: true, - }, - }) - : []; - - if (cachedRuns.length) { - logger.debug("[BatchTriggerV3][call] Found cached runs", { - cachedRuns, - batchId: friendlyId, - }); - } - - // Now we need to create an array of all the run IDs, in order - // If we have a cached run, that isn't expired, we should use that run ID - // If we have a cached run, that is expired, we should generate a new run ID and save that cached run ID to a set of expired run IDs - // If we don't have a cached run, we should generate a new run ID - const expiredRunIds = new Set(); - let cachedRunCount = 0; - - const runs = body.items.map((item) => { - const cachedRun = cachedRuns.find( - (r) => r.idempotencyKey === item.options?.idempotencyKey - ); - - const runId = RunId.generate(); - - if (cachedRun) { - if ( - cachedRun.idempotencyKeyExpiresAt && - cachedRun.idempotencyKeyExpiresAt < new Date() - ) { - expiredRunIds.add(cachedRun.friendlyId); - - return { - id: runId.friendlyId, - isCached: false, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - taskIdentifier: item.task, - }; - } - - cachedRunCount++; - - return { - id: cachedRun.friendlyId, - isCached: true, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - taskIdentifier: item.task, - }; - } - - return { - id: runId.friendlyId, - isCached: false, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - taskIdentifier: item.task, - }; - }); - - //block the parent with any existing children - if (body.resumeParentOnCompletion && body.parentRunId) { - const existingChildFriendlyIds = runs.flatMap((r) => (r.isCached ? [r.id] : [])); - - if (existingChildFriendlyIds.length > 0) { - await this.#blockParentRun({ - parentRunId: body.parentRunId, - childFriendlyIds: existingChildFriendlyIds, - environment, - }); - } - } - - // Calculate how many new runs we need to create - const newRunCount = body.items.length - cachedRunCount; - - if (newRunCount === 0) { - logger.debug("[BatchTriggerV3][call] All runs are cached", { - batchId: friendlyId, - }); - - await this._prisma.batchTaskRun.create({ - data: { - friendlyId, - runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, - idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, - runCount: body.items.length, - runIds: runs.map((r) => r.id), - //todo is this correct? Surely some of the runs could still be in progress? - status: "COMPLETED", - batchVersion: "v2", - oneTimeUseToken: options.oneTimeUseToken, - }, - }); - - return { - id: friendlyId, - isCached: false, - idempotencyKey: options.idempotencyKey ?? undefined, - runs, - }; - } - - const queueSizeGuard = await guardQueueSizeLimitsForEnv( - this._engine, - environment, - newRunCount - ); - - logger.debug("Queue size guard result", { - newRunCount, - queueSizeGuard, - environment: { - id: environment.id, - type: environment.type, - organization: environment.organization, - project: environment.project, - }, - }); - - if (!queueSizeGuard.isWithinLimits) { - throw new ServiceValidationError( - `Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` - ); - } - - // Expire the cached runs that are no longer valid - if (expiredRunIds.size) { - logger.debug("Expiring cached runs", { - expiredRunIds: Array.from(expiredRunIds), - batchId: friendlyId, - }); - - // TODO: is there a limit to the number of items we can update in a single query? - await this._prisma.taskRun.updateMany({ - where: { friendlyId: { in: Array.from(expiredRunIds) } }, - data: { idempotencyKey: null }, - }); - } - // Upload to object store const payloadPacket = await this.#handlePayloadPacket( body.items, @@ -292,9 +91,7 @@ export class BatchTriggerV3Service extends WithRunEngine { const batch = await this.#createAndProcessBatchTaskRun( friendlyId, - runs, payloadPacket, - newRunCount, environment, body, options @@ -308,7 +105,7 @@ export class BatchTriggerV3Service extends WithRunEngine { id: batch.friendlyId, isCached: false, idempotencyKey: batch.idempotencyKey ?? undefined, - runs, + runCount: body.items.length, }; } ); @@ -347,27 +144,19 @@ export class BatchTriggerV3Service extends WithRunEngine { async #createAndProcessBatchTaskRun( batchId: string, - runs: Array<{ - id: string; - isCached: boolean; - idempotencyKey: string | undefined; - taskIdentifier: string; - }>, payloadPacket: IOPacket, - newRunCount: number, environment: AuthenticatedEnvironment, body: BatchTriggerTaskV2RequestBody, options: BatchTriggerTaskServiceOptions = {} ) { - if (newRunCount <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) { + if (body.items.length <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) { const batch = await this._prisma.batchTaskRun.create({ data: { + id: BatchId.fromFriendlyId(batchId), friendlyId: batchId, runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, - idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, - runCount: newRunCount, - runIds: runs.map((r) => r.id), + runCount: body.items.length, + runIds: [], payload: payloadPacket.data, payloadType: payloadPacket.dataType, options, @@ -376,6 +165,15 @@ export class BatchTriggerV3Service extends WithRunEngine { }, }); + if (body.parentRunId && body.resumeParentOnCompletion) { + await this._engine.blockRunWithCreatedBatch({ + runId: RunId.fromFriendlyId(body.parentRunId), + batchId: batch.id, + environmentId: environment.id, + projectId: environment.projectId, + }); + } + const result = await this.#processBatchTaskRunItems({ batch, environment, @@ -445,12 +243,11 @@ export class BatchTriggerV3Service extends WithRunEngine { return await $transaction(this._prisma, async (tx) => { const batch = await tx.batchTaskRun.create({ data: { + id: BatchId.fromFriendlyId(batchId), friendlyId: batchId, runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, - idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, runCount: body.items.length, - runIds: runs.map((r) => r.id), + runIds: [], payload: payloadPacket.data, payloadType: payloadPacket.dataType, options, @@ -459,6 +256,16 @@ export class BatchTriggerV3Service extends WithRunEngine { }, }); + if (body.parentRunId && body.resumeParentOnCompletion) { + await this._engine.blockRunWithCreatedBatch({ + runId: RunId.fromFriendlyId(body.parentRunId), + batchId: batch.id, + environmentId: environment.id, + projectId: environment.projectId, + tx, + }); + } + switch (this._batchProcessingStrategy) { case "sequential": { await this.#enqueueBatchTaskRun({ @@ -475,7 +282,7 @@ export class BatchTriggerV3Service extends WithRunEngine { } case "parallel": { const ranges = Array.from({ - length: Math.ceil(newRunCount / PROCESSING_BATCH_SIZE), + length: Math.ceil(body.items.length / PROCESSING_BATCH_SIZE), }).map((_, index) => ({ start: index * PROCESSING_BATCH_SIZE, count: PROCESSING_BATCH_SIZE, @@ -507,52 +314,6 @@ export class BatchTriggerV3Service extends WithRunEngine { } } - async #respondWithExistingBatch( - batch: BatchTaskRun, - environment: AuthenticatedEnvironment, - blockParentRunId: string | undefined - ): Promise { - // Resolve the payload - const payloadPacket = await downloadPacketFromObjectStore( - { - data: batch.payload ?? undefined, - dataType: batch.payloadType, - }, - environment - ); - - const payload = await parsePacket(payloadPacket).then( - (p) => p as BatchTriggerTaskV2RequestBody["items"] - ); - - const runs = batch.runIds.map((id, index) => { - const item = payload[index]; - - return { - id, - taskIdentifier: item.task, - isCached: true, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - }; - }); - - //block the parent with all of the children - if (blockParentRunId) { - await this.#blockParentRun({ - parentRunId: blockParentRunId, - childFriendlyIds: batch.runIds, - environment, - }); - } - - return { - id: batch.friendlyId, - idempotencyKey: batch.idempotencyKey ?? undefined, - isCached: true, - runs, - }; - } - async processBatchTaskRun(options: BatchProcessingOptions) { logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch", { options, @@ -630,6 +391,8 @@ export class BatchTriggerV3Service extends WithRunEngine { batchSize: options.range.count, items: $payload, options: $options, + parentRunId: options.parentRunId, + resumeParentOnCompletion: options.resumeParentOnCompletion, }); switch (result.status) { @@ -737,36 +500,42 @@ export class BatchTriggerV3Service extends WithRunEngine { | { status: "INCOMPLETE"; workingIndex: number } | { status: "ERROR"; error: string; workingIndex: number } > { - // Grab the next PROCESSING_BATCH_SIZE runIds - const runFriendlyIds = batch.runIds.slice(currentIndex, currentIndex + batchSize); + // Grab the next PROCESSING_BATCH_SIZE items + const itemsToProcess = items.slice(currentIndex, currentIndex + batchSize); logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch items", { batchId: batch.friendlyId, currentIndex, - runIds: runFriendlyIds, runCount: batch.runCount, }); - // Combine the "window" between currentIndex and currentIndex + PROCESSING_BATCH_SIZE with the runId and the item in the payload which is an array - const itemsToProcess = runFriendlyIds.map((runFriendlyId, index) => ({ - runFriendlyId, - item: items[index + currentIndex], - })); - let workingIndex = currentIndex; + let runIds: string[] = []; + for (const item of itemsToProcess) { try { - await this.#processBatchTaskRunItem({ + const run = await this.#processBatchTaskRunItem({ batch, environment, - task: item, + item, currentIndex: workingIndex, options, parentRunId, resumeParentOnCompletion, }); + if (!run) { + logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", { + batchId: batch.friendlyId, + currentIndex: workingIndex, + }); + + throw new Error("[BatchTriggerV3][processBatchTaskRun] Failed to process item"); + } + + runIds.push(run.friendlyId); + workingIndex++; } catch (error) { logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", { @@ -783,18 +552,45 @@ export class BatchTriggerV3Service extends WithRunEngine { } } + //add the run ids to the batch + const updatedBatch = await this._prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + runIds: { + push: runIds, + }, + }, + }); + // if there are more items to process, requeue the batch if (workingIndex < batch.runCount) { return { status: "INCOMPLETE", workingIndex }; } + //triggered all the runs + if (updatedBatch.runIds.length === updatedBatch.runCount) { + //unblock the parent run from the batch + //this prevents the parent continuing before all the runs are created + if (parentRunId && resumeParentOnCompletion) { + await this._engine.unblockRunForCreatedBatch({ + runId: RunId.fromFriendlyId(parentRunId), + batchId: batch.id, + environmentId: environment.id, + projectId: environment.projectId, + }); + } + + //if all the runs were idempotent, it's possible the batch is already completed + await this._engine.tryCompleteBatch({ batchId: batch.id }); + } + return { status: "COMPLETE" }; } async #processBatchTaskRunItem({ batch, environment, - task, + item, currentIndex, options, parentRunId, @@ -802,7 +598,7 @@ export class BatchTriggerV3Service extends WithRunEngine { }: { batch: BatchTaskRun; environment: AuthenticatedEnvironment; - task: { runFriendlyId: string; item: BatchTriggerTaskV2RequestBody["items"][number] }; + item: BatchTriggerTaskV2RequestBody["items"][number]; currentIndex: number; options?: BatchTriggerTaskServiceOptions; parentRunId: string | undefined; @@ -810,33 +606,38 @@ export class BatchTriggerV3Service extends WithRunEngine { }) { logger.debug("[BatchTriggerV3][processBatchTaskRunItem] Processing item", { batchId: batch.friendlyId, - runId: task.runFriendlyId, currentIndex, }); const triggerTaskService = new TriggerTaskService(); - await triggerTaskService.call( - task.item.task, + const run = await triggerTaskService.call( + item.task, environment, { - ...task.item, + ...item, options: { - ...task.item.options, + ...item.options, parentRunId, resumeParentOnCompletion, + parentBatch: batch.id, }, }, { triggerVersion: options?.triggerVersion, traceContext: options?.traceContext, spanParentAsLink: options?.spanParentAsLink, - batchId: batch.friendlyId, - skipChecks: true, - runFriendlyId: task.runFriendlyId, + batchId: batch.id, + batchIndex: currentIndex, }, "V2" ); + + return run + ? { + friendlyId: run.friendlyId, + } + : undefined; } async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { @@ -877,38 +678,4 @@ export class BatchTriggerV3Service extends WithRunEngine { }; }); } - - async #blockParentRun({ - parentRunId, - childFriendlyIds, - environment, - }: { - parentRunId: string; - childFriendlyIds: string[]; - environment: AuthenticatedEnvironment; - }) { - const runsWithAssociatedWaitpoints = await this._prisma.taskRun.findMany({ - where: { - id: { - in: childFriendlyIds.map((r) => RunId.fromFriendlyId(r)), - }, - }, - select: { - associatedWaitpoint: { - select: { - id: true, - }, - }, - }, - }); - - await this._engine.blockRunWithWaitpoint({ - runId: RunId.fromFriendlyId(parentRunId), - waitpointId: runsWithAssociatedWaitpoints.flatMap((r) => - r.associatedWaitpoint ? [r.associatedWaitpoint.id] : [] - ), - environmentId: environment.id, - projectId: environment.projectId, - }); - } } diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 55ed259b8d..2a4b6028b1 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -14,6 +14,7 @@ export type TriggerTaskServiceOptions = { spanParentAsLink?: boolean; parentAsLinkType?: "replay" | "trigger"; batchId?: string; + batchIndex?: number; customIcon?: string; runFriendlyId?: string; skipChecks?: boolean; @@ -41,7 +42,13 @@ export class TriggerTaskService extends WithRunEngine { switch (v) { case "V1": { - return await this.callV1(taskId, environment, body, options); + const run = await this.callV1(taskId, environment, body, options); + return run + ? { + ...run, + isCached: false, + } + : undefined; } case "V2": { return await this.callV2(taskId, environment, body, options); diff --git a/apps/webapp/app/v3/services/triggerTaskV2.server.ts b/apps/webapp/app/v3/services/triggerTaskV2.server.ts index ad1dd097d0..b0633640fa 100644 --- a/apps/webapp/app/v3/services/triggerTaskV2.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV2.server.ts @@ -1,32 +1,34 @@ +import { RunEngine, RunDuplicateIdempotencyKeyError } from "@internal/run-engine"; import { IOPacket, + packetRequiresOffloading, QueueOptions, SemanticInternalAttributes, TriggerTaskRequestBody, - packetRequiresOffloading, } from "@trigger.dev/core/v3"; +import { BatchId, RunId, stringifyDuration } from "@trigger.dev/core/v3/apps"; +import { Prisma, TaskRun } from "@trigger.dev/database"; import { env } from "~/env.server"; +import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; +import { logger } from "~/services/logger.server"; +import { getEntitlement } from "~/services/platform.v3.server"; +import { parseDelay } from "~/utils/delays"; +import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { handleMetadataPacket } from "~/utils/packets"; import { sanitizeQueueName } from "~/v3/marqs/index.server"; import { eventRepository } from "../eventRepository.server"; +import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server"; import { uploadPacketToObjectStore } from "../r2.server"; +import { isFinalRunStatus } from "../taskStatus"; import { startActiveSpan } from "../tracer.server"; -import { getEntitlement } from "~/services/platform.v3.server"; +import { clampMaxDuration } from "../utils/maxDuration"; import { ServiceValidationError, WithRunEngine } from "./baseService.server"; -import { logger } from "~/services/logger.server"; -import { isFinalRunStatus } from "../taskStatus"; -import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; -import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server"; -import { handleMetadataPacket } from "~/utils/packets"; -import { WorkerGroupService } from "./worker/workerGroupService.server"; -import { parseDelay } from "~/utils/delays"; -import { RunId, stringifyDuration } from "@trigger.dev/core/v3/apps"; import { OutOfEntitlementError, TriggerTaskServiceOptions } from "./triggerTask.server"; -import { Prisma } from "@trigger.dev/database"; -import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; -import { clampMaxDuration } from "../utils/maxDuration"; -import { RunEngine } from "@internal/run-engine"; +import { WorkerGroupService } from "./worker/workerGroupService.server"; + +type Result = TaskRun & { isCached: boolean }; /** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */ export class TriggerTaskServiceV2 extends WithRunEngine { @@ -40,7 +42,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine { environment: AuthenticatedEnvironment; body: TriggerTaskRequestBody; options?: TriggerTaskServiceOptions; - }) { + }): Promise { return await this.traceWithEnv("call()", environment, async (span) => { span.setAttribute("taskId", taskId); @@ -94,16 +96,66 @@ export class TriggerTaskServiceV2 extends WithRunEngine { body.options?.resumeParentOnCompletion && body.options?.parentRunId ) { - await this._engine.blockRunWithWaitpoint({ - runId: RunId.fromFriendlyId(body.options.parentRunId), - waitpointId: existingRun.associatedWaitpoint.id, - environmentId: environment.id, - projectId: environment.projectId, - tx: this._prisma, - }); + await eventRepository.traceEvent( + `${taskId} (cached)`, + { + context: options.traceContext, + spanParentAsLink: options.spanParentAsLink, + parentAsLinkType: options.parentAsLinkType, + kind: "SERVER", + environment, + taskSlug: taskId, + attributes: { + properties: { + [SemanticInternalAttributes.SHOW_ACTIONS]: true, + [SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId, + }, + style: { + icon: "task-cached", + }, + runIsTest: body.options?.test ?? false, + batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, + idempotencyKey, + runId: existingRun.friendlyId, + }, + incomplete: existingRun.associatedWaitpoint.status === "PENDING", + isError: existingRun.associatedWaitpoint.outputIsError, + immediate: true, + }, + async (event) => { + //log a message + await eventRepository.recordEvent( + `There's an existing run for idempotencyKey: ${idempotencyKey}`, + { + taskSlug: taskId, + environment, + attributes: { + runId: existingRun.friendlyId, + }, + context: options.traceContext, + parentId: event.spanId, + } + ); + //block run with waitpoint + await this._engine.blockRunWithWaitpoint({ + runId: RunId.fromFriendlyId(body.options!.parentRunId!), + waitpoints: existingRun.associatedWaitpoint!.id, + spanIdToComplete: event.spanId, + batch: options?.batchId + ? { + id: options.batchId, + index: options.batchIndex ?? 0, + } + : undefined, + environmentId: environment.id, + projectId: environment.projectId, + tx: this._prisma, + }); + } + ); } - return existingRun; + return { ...existingRun, isCached: true }; } } @@ -195,7 +247,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine { icon: options.customIcon ?? "task", }, runIsTest: body.options?.test ?? false, - batchId: options.batchId, + batchId: options.batchId ? BatchId.toFriendlyId(options.batchId) : undefined, idempotencyKey, }, incomplete: true, @@ -299,7 +351,12 @@ export class TriggerTaskServiceV2 extends WithRunEngine { oneTimeUseToken: options.oneTimeUseToken, parentTaskRunId: parentRun?.id, rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, - batchId: body.options?.parentBatch ?? undefined, + batch: options?.batchId + ? { + id: options.batchId, + index: options.batchIndex ?? 0, + } + : undefined, resumeParentOnCompletion: body.options?.resumeParentOnCompletion, depth, metadata: metadataPacket?.data, @@ -313,7 +370,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine { this._prisma ); - return taskRun; + return { ...taskRun, isCached: false }; }, async (_, tx) => { const counter = await tx.taskRunNumberCounter.findUnique({ @@ -335,6 +392,11 @@ export class TriggerTaskServiceV2 extends WithRunEngine { } ); } catch (error) { + if (error instanceof RunDuplicateIdempotencyKeyError) { + //retry calling this function, because this time it will return the idempotent run + return await this.call({ taskId, environment, body, options }); + } + // Detect a prisma transaction Unique constraint violation if (error instanceof Prisma.PrismaClientKnownRequestError) { logger.debug("TriggerTask: Prisma transaction error", { diff --git a/internal-packages/database/prisma/migrations/20250106172943_added_span_id_to_complete_to_task_run_waitpoint/migration.sql b/internal-packages/database/prisma/migrations/20250106172943_added_span_id_to_complete_to_task_run_waitpoint/migration.sql new file mode 100644 index 0000000000..8d624ba757 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250106172943_added_span_id_to_complete_to_task_run_waitpoint/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRunWaitpoint" ADD COLUMN "spanIdToComplete" TEXT; diff --git a/internal-packages/database/prisma/migrations/20250109131442_added_batch_and_index_to_task_run_waitpoint_and_task_run_execution_snapshot/migration.sql b/internal-packages/database/prisma/migrations/20250109131442_added_batch_and_index_to_task_run_waitpoint_and_task_run_execution_snapshot/migration.sql new file mode 100644 index 0000000000..5756f7fa5d --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250109131442_added_batch_and_index_to_task_run_waitpoint_and_task_run_execution_snapshot/migration.sql @@ -0,0 +1,13 @@ +-- AlterTable +ALTER TABLE "TaskRunExecutionSnapshot" ADD COLUMN "batchId" TEXT, +ADD COLUMN "completedWaitpointOrder" TEXT[]; + +-- AlterTable +ALTER TABLE "TaskRunWaitpoint" ADD COLUMN "batchId" TEXT, +ADD COLUMN "batchIndex" INTEGER; + +-- AddForeignKey +ALTER TABLE "TaskRunExecutionSnapshot" ADD CONSTRAINT "TaskRunExecutionSnapshot_batchId_fkey" FOREIGN KEY ("batchId") REFERENCES "BatchTaskRun"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "TaskRunWaitpoint" ADD CONSTRAINT "TaskRunWaitpoint_batchId_fkey" FOREIGN KEY ("batchId") REFERENCES "BatchTaskRun"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/internal-packages/database/prisma/migrations/20250109173506_waitpoint_added_batch_type/migration.sql b/internal-packages/database/prisma/migrations/20250109173506_waitpoint_added_batch_type/migration.sql new file mode 100644 index 0000000000..1e1fead5a5 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250109173506_waitpoint_added_batch_type/migration.sql @@ -0,0 +1,8 @@ +-- AlterEnum +ALTER TYPE "WaitpointType" ADD VALUE 'BATCH'; + +-- AlterTable +ALTER TABLE "Waitpoint" ADD COLUMN "completedByBatchId" TEXT; + +-- AddForeignKey +ALTER TABLE "Waitpoint" ADD CONSTRAINT "Waitpoint_completedByBatchId_fkey" FOREIGN KEY ("completedByBatchId") REFERENCES "BatchTaskRun"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/internal-packages/database/prisma/migrations/20250109175955_waitpoint_added_completed_by_batch_id_index/migration.sql b/internal-packages/database/prisma/migrations/20250109175955_waitpoint_added_completed_by_batch_id_index/migration.sql new file mode 100644 index 0000000000..7d691d17e1 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250109175955_waitpoint_added_completed_by_batch_id_index/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE INDEX "Waitpoint_completedByBatchId_idx" ON "Waitpoint"("completedByBatchId"); diff --git a/internal-packages/database/prisma/migrations/20250114153223_task_run_waitpoint_unique_constraint_added_batch_index/migration.sql b/internal-packages/database/prisma/migrations/20250114153223_task_run_waitpoint_unique_constraint_added_batch_index/migration.sql new file mode 100644 index 0000000000..22a41947d4 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250114153223_task_run_waitpoint_unique_constraint_added_batch_index/migration.sql @@ -0,0 +1,14 @@ +/* + Warnings: + + - A unique constraint covering the columns `[taskRunId,waitpointId,batchIndex]` on the table `TaskRunWaitpoint` will be added. If there are existing duplicate values, this will fail. + +*/ +-- DropIndex +DROP INDEX "TaskRunWaitpoint_taskRunId_waitpointId_key"; + +-- CreateIndex (multiple can have null batchIndex, so we need the other one below) +CREATE UNIQUE INDEX "TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_key" ON "TaskRunWaitpoint" ("taskRunId", "waitpointId", "batchIndex"); + +-- CreateIndex (where batchIndex is null) +CREATE UNIQUE INDEX "TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_null_key" ON "TaskRunWaitpoint"("taskRunId", "waitpointId") WHERE "batchIndex" IS NULL; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 91ca18ec98..e2e68864db 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1946,12 +1946,18 @@ model TaskRunExecutionSnapshot { run TaskRun @relation(fields: [runId], references: [id]) runStatus TaskRunStatus + batchId String? + batch BatchTaskRun? @relation(fields: [batchId], references: [id]) + /// This is the current run attempt number. Users can define how many attempts they want for a run. attemptNumber Int? /// Waitpoints that have been completed for this execution completedWaitpoints Waitpoint[] @relation("completedWaitpoints") + /// An array of waitpoint IDs in the correct order, used for batches + completedWaitpointOrder String[] + /// Checkpoint checkpointId String? checkpoint TaskRunCheckpoint? @relation(fields: [checkpointId], references: [id]) @@ -2050,6 +2056,10 @@ model Waitpoint { /// If it's a DATETIME type waitpoint, this is the date completedAfter DateTime? + /// If it's a BATCH type waitpoint, this is the associated batch + completedByBatchId String? + completedByBatch BatchTaskRun? @relation(fields: [completedByBatchId], references: [id], onDelete: SetNull) + /// The runs this waitpoint is blocking blockingTaskRuns TaskRunWaitpoint[] @@ -2071,12 +2081,14 @@ model Waitpoint { updatedAt DateTime @updatedAt @@unique([environmentId, idempotencyKey]) + @@index([completedByBatchId]) } enum WaitpointType { RUN DATETIME MANUAL + BATCH } enum WaitpointStatus { @@ -2096,10 +2108,22 @@ model TaskRunWaitpoint { project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) projectId String + /// This span id is completed when the waitpoint is completed. This is used with cached runs (idempotent) + spanIdToComplete String? + + //associated batch + batchId String? + batch BatchTaskRun? @relation(fields: [batchId], references: [id]) + //if there's an associated batch and this isn't set it's for the entire batch + //if it is set, it's a specific run in the batch + batchIndex Int? + createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - @@unique([taskRunId, waitpointId]) + /// There are two constraints, the one below and also one that Prisma doesn't support + /// The second one implemented in SQL only prevents a TaskRun + Waitpoint with a null batchIndex + @@unique([taskRunId, waitpointId, batchIndex]) @@index([taskRunId]) @@index([waitpointId]) } @@ -2480,6 +2504,7 @@ model BatchTaskRun { runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade) status BatchTaskRunStatus @default(PENDING) runtimeEnvironmentId String + /// This only includes new runs, not idempotent runs. runs TaskRun[] createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -2493,6 +2518,15 @@ model BatchTaskRun { options Json? batchVersion String @default("v1") + //engine v2 + /// Snapshots that reference this batch + executionSnapshots TaskRunExecutionSnapshot[] + /// Specific run blockers, + runsBlocked TaskRunWaitpoint[] + /// Waitpoints that are blocked by this batch. + /// When a Batch is created it blocks execution of the associated parent run (for andWait) + waitpoints Waitpoint[] + /// optional token that can be used to authenticate the task run oneTimeUseToken String? diff --git a/internal-packages/run-engine/src/engine/eventBus.ts b/internal-packages/run-engine/src/engine/eventBus.ts index b36e886694..2b8b8961be 100644 --- a/internal-packages/run-engine/src/engine/eventBus.ts +++ b/internal-packages/run-engine/src/engine/eventBus.ts @@ -79,6 +79,13 @@ export type EventBusEvents = { }; }, ]; + cachedRunCompleted: [ + { + time: Date; + spanId: string; + hasError: boolean; + }, + ]; runMetadataUpdated: [ { time: Date; diff --git a/internal-packages/run-engine/src/engine/executionSnapshots.ts b/internal-packages/run-engine/src/engine/executionSnapshots.ts index eb2dfcf42e..5daca4f419 100644 --- a/internal-packages/run-engine/src/engine/executionSnapshots.ts +++ b/internal-packages/run-engine/src/engine/executionSnapshots.ts @@ -1,5 +1,5 @@ import { CompletedWaitpoint, ExecutionResult } from "@trigger.dev/core/v3"; -import { RunId, SnapshotId } from "@trigger.dev/core/v3/apps"; +import { BatchId, RunId, SnapshotId } from "@trigger.dev/core/v3/apps"; import { PrismaClientOrTransaction, TaskRunCheckpoint, @@ -35,10 +35,24 @@ export async function getLatestExecutionSnapshot( ...snapshot, friendlyId: SnapshotId.toFriendlyId(snapshot.id), runFriendlyId: RunId.toFriendlyId(snapshot.runId), - completedWaitpoints: snapshot.completedWaitpoints.map( - (w) => - ({ + completedWaitpoints: snapshot.completedWaitpoints.flatMap((w) => { + //get all indexes of the waitpoint in the completedWaitpointOrder + //we do this because the same run can be in a batch multiple times (i.e. same idempotencyKey) + let indexes: (number | undefined)[] = []; + for (let i = 0; i < snapshot.completedWaitpointOrder.length; i++) { + if (snapshot.completedWaitpointOrder[i] === w.id) { + indexes.push(i); + } + } + + if (indexes.length === 0) { + indexes.push(undefined); + } + + return indexes.map((index) => { + return { id: w.id, + index: index === -1 ? undefined : index, friendlyId: w.friendlyId, type: w.type, completedAt: w.completedAt ?? new Date(), @@ -50,14 +64,27 @@ export async function getLatestExecutionSnapshot( ? { id: w.completedByTaskRunId, friendlyId: RunId.toFriendlyId(w.completedByTaskRunId), + batch: snapshot.batchId + ? { + id: snapshot.batchId, + friendlyId: BatchId.toFriendlyId(snapshot.batchId), + } + : undefined, } : undefined, completedAfter: w.completedAfter ?? undefined, + completedByBatch: w.completedByBatchId + ? { + id: w.completedByBatchId, + friendlyId: BatchId.toFriendlyId(w.completedByBatchId), + } + : undefined, output: w.output ?? undefined, outputType: w.outputType, outputIsError: w.outputIsError, - }) satisfies CompletedWaitpoint - ), + } satisfies CompletedWaitpoint; + }); + }), }; } diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 8cdfb07e11..0464bd7577 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -22,6 +22,7 @@ import { WaitForDurationResult, } from "@trigger.dev/core/v3"; import { + BatchId, getMaxDuration, parseNaturalLanguageDuration, QueueId, @@ -237,7 +238,7 @@ export class RunEngine { tags, parentTaskRunId, rootTaskRunId, - batchId, + batch, resumeParentOnCompletion, depth, metadata, @@ -252,7 +253,7 @@ export class RunEngine { const prisma = tx ?? this.prisma; return this.#trace( - "createRunAttempt", + "trigger", { friendlyId, environmentId: environment.id, @@ -268,67 +269,97 @@ export class RunEngine { } //create run - const taskRun = await prisma.taskRun.create({ - data: { - id: RunId.fromFriendlyId(friendlyId), - engine: "V2", - status, - number, - friendlyId, - runtimeEnvironmentId: environment.id, - projectId: environment.project.id, - idempotencyKey, - idempotencyKeyExpiresAt, - taskIdentifier, - payload, - payloadType, - context, - traceContext, - traceId, - spanId, - parentSpanId, - lockedToVersionId, - taskVersion, - sdkVersion, - cliVersion, - concurrencyKey, - queue: queueName, - masterQueue, - secondaryMasterQueue, - isTest, - delayUntil, - queuedAt, - maxAttempts, - priorityMs, - ttl, - tags: - tags.length === 0 - ? undefined - : { - connect: tags, - }, - runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name), - oneTimeUseToken, - parentTaskRunId, - rootTaskRunId, - batchId, - resumeParentOnCompletion, - depth, - metadata, - metadataType, - seedMetadata, - seedMetadataType, - maxDurationInSeconds, - executionSnapshots: { - create: { - engine: "V2", - executionStatus: "RUN_CREATED", - description: "Run was created", - runStatus: status, + let taskRun: TaskRun; + try { + taskRun = await prisma.taskRun.create({ + data: { + id: RunId.fromFriendlyId(friendlyId), + engine: "V2", + status, + number, + friendlyId, + runtimeEnvironmentId: environment.id, + projectId: environment.project.id, + idempotencyKey, + idempotencyKeyExpiresAt, + taskIdentifier, + payload, + payloadType, + context, + traceContext, + traceId, + spanId, + parentSpanId, + lockedToVersionId, + taskVersion, + sdkVersion, + cliVersion, + concurrencyKey, + queue: queueName, + masterQueue, + secondaryMasterQueue, + isTest, + delayUntil, + queuedAt, + maxAttempts, + priorityMs, + ttl, + tags: + tags.length === 0 + ? undefined + : { + connect: tags, + }, + runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name), + oneTimeUseToken, + parentTaskRunId, + rootTaskRunId, + batchId: batch?.id, + resumeParentOnCompletion, + depth, + metadata, + metadataType, + seedMetadata, + seedMetadataType, + maxDurationInSeconds, + executionSnapshots: { + create: { + engine: "V2", + executionStatus: "RUN_CREATED", + description: "Run was created", + runStatus: status, + }, }, }, - }, - }); + }); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + this.logger.debug("engine.trigger(): Prisma transaction error", { + code: error.code, + message: error.message, + meta: error.meta, + idempotencyKey, + environmentId: environment.id, + }); + + if (error.code === "P2002") { + this.logger.debug("engine.trigger(): throwing RunDuplicateIdempotencyKeyError", { + code: error.code, + message: error.message, + meta: error.meta, + idempotencyKey, + environmentId: environment.id, + }); + + //this happens if a unique constraint failed, i.e. duplicate idempotency + throw new RunDuplicateIdempotencyKeyError( + `Run with idempotency key ${idempotencyKey} already exists` + ); + } + } + + throw error; + } span.setAttribute("runId", taskRun.id); @@ -345,9 +376,10 @@ export class RunEngine { //this will block the parent run from continuing until this waitpoint is completed (and removed) await this.blockRunWithWaitpoint({ runId: parentTaskRunId, - waitpointId: associatedWaitpoint.id, + waitpoints: associatedWaitpoint.id, environmentId: associatedWaitpoint.environmentId, projectId: associatedWaitpoint.projectId, + batch, tx: prisma, }); @@ -546,7 +578,7 @@ export class RunEngine { "Tried to dequeue a run that is not in a valid state to be dequeued.", }, checkpointId: snapshot.checkpointId ?? undefined, - completedWaitpointIds: snapshot.completedWaitpoints.map((wp) => wp.id), + completedWaitpoints: snapshot.completedWaitpoints, error: `Tried to dequeue a run that is not in a valid state to be dequeued.`, }); @@ -767,7 +799,7 @@ export class RunEngine { description: "Run was dequeued for execution", }, checkpointId: snapshot.checkpointId ?? undefined, - completedWaitpointIds: snapshot.completedWaitpoints.map((wp) => wp.id), + completedWaitpoints: snapshot.completedWaitpoints, }); return { @@ -1040,6 +1072,7 @@ export class RunEngine { data: { status: "EXECUTING", attemptNumber: nextAttemptNumber, + firstAttemptStartedAt: taskRun.attemptNumber === null ? new Date() : undefined, }, include: { tags: true, @@ -1280,7 +1313,7 @@ export class RunEngine { //block the run const blockResult = await this.blockRunWithWaitpoint({ runId, - waitpointId: waitpoint.id, + waitpoints: waitpoint.id, environmentId: waitpoint.environmentId, projectId: waitpoint.projectId, tx: prisma, @@ -1549,6 +1582,110 @@ export class RunEngine { }); } + /** This block a run with a BATCH waitpoint. + * The waitpoint will be created, and it will block the parent run. + */ + async blockRunWithCreatedBatch({ + runId, + batchId, + environmentId, + projectId, + tx, + }: { + runId: string; + batchId: string; + environmentId: string; + projectId: string; + tx?: PrismaClientOrTransaction; + }): Promise { + const prisma = tx ?? this.prisma; + + try { + const waitpoint = await prisma.waitpoint.create({ + data: { + ...WaitpointId.generate(), + type: "BATCH", + idempotencyKey: batchId, + userProvidedIdempotencyKey: false, + completedByBatchId: batchId, + environmentId, + projectId, + }, + }); + + await this.blockRunWithWaitpoint({ + runId, + waitpoints: waitpoint.id, + environmentId, + projectId, + batch: { id: batchId }, + tx: prisma, + }); + + return waitpoint; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + // duplicate idempotency key + if (error.code === "P2002") { + return null; + } else { + throw error; + } + } + throw error; + } + } + + /** + * This is called when all the runs for a batch have been created. + * This does NOT mean that all the runs for the batch are completed. + */ + async unblockRunForCreatedBatch({ + runId, + batchId, + environmentId, + projectId, + tx, + }: { + runId: string; + batchId: string; + environmentId: string; + projectId: string; + tx?: PrismaClientOrTransaction; + }): Promise { + const prisma = tx ?? this.prisma; + + const waitpoint = await prisma.waitpoint.findFirst({ + where: { + completedByBatchId: batchId, + }, + }); + + if (!waitpoint) { + this.logger.error("RunEngine.unblockRunForBatch(): Waitpoint not found", { + runId, + batchId, + }); + throw new ServiceValidationError("Waitpoint not found for batch", 404); + } + + await this.completeWaitpoint({ + id: waitpoint.id, + output: { value: "Batch waitpoint completed", isError: false }, + }); + } + + async tryCompleteBatch({ batchId }: { batchId: string }): Promise { + await this.worker.enqueue({ + //this will debounce the call + id: `tryCompleteBatch:${batchId}`, + job: "tryCompleteBatch", + payload: { batchId: batchId }, + //2s in the future + availableAt: new Date(Date.now() + 2_000), + }); + } + async getWaitpoint({ waitpointId, environmentId, @@ -1585,21 +1722,25 @@ export class RunEngine { */ async blockRunWithWaitpoint({ runId, - waitpointId, + waitpoints, projectId, failAfter, + spanIdToComplete, + batch, tx, }: { runId: string; - waitpointId: string | string[]; + waitpoints: string | string[]; environmentId: string; projectId: string; failAfter?: Date; + spanIdToComplete?: string; + batch?: { id: string; index?: number }; tx?: PrismaClientOrTransaction; }): Promise { const prisma = tx ?? this.prisma; - let waitpointIds = typeof waitpointId === "string" ? [waitpointId] : waitpointId; + let $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints; return await this.runLock.lock([runId], 5000, async (signal) => { let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId); @@ -1607,16 +1748,19 @@ export class RunEngine { //block the run with the waitpoints, returning how many waitpoints are pending const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>` WITH inserted AS ( - INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt") + INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex") SELECT gen_random_uuid(), ${runId}, w.id, ${projectId}, NOW(), - NOW() + NOW(), + ${spanIdToComplete ?? null}, + ${batch?.id ?? null}, + ${batch?.index ?? null} FROM "Waitpoint" w - WHERE w.id IN (${Prisma.join(waitpointIds)}) + WHERE w.id IN (${Prisma.join($waitpoints)}) ON CONFLICT DO NOTHING RETURNING "waitpointId" ) @@ -1647,15 +1791,16 @@ export class RunEngine { executionStatus: newStatus, description: "Run was blocked by a waitpoint.", }, + batchId: batch?.id ?? snapshot.batchId ?? undefined, }); } if (failAfter) { - for (const waitpointId of waitpointIds) { + for (const waitpoint of $waitpoints) { await this.worker.enqueue({ - id: `finishWaitpoint.${waitpointId}`, + id: `finishWaitpoint.${waitpoint}`, job: "finishWaitpoint", - payload: { waitpointId, error: "Waitpoint timed out" }, + payload: { waitpointId: waitpoint, error: "Waitpoint timed out" }, availableAt: failAfter, }); } @@ -1705,7 +1850,7 @@ export class RunEngine { // 1. Find the TaskRuns blocked by this waitpoint const affectedTaskRuns = await tx.taskRunWaitpoint.findMany({ where: { waitpointId: id }, - select: { taskRunId: true }, + select: { taskRunId: true, spanIdToComplete: true }, }); if (affectedTaskRuns.length === 0) { @@ -1748,6 +1893,15 @@ export class RunEngine { //50ms in the future availableAt: new Date(Date.now() + 50), }); + + // emit an event to complete associated cached runs + if (run.spanIdToComplete) { + this.eventBus.emit("cachedRunCompleted", { + time: new Date(), + spanId: run.spanIdToComplete, + hasError: output?.isError ?? false, + }); + } } return result.updatedWaitpoint; @@ -1865,6 +2019,12 @@ export class RunEngine { status: snapshot.runStatus, attemptNumber: snapshot.attemptNumber ?? undefined, }, + batch: snapshot.batchId + ? { + id: snapshot.batchId, + friendlyId: BatchId.toFriendlyId(snapshot.batchId), + } + : undefined, checkpoint: snapshot.checkpoint ? { id: snapshot.checkpoint.id, @@ -2482,6 +2642,8 @@ export class RunEngine { const blockingWaitpoints = await this.prisma.taskRunWaitpoint.findMany({ where: { taskRunId: runId }, select: { + batchId: true, + batchIndex: true, waitpoint: { select: { id: true, status: true }, }, @@ -2531,7 +2693,11 @@ export class RunEngine { executionStatus: "EXECUTING", description: "Run was continued, whilst still executing.", }, - completedWaitpointIds: blockingWaitpoints.map((b) => b.waitpoint.id), + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: blockingWaitpoints.map((b) => ({ + id: b.waitpoint.id, + index: b.batchIndex ?? undefined, + })), }); //we reacquire the concurrency if it's still running because we're not going to be dequeuing (which also does this) @@ -2545,7 +2711,11 @@ export class RunEngine { executionStatus: "QUEUED", description: "Run is QUEUED, because all waitpoints are completed.", }, - completedWaitpointIds: blockingWaitpoints.map((b) => b.waitpoint.id), + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: blockingWaitpoints.map((b) => ({ + id: b.waitpoint.id, + index: b.batchIndex ?? undefined, + })), }); //put it back in the queue, with the original timestamp (w/ priority) @@ -2751,8 +2921,9 @@ export class RunEngine { { run, snapshot, + batchId, checkpointId, - completedWaitpointIds, + completedWaitpoints, error, }: { run: { id: string; status: TaskRunStatus; attemptNumber?: number | null }; @@ -2760,8 +2931,12 @@ export class RunEngine { executionStatus: TaskRunExecutionStatus; description: string; }; + batchId?: string; checkpointId?: string; - completedWaitpointIds?: string[]; + completedWaitpoints?: { + id: string; + index?: number; + }[]; error?: string; } ) { @@ -2773,12 +2948,17 @@ export class RunEngine { runId: run.id, runStatus: run.status, attemptNumber: run.attemptNumber ?? undefined, - checkpointId: checkpointId ?? undefined, + batchId, + checkpointId, completedWaitpoints: { - connect: completedWaitpointIds?.map((id) => ({ id })), + connect: completedWaitpoints?.map((w) => ({ id: w.id })), }, + completedWaitpointOrder: completedWaitpoints + ?.filter((c) => c.index !== undefined) + .sort((a, b) => a.index! - b.index!) + .map((w) => w.id), isValid: error ? false : true, - error: error ?? undefined, + error, }, include: { checkpoint: true, @@ -2801,7 +2981,7 @@ export class RunEngine { }, snapshot: { ...newSnapshot, - completedWaitpointIds: completedWaitpointIds ?? [], + completedWaitpointIds: completedWaitpoints?.map((w) => w.id) ?? [], }, }); @@ -3020,14 +3200,7 @@ export class RunEngine { */ async #finalizeRun({ id, batchId }: { id: string; batchId: string | null }) { if (batchId) { - await this.worker.enqueue({ - //this will debounce the call - id: `tryCompleteBatch:${batchId}`, - job: "tryCompleteBatch", - payload: { batchId: batchId }, - //2s in the future - availableAt: new Date(Date.now() + 2_000), - }); + await this.tryCompleteBatch({ batchId }); } } @@ -3162,10 +3335,16 @@ export class ServiceValidationError extends Error { } } -//todo temporary during development class NotImplementedError extends Error { constructor(message: string) { - console.error("NOT IMPLEMENTED YET", { message }); + console.error("This isn't implemented", { message }); + super(message); + } +} + +export class RunDuplicateIdempotencyKeyError extends Error { + constructor(message: string) { super(message); + this.name = "RunDuplicateIdempotencyKeyError"; } } diff --git a/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts b/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts index f1025d4d0d..ee76462a16 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts @@ -79,7 +79,7 @@ describe("RunEngine batchTrigger", () => { queueName: "task/test-task", isTest: false, tags: [], - batchId: batch.id, + batch: { id: batch.id, index: 0 }, }, prisma ); @@ -100,7 +100,7 @@ describe("RunEngine batchTrigger", () => { queueName: "task/test-task", isTest: false, tags: [], - batchId: batch.id, + batch: { id: batch.id, index: 1 }, }, prisma ); diff --git a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts new file mode 100644 index 0000000000..159d219734 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts @@ -0,0 +1,363 @@ +import { + assertNonNullable, + containerTest, + setupAuthenticatedEnvironment, + setupBackgroundWorker, +} from "@internal/testcontainers"; +import { trace } from "@opentelemetry/api"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { setTimeout } from "node:timers/promises"; +import { generateFriendlyId } from "@trigger.dev/core/v3/apps"; + +describe("RunEngine batchTriggerAndWait", () => { + containerTest( + "batchTriggerAndWait (no idempotency)", + { timeout: 15_000 }, + async ({ prisma, redisContainer }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + redis: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + enableAutoPipelining: true, + }, + worker: { + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const parentTask = "parent-task"; + const childTask = "child-task"; + + //create background worker + await setupBackgroundWorker(prisma, authenticatedEnvironment, [parentTask, childTask]); + + //create a batch + const batch = await prisma.batchTaskRun.create({ + data: { + friendlyId: generateFriendlyId("batch"), + runtimeEnvironmentId: authenticatedEnvironment.id, + }, + }); + + //trigger the run + const parentRun = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: `task/${parentTask}`, + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue parent + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: parentRun.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + const initialExecutionData = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(initialExecutionData); + const attemptResult = await engine.startRunAttempt({ + runId: parentRun.id, + snapshotId: initialExecutionData.snapshot.id, + }); + + //block using the batch + await engine.blockRunWithCreatedBatch({ + runId: parentRun.id, + batchId: batch.id, + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }); + + const afterBlockedByBatch = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(afterBlockedByBatch); + expect(afterBlockedByBatch.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + const child1 = await engine.trigger( + { + number: 1, + friendlyId: "run_c1234", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: `task/${childTask}`, + isTest: false, + tags: [], + resumeParentOnCompletion: true, + parentTaskRunId: parentRun.id, + batch: { id: batch.id, index: 0 }, + }, + prisma + ); + + const parentAfterChild1 = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(parentAfterChild1); + expect(parentAfterChild1.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + const child2 = await engine.trigger( + { + number: 2, + friendlyId: "run_c12345", + environment: authenticatedEnvironment, + taskIdentifier: childTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t123456", + spanId: "s123456", + masterQueue: "main", + queueName: `task/${childTask}`, + isTest: false, + tags: [], + resumeParentOnCompletion: true, + parentTaskRunId: parentRun.id, + batch: { id: batch.id, index: 1 }, + }, + prisma + ); + + const parentAfterChild2 = await engine.getRunExecutionData({ runId: parentRun.id }); + assertNonNullable(parentAfterChild2); + expect(parentAfterChild2.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + //check the waitpoint blocking the parent run + const runWaitpoints = await prisma.taskRunWaitpoint.findMany({ + where: { + taskRunId: parentRun.id, + }, + include: { + waitpoint: true, + }, + orderBy: { + createdAt: "asc", + }, + }); + expect(runWaitpoints.length).toBe(3); + const child1Waitpoint = runWaitpoints.find( + (w) => w.waitpoint.completedByTaskRunId === child1.id + ); + expect(child1Waitpoint?.waitpoint.type).toBe("RUN"); + expect(child1Waitpoint?.waitpoint.completedByTaskRunId).toBe(child1.id); + expect(child1Waitpoint?.batchId).toBe(batch.id); + expect(child1Waitpoint?.batchIndex).toBe(0); + const child2Waitpoint = runWaitpoints.find( + (w) => w.waitpoint.completedByTaskRunId === child2.id + ); + expect(child2Waitpoint?.waitpoint.type).toBe("RUN"); + expect(child2Waitpoint?.waitpoint.completedByTaskRunId).toBe(child2.id); + expect(child2Waitpoint?.batchId).toBe(batch.id); + expect(child2Waitpoint?.batchIndex).toBe(1); + const batchWaitpoint = runWaitpoints.find((w) => w.waitpoint.type === "BATCH"); + expect(batchWaitpoint?.waitpoint.type).toBe("BATCH"); + expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id); + + await engine.unblockRunForCreatedBatch({ + runId: parentRun.id, + batchId: batch.id, + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }); + + //dequeue and start the 1st child + const dequeuedChild = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: child1.masterQueue, + maxRunCount: 1, + }); + const childAttempt1 = await engine.startRunAttempt({ + runId: child1.id, + snapshotId: dequeuedChild[0].snapshot.id, + }); + + // complete the 1st child + await engine.completeRunAttempt({ + runId: child1.id, + snapshotId: childAttempt1.snapshot.id, + completion: { + id: child1.id, + ok: true, + output: '{"foo":"bar"}', + outputType: "application/json", + }, + }); + + //child snapshot + const childExecutionDataAfter = await engine.getRunExecutionData({ runId: child1.id }); + assertNonNullable(childExecutionDataAfter); + expect(childExecutionDataAfter.snapshot.executionStatus).toBe("FINISHED"); + + const child1WaitpointAfter = await prisma.waitpoint.findFirst({ + where: { + id: child1Waitpoint?.waitpointId, + }, + }); + expect(child1WaitpointAfter?.completedAt).not.toBeNull(); + expect(child1WaitpointAfter?.status).toBe("COMPLETED"); + expect(child1WaitpointAfter?.output).toBe('{"foo":"bar"}'); + + await setTimeout(500); + + const runWaitpointsAfterFirstChild = await prisma.taskRunWaitpoint.findMany({ + where: { + taskRunId: parentRun.id, + }, + include: { + waitpoint: true, + }, + }); + expect(runWaitpointsAfterFirstChild.length).toBe(3); + + //parent snapshot + const parentExecutionDataAfterFirstChildComplete = await engine.getRunExecutionData({ + runId: parentRun.id, + }); + assertNonNullable(parentExecutionDataAfterFirstChildComplete); + expect(parentExecutionDataAfterFirstChildComplete.snapshot.executionStatus).toBe( + "EXECUTING_WITH_WAITPOINTS" + ); + expect(parentExecutionDataAfterFirstChildComplete.batch?.id).toBe(batch.id); + expect(parentExecutionDataAfterFirstChildComplete.completedWaitpoints.length).toBe(0); + + //dequeue and start the 2nd child + const dequeuedChild2 = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: child2.masterQueue, + maxRunCount: 1, + }); + const childAttempt2 = await engine.startRunAttempt({ + runId: child2.id, + snapshotId: dequeuedChild2[0].snapshot.id, + }); + await engine.completeRunAttempt({ + runId: child2.id, + snapshotId: childAttempt2.snapshot.id, + completion: { + id: child2.id, + ok: true, + output: '{"baz":"qux"}', + outputType: "application/json", + }, + }); + + //child snapshot + const child2ExecutionDataAfter = await engine.getRunExecutionData({ runId: child1.id }); + assertNonNullable(child2ExecutionDataAfter); + expect(child2ExecutionDataAfter.snapshot.executionStatus).toBe("FINISHED"); + + const child2WaitpointAfter = await prisma.waitpoint.findFirst({ + where: { + id: child2Waitpoint?.waitpointId, + }, + }); + expect(child2WaitpointAfter?.completedAt).not.toBeNull(); + expect(child2WaitpointAfter?.status).toBe("COMPLETED"); + expect(child2WaitpointAfter?.output).toBe('{"baz":"qux"}'); + + await setTimeout(500); + + const runWaitpointsAfterSecondChild = await prisma.taskRunWaitpoint.findMany({ + where: { + taskRunId: parentRun.id, + }, + include: { + waitpoint: true, + }, + }); + expect(runWaitpointsAfterSecondChild.length).toBe(0); + + //parent snapshot + const parentExecutionDataAfterSecondChildComplete = await engine.getRunExecutionData({ + runId: parentRun.id, + }); + assertNonNullable(parentExecutionDataAfterSecondChildComplete); + expect(parentExecutionDataAfterSecondChildComplete.snapshot.executionStatus).toBe( + "EXECUTING" + ); + expect(parentExecutionDataAfterSecondChildComplete.batch?.id).toBe(batch.id); + expect(parentExecutionDataAfterSecondChildComplete.completedWaitpoints.length).toBe(3); + + const completedWaitpoint0 = + parentExecutionDataAfterSecondChildComplete.completedWaitpoints.find( + (w) => w.index === 0 + ); + assertNonNullable(completedWaitpoint0); + expect(completedWaitpoint0.id).toBe(child1Waitpoint!.waitpointId); + expect(completedWaitpoint0.completedByTaskRun?.id).toBe(child1.id); + expect(completedWaitpoint0.completedByTaskRun?.batch?.id).toBe(batch.id); + expect(completedWaitpoint0.output).toBe('{"foo":"bar"}'); + expect(completedWaitpoint0.index).toBe(0); + + const completedWaitpoint1 = + parentExecutionDataAfterSecondChildComplete.completedWaitpoints.find( + (w) => w.index === 1 + ); + assertNonNullable(completedWaitpoint1); + expect(completedWaitpoint1.id).toBe(child2Waitpoint!.waitpointId); + expect(completedWaitpoint1.completedByTaskRun?.id).toBe(child2.id); + expect(completedWaitpoint1.completedByTaskRun?.batch?.id).toBe(batch.id); + expect(completedWaitpoint1.index).toBe(1); + expect(completedWaitpoint1.output).toBe('{"baz":"qux"}'); + + const batchWaitpointAfter = + parentExecutionDataAfterSecondChildComplete.completedWaitpoints.find( + (w) => w.type === "BATCH" + ); + expect(batchWaitpointAfter?.id).toBe(batchWaitpoint?.waitpointId); + expect(batchWaitpointAfter?.completedByBatch?.id).toBe(batch.id); + expect(batchWaitpointAfter?.index).toBeUndefined(); + + const batchAfter = await prisma.batchTaskRun.findUnique({ + where: { + id: batch.id, + }, + }); + expect(batchAfter?.status === "COMPLETED"); + } finally { + engine.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts index 8c8bab2bd0..5776f4479a 100644 --- a/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts +++ b/internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts @@ -368,7 +368,7 @@ describe("RunEngine triggerAndWait", () => { }); const blockedResult = await engine.blockRunWithWaitpoint({ runId: parentRun2.id, - waitpointId: childRunWithWaitpoint.associatedWaitpoint!.id, + waitpoints: childRunWithWaitpoint.associatedWaitpoint!.id, environmentId: authenticatedEnvironment.id, projectId: authenticatedEnvironment.project.id, tx: prisma, diff --git a/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts b/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts index 419efacb85..d1429bc1a0 100644 --- a/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts +++ b/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts @@ -340,7 +340,7 @@ describe("RunEngine Waitpoints", () => { //block the run await engine.blockRunWithWaitpoint({ runId: run.id, - waitpointId: waitpoint.id, + waitpoints: waitpoint.id, environmentId: authenticatedEnvironment.id, projectId: authenticatedEnvironment.projectId, }); @@ -479,7 +479,7 @@ describe("RunEngine Waitpoints", () => { //block the run await engine.blockRunWithWaitpoint({ runId: run.id, - waitpointId: waitpoint.id, + waitpoints: waitpoint.id, environmentId: authenticatedEnvironment.id, projectId: authenticatedEnvironment.projectId, //fail after 200ms @@ -608,7 +608,7 @@ describe("RunEngine Waitpoints", () => { waitpoints.map((waitpoint) => engine.blockRunWithWaitpoint({ runId: run.id, - waitpointId: waitpoint.id, + waitpoints: waitpoint.id, environmentId: authenticatedEnvironment.id, projectId: authenticatedEnvironment.projectId, }) diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index dc40f7d1ed..9ee63b7744 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -65,7 +65,10 @@ export type TriggerParams = { tags: { id: string; name: string }[]; parentTaskRunId?: string; rootTaskRunId?: string; - batchId?: string; + batch?: { + id: string; + index: number; + }; resumeParentOnCompletion?: boolean; depth?: number; metadata?: string; diff --git a/internal-packages/run-engine/src/index.ts b/internal-packages/run-engine/src/index.ts index b71175be2a..e63b0dd836 100644 --- a/internal-packages/run-engine/src/index.ts +++ b/internal-packages/run-engine/src/index.ts @@ -1 +1 @@ -export { RunEngine } from "./engine/index"; +export { RunEngine, RunDuplicateIdempotencyKeyError } from "./engine/index"; diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 59d2a16a49..d2115552b4 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -4,8 +4,8 @@ import { generateJWT } from "../jwt.js"; import { AddTagsRequestBody, BatchTaskRunExecutionResult, - BatchTriggerTaskV2RequestBody, - BatchTriggerTaskV2Response, + BatchTriggerTaskV3RequestBody, + BatchTriggerTaskV3Response, CanceledRunResponse, CreateEnvironmentVariableRequestBody, CreateScheduleOptions, @@ -18,7 +18,7 @@ import { ListScheduleOptions, ReplayRunResponse, RescheduleRunRequestBody, - RetrieveBatchResponse, + RetrieveBatchV2Response, RetrieveRunResponse, ScheduleObject, TaskRunExecutionResult, @@ -42,9 +42,9 @@ import { } from "./core.js"; import { ApiError } from "./errors.js"; import { + AnyRealtimeRun, AnyRunShape, RealtimeRun, - AnyRealtimeRun, RunShape, RunStreamCallback, RunSubscription, @@ -75,8 +75,6 @@ export type ClientTriggerOptions = { }; export type ClientBatchTriggerOptions = ClientTriggerOptions & { - idempotencyKey?: string; - idempotencyKeyTTL?: string; processingStrategy?: "parallel" | "sequential"; }; @@ -100,10 +98,10 @@ const DEFAULT_ZOD_FETCH_OPTIONS: ZodFetchOptions = { export { isRequestOptions }; export type { + AnyRealtimeRun, AnyRunShape, ApiRequestOptions, RealtimeRun, - AnyRealtimeRun, RunShape, RunStreamCallback, RunSubscription, @@ -234,19 +232,17 @@ export class ApiClient { }); } - batchTriggerV2( - body: BatchTriggerTaskV2RequestBody, + batchTriggerV3( + body: BatchTriggerTaskV3RequestBody, clientOptions?: ClientBatchTriggerOptions, requestOptions?: TriggerRequestOptions ) { return zodfetch( - BatchTriggerTaskV2Response, - `${this.baseUrl}/api/v1/tasks/batch`, + BatchTriggerTaskV3Response, + `${this.baseUrl}/api/v2/tasks/batch`, { method: "POST", headers: this.#getHeaders(clientOptions?.spanParentAsLink ?? false, { - "idempotency-key": clientOptions?.idempotencyKey, - "idempotency-key-ttl": clientOptions?.idempotencyKeyTTL, "batch-processing-strategy": clientOptions?.processingStrategy, }), body: JSON.stringify(body), @@ -713,8 +709,8 @@ export class ApiClient { retrieveBatch(batchId: string, requestOptions?: ZodFetchOptions) { return zodfetch( - RetrieveBatchResponse, - `${this.baseUrl}/api/v1/batches/${batchId}`, + RetrieveBatchV2Response, + `${this.baseUrl}/api/v2/batches/${batchId}`, { method: "GET", headers: this.#getHeaders(false), diff --git a/packages/core/src/v3/idempotencyKeys.ts b/packages/core/src/v3/idempotencyKeys.ts index 7a8e053018..7544dc1ee2 100644 --- a/packages/core/src/v3/idempotencyKeys.ts +++ b/packages/core/src/v3/idempotencyKeys.ts @@ -8,6 +8,28 @@ export function isIdempotencyKey( return typeof value === "string" && value.length === 64; } +export function flattenIdempotencyKey( + idempotencyKey?: + | IdempotencyKey + | string + | string[] + | (undefined | IdempotencyKey | string | string[])[] +): IdempotencyKey | string | string[] | undefined { + if (!idempotencyKey) { + return; + } + + if (Array.isArray(idempotencyKey)) { + return idempotencyKey.flatMap((key) => { + const k = flattenIdempotencyKey(key); + if (!k) return []; + return [k]; + }) as string[]; + } + + return idempotencyKey; +} + export async function makeIdempotencyKey( idempotencyKey?: IdempotencyKey | string | string[] ): Promise { diff --git a/packages/core/src/v3/runtime/devRuntimeManager.ts b/packages/core/src/v3/runtime/devRuntimeManager.ts index acad2c3d0f..35e009ac5e 100644 --- a/packages/core/src/v3/runtime/devRuntimeManager.ts +++ b/packages/core/src/v3/runtime/devRuntimeManager.ts @@ -49,39 +49,42 @@ export class DevRuntimeManager implements RuntimeManager { async waitForBatch(params: { id: string; - runs: string[]; + runCount: number; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { - return Promise.resolve({ id: params.id, items: [] }); - } + throw new Error("Method not implemented."); - const promise = Promise.all( - params.runs.map((runId) => { - return new Promise((resolve, reject) => { - const pendingCompletion = this._pendingCompletionNotifications.get(runId); + // if (!params.runs.length) { + // return Promise.resolve({ id: params.id, items: [] }); + // } - if (pendingCompletion) { - this._pendingCompletionNotifications.delete(runId); + // const promise = Promise.all( + // params.runs.map((runId) => { + // return new Promise((resolve, reject) => { + // const pendingCompletion = this._pendingCompletionNotifications.get(runId); - resolve(pendingCompletion); + // if (pendingCompletion) { + // this._pendingCompletionNotifications.delete(runId); - return; - } + // resolve(pendingCompletion); - this._taskWaits.set(runId, { resolve }); - }); - }) - ); + // return; + // } - await this.#tryFlushMetadata(); + // this._taskWaits.set(runId, { resolve }); + // }); + // }) + // ); + // await this.#tryFlushMetadata(); + + // const results = await promise; - const results = await promise; + // const results = await promise; - return { - id: params.id, - items: results, - }; + // return { + // id: params.id, + // items: results, + // }; } resumeTask(completion: TaskRunExecutionResult, runId: string): void { diff --git a/packages/core/src/v3/runtime/index.ts b/packages/core/src/v3/runtime/index.ts index 7eecb99296..a1fe0a804b 100644 --- a/packages/core/src/v3/runtime/index.ts +++ b/packages/core/src/v3/runtime/index.ts @@ -39,7 +39,7 @@ export class RuntimeAPI { public waitForBatch(params: { id: string; - runs: string[]; + runCount: number; ctx: TaskRunContext; }): Promise { return usage.pauseAsync(() => this.#getRuntimeManager().waitForBatch(params)); diff --git a/packages/core/src/v3/runtime/managedRuntimeManager.ts b/packages/core/src/v3/runtime/managedRuntimeManager.ts index 90eddcd5e7..ed3f4d13e0 100644 --- a/packages/core/src/v3/runtime/managedRuntimeManager.ts +++ b/packages/core/src/v3/runtime/managedRuntimeManager.ts @@ -65,17 +65,18 @@ export class ManagedRuntimeManager implements RuntimeManager { async waitForBatch(params: { id: string; - runs: string[]; + runCount: number; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { + if (!params.runCount) { return Promise.resolve({ id: params.id, items: [] }); } const promise = Promise.all( - params.runs.map((runId) => { + Array.from({ length: params.runCount }, (_, index) => { + const resolverId = `${params.id}_${index}`; return new Promise((resolve, reject) => { - this.resolversByWaitId.set(runId, resolve); + this.resolversByWaitId.set(resolverId, resolve); }); }) ); @@ -99,8 +100,21 @@ export class ManagedRuntimeManager implements RuntimeManager { private completeWaitpoint(waitpoint: CompletedWaitpoint): void { console.log("completeWaitpoint", waitpoint); - const waitId = - waitpoint.completedByTaskRun?.friendlyId ?? this.resolversByWaitpoint.get(waitpoint.id); + let waitId: string | undefined; + + if (waitpoint.completedByTaskRun) { + if (waitpoint.completedByTaskRun.batch) { + waitId = `${waitpoint.completedByTaskRun.batch.friendlyId}_${waitpoint.index}`; + } else { + waitId = waitpoint.completedByTaskRun.friendlyId; + } + } else if (waitpoint.completedByBatch) { + //no waitpoint resolves associated with batch completions + //a batch completion isn't when all the runs from a batch are completed + return; + } else { + waitId = this.resolversByWaitpoint.get(waitpoint.id); + } if (!waitId) { // TODO: Handle failures better @@ -124,10 +138,12 @@ export class ManagedRuntimeManager implements RuntimeManager { } private waitpointToTaskRunExecutionResult(waitpoint: CompletedWaitpoint): TaskRunExecutionResult { + if (!waitpoint.completedByTaskRun?.friendlyId) throw new Error("Missing completedByTaskRun"); + if (waitpoint.outputIsError) { return { ok: false, - id: waitpoint.id, + id: waitpoint.completedByTaskRun.friendlyId, error: waitpoint.output ? JSON.parse(waitpoint.output) : { @@ -138,7 +154,7 @@ export class ManagedRuntimeManager implements RuntimeManager { } else { return { ok: true, - id: waitpoint.id, + id: waitpoint.completedByTaskRun.friendlyId, output: waitpoint.output, outputType: waitpoint.outputType ?? "application/json", } satisfies TaskRunSuccessfulExecutionResult; diff --git a/packages/core/src/v3/runtime/manager.ts b/packages/core/src/v3/runtime/manager.ts index 56acfe3cf2..d42e86cfad 100644 --- a/packages/core/src/v3/runtime/manager.ts +++ b/packages/core/src/v3/runtime/manager.ts @@ -11,7 +11,7 @@ export interface RuntimeManager { waitForTask(params: { id: string; ctx: TaskRunContext }): Promise; waitForBatch(params: { id: string; - runs: string[]; + runCount: number; ctx: TaskRunContext; }): Promise; } diff --git a/packages/core/src/v3/runtime/noopRuntimeManager.ts b/packages/core/src/v3/runtime/noopRuntimeManager.ts index 16e96de3e9..30ee5fe788 100644 --- a/packages/core/src/v3/runtime/noopRuntimeManager.ts +++ b/packages/core/src/v3/runtime/noopRuntimeManager.ts @@ -32,7 +32,7 @@ export class NoopRuntimeManager implements RuntimeManager { waitForBatch(params: { id: string; - runs: string[]; + runCount: number; ctx: TaskRunContext; }): Promise { return Promise.resolve({ diff --git a/packages/core/src/v3/runtime/prodRuntimeManager.ts b/packages/core/src/v3/runtime/prodRuntimeManager.ts index 4fd63e04bd..8c209d1e4c 100644 --- a/packages/core/src/v3/runtime/prodRuntimeManager.ts +++ b/packages/core/src/v3/runtime/prodRuntimeManager.ts @@ -80,34 +80,36 @@ export class ProdRuntimeManager implements RuntimeManager { async waitForBatch(params: { id: string; - runs: string[]; + runCount: number; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { - return Promise.resolve({ id: params.id, items: [] }); - } + throw new Error("Method not implemented."); - const promise = Promise.all( - params.runs.map((runId) => { - return new Promise((resolve, reject) => { - this._taskWaits.set(runId, { resolve }); - }); - }) - ); - - await this.ipc.send("WAIT_FOR_BATCH", { - batchFriendlyId: params.id, - runFriendlyIds: params.runs, - }); + // if (!params.runs.length) { + // return Promise.resolve({ id: params.id, items: [] }); + // } - const results = await promise; + // const promise = Promise.all( + // params.runs.map((runId) => { + // return new Promise((resolve, reject) => { + // this._taskWaits.set(runId, { resolve }); + // }); + // }) + // ); - clock.reset(); + // await this.ipc.send("WAIT_FOR_BATCH", { + // batchFriendlyId: params.id, + // runFriendlyIds: params.runs, + // }); + + // const results = await promise; + + // clock.reset(); - return { - id: params.id, - items: results, - }; + // return { + // id: params.id, + // items: results, + // }; } resumeTask(completion: TaskRunExecutionResult): void { diff --git a/packages/core/src/v3/runtime/unmanagedRuntimeManager.ts b/packages/core/src/v3/runtime/unmanagedRuntimeManager.ts index 88b0350590..19796a1b6d 100644 --- a/packages/core/src/v3/runtime/unmanagedRuntimeManager.ts +++ b/packages/core/src/v3/runtime/unmanagedRuntimeManager.ts @@ -40,27 +40,29 @@ export class UnmanagedRuntimeManager implements RuntimeManager { async waitForBatch(params: { id: string; - runs: string[]; + runCount: number; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { - return Promise.resolve({ id: params.id, items: [] }); - } - - const promise = Promise.all( - params.runs.map((runId) => { - return new Promise((resolve, reject) => { - this._taskWaits.set(runId, { resolve }); - }); - }) - ); - - const results = await promise; - - return { - id: params.id, - items: results, - }; + throw new Error("Method not implemented."); + + // if (!params.runs.length) { + // return Promise.resolve({ id: params.id, items: [] }); + // } + + // const promise = Promise.all( + // params.runs.map((runId) => { + // return new Promise((resolve, reject) => { + // this._taskWaits.set(runId, { resolve }); + // }); + // }) + // ); + + // const results = await promise; + + // return { + // id: params.id, + // items: results, + // }; } async completeWaitpoints(waitpoints: Waitpoint[]): Promise { diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 6c7fde1a65..780bfc9b6b 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -119,6 +119,7 @@ export type TriggerTaskRequestBody = z.infer; export const TriggerTaskResponse = z.object({ id: z.string(), + isCached: z.boolean().optional(), }); export type TriggerTaskResponse = z.infer; @@ -191,6 +192,29 @@ export const BatchTriggerTaskV2Response = z.object({ export type BatchTriggerTaskV2Response = z.infer; +export const BatchTriggerTaskV3RequestBody = z.object({ + items: BatchTriggerTaskItem.array(), + /** + * RunEngine v2 + * If triggered inside another run, the parentRunId is the friendly ID of the parent run. + */ + parentRunId: z.string().optional(), + /** + * RunEngine v2 + * Should be `true` if `triggerAndWait` or `batchTriggerAndWait` + */ + resumeParentOnCompletion: z.boolean().optional(), +}); + +export type BatchTriggerTaskV3RequestBody = z.infer; + +export const BatchTriggerTaskV3Response = z.object({ + id: z.string(), + runCount: z.number(), +}); + +export type BatchTriggerTaskV3Response = z.infer; + export const BatchTriggerTaskResponse = z.object({ batchId: z.string(), runs: z.string().array(), @@ -807,6 +831,18 @@ export const RetrieveBatchResponse = z.object({ export type RetrieveBatchResponse = z.infer; +export const RetrieveBatchV2Response = z.object({ + id: z.string(), + status: BatchStatus, + idempotencyKey: z.string().optional(), + createdAt: z.coerce.date(), + updatedAt: z.coerce.date(), + runCount: z.number(), + runs: z.array(z.string()), +}); + +export type RetrieveBatchV2Response = z.infer; + export const SubscribeRealtimeStreamChunkRawShape = z.object({ id: z.string(), runId: z.string(), diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index 57a52095bc..e13f82d9c3 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -43,12 +43,14 @@ export const WaitpointType = { RUN: "RUN", DATETIME: "DATETIME", MANUAL: "MANUAL", + BATCH: "BATCH", } satisfies Enum; export type WaitpointType = (typeof WaitpointType)[keyof typeof WaitpointType]; export const CompletedWaitpoint = z.object({ id: z.string(), + index: z.number().optional(), friendlyId: z.string(), type: z.enum(Object.values(WaitpointType) as [WaitpointType]), completedAt: z.coerce.date(), @@ -58,10 +60,24 @@ export const CompletedWaitpoint = z.object({ .object({ id: z.string(), friendlyId: z.string(), + /** If the run has an associated batch */ + batch: z + .object({ + id: z.string(), + friendlyId: z.string(), + }) + .optional(), }) .optional(), /** For type === "DATETIME" */ completedAfter: z.coerce.date().optional(), + /** For type === "BATCH" */ + completedByBatch: z + .object({ + id: z.string(), + friendlyId: z.string(), + }) + .optional(), output: z.string().optional(), outputType: z.string().optional(), outputIsError: z.boolean(), @@ -164,6 +180,12 @@ export const RunExecutionData = z.object({ version: z.literal("1"), snapshot: ExecutionSnapshot, run: BaseRunMetadata, + batch: z + .object({ + id: z.string(), + friendlyId: z.string(), + }) + .optional(), checkpoint: z .object({ id: z.string(), diff --git a/packages/core/src/v3/semanticInternalAttributes.ts b/packages/core/src/v3/semanticInternalAttributes.ts index 98b14f1aa3..ed765b251d 100644 --- a/packages/core/src/v3/semanticInternalAttributes.ts +++ b/packages/core/src/v3/semanticInternalAttributes.ts @@ -12,6 +12,7 @@ export const SemanticInternalAttributes = { ATTEMPT_NUMBER: "ctx.attempt.number", RUN_ID: "ctx.run.id", RUN_IS_TEST: "ctx.run.isTest", + ORIGINAL_RUN_ID: "$original_run_id", BATCH_ID: "ctx.batch.id", TASK_SLUG: "ctx.task.id", TASK_PATH: "ctx.task.filePath", diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index a4d2edf9ee..e2843b764f 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -396,9 +396,7 @@ export type AnyBatchedRunHandle = BatchedRunHandle; export type BatchRunHandle = BrandedRun< { batchId: string; - isCached: boolean; - idempotencyKey?: string; - runs: Array>; + runCount: number; publicAccessToken: string; }, TOutput, @@ -777,7 +775,7 @@ export type TriggerOptions = { maxDuration?: number; }; -export type TriggerAndWaitOptions = Omit; +export type TriggerAndWaitOptions = TriggerOptions; export type BatchTriggerOptions = { idempotencyKey?: IdempotencyKey | string | string[]; @@ -796,19 +794,7 @@ export type BatchTriggerOptions = { triggerSequentially?: boolean; }; -export type BatchTriggerAndWaitOptions = { - /** - * When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower, - * especially for large batches. - * - * When false (default), triggers tasks in parallel for better performance, but order is not guaranteed. - * - * Note: This only affects the order of run creation, not the actual task execution. - * - * @default false - */ - triggerSequentially?: boolean; -}; +export type BatchTriggerAndWaitOptions = BatchTriggerOptions; export type TaskMetadataWithFunctions = TaskMetadata & { fns: { diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 86e0175662..13a4500649 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -1,4 +1,4 @@ -import { SpanKind } from "@opentelemetry/api"; +import { SpanKind, SpanStatusCode } from "@opentelemetry/api"; import { SerializableJson } from "@trigger.dev/core"; import { accessoryAttributes, @@ -24,6 +24,7 @@ import { TaskRunExecutionResult, TaskRunPromise, TaskFromIdentifier, + flattenIdempotencyKey, } from "@trigger.dev/core/v3"; import { PollOptions, runs } from "./runs.js"; import { tracer } from "./tracer.js"; @@ -584,10 +585,10 @@ export async function batchTriggerById( ): Promise>> { const apiClient = apiClientManager.clientOrThrow(); - const response = await apiClient.batchTriggerV2( + const response = await apiClient.batchTriggerV3( { items: await Promise.all( - items.map(async (item) => { + items.map(async (item, index) => { const taskMetadata = taskCatalog.getTask(item.id); const parsedPayload = taskMetadata?.fns.parsePayload @@ -596,6 +597,10 @@ export async function batchTriggerById( const payloadPacket = await stringifyIO(parsedPayload); + const batchItemIdempotencyKey = await makeIdempotencyKey( + flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) + ); + return { task: item.id, payload: payloadPacket.data, @@ -604,15 +609,15 @@ export async function batchTriggerById( concurrencyKey: item.options?.concurrencyKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, - idempotencyKey: await makeIdempotencyKey(item.options?.idempotencyKey), - idempotencyKeyTTL: item.options?.idempotencyKeyTTL, delay: item.options?.delay, ttl: item.options?.ttl, tags: item.options?.tags, maxAttempts: item.options?.maxAttempts, - parentAttempt: taskContext.ctx?.attempt.id, metadata: item.options?.metadata, maxDuration: item.options?.maxDuration, + idempotencyKey: + (await makeIdempotencyKey(item.options?.idempotencyKey)) ?? batchItemIdempotencyKey, + idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, }, }; }) @@ -621,8 +626,6 @@ export async function batchTriggerById( }, { spanParentAsLink: true, - idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey), - idempotencyKeyTTL: options?.idempotencyKeyTTL, processingStrategy: options?.triggerSequentially ? "sequential" : undefined, }, { @@ -635,20 +638,8 @@ export async function batchTriggerById( span.setAttribute("batchId", body.id); } - if ("runs" in body && Array.isArray(body.runs)) { - span.setAttribute("runCount", body.runs.length); - } - - if ("isCached" in body && typeof body.isCached === "boolean") { - if (body.isCached) { - console.warn(`Result is a cached response because the request was idempotent.`); - } - - span.setAttribute("isCached", body.isCached); - } - - if ("idempotencyKey" in body && typeof body.idempotencyKey === "string") { - span.setAttribute("idempotencyKey", body.idempotencyKey); + if ("runCount" in body && typeof body.runCount === "number") { + span.setAttribute("runCount", body.runCount); } } }, @@ -658,9 +649,7 @@ export async function batchTriggerById( const handle = { batchId: response.id, - isCached: response.isCached, - idempotencyKey: response.idempotencyKey, - runs: response.runs, + runCount: response.runCount, publicAccessToken: response.publicAccessToken, }; @@ -760,10 +749,10 @@ export async function batchTriggerByIdAndWait( return await tracer.startActiveSpan( "batch.triggerAndWait()", async (span) => { - const response = await apiClient.batchTriggerV2( + const response = await apiClient.batchTriggerV3( { items: await Promise.all( - items.map(async (item) => { + items.map(async (item, index) => { const taskMetadata = taskCatalog.getTask(item.id); const parsedPayload = taskMetadata?.fns.parsePayload @@ -772,6 +761,10 @@ export async function batchTriggerByIdAndWait( const payloadPacket = await stringifyIO(parsedPayload); + const batchItemIdempotencyKey = await makeIdempotencyKey( + flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) + ); + return { task: item.id, payload: payloadPacket.data, @@ -787,11 +780,14 @@ export async function batchTriggerByIdAndWait( maxAttempts: item.options?.maxAttempts, metadata: item.options?.metadata, maxDuration: item.options?.maxDuration, + idempotencyKey: + (await makeIdempotencyKey(item.options?.idempotencyKey)) ?? + batchItemIdempotencyKey, + idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, }, }; }) ), - dependentAttempt: ctx.attempt.id, parentRunId: ctx.run.id, resumeParentOnCompletion: true, }, @@ -802,20 +798,11 @@ export async function batchTriggerByIdAndWait( ); span.setAttribute("batchId", response.id); - span.setAttribute("runCount", response.runs.length); - span.setAttribute("isCached", response.isCached); - - if (response.isCached) { - console.warn(`Result is a cached response because the request was idempotent.`); - } - - if (response.idempotencyKey) { - span.setAttribute("idempotencyKey", response.idempotencyKey); - } + span.setAttribute("runCount", response.runCount); const result = await runtime.waitForBatch({ id: response.id, - runs: response.runs.map((run) => run.id), + runCount: response.runCount, ctx, }); @@ -921,10 +908,10 @@ export async function batchTriggerTasks( ): Promise> { const apiClient = apiClientManager.clientOrThrow(); - const response = await apiClient.batchTriggerV2( + const response = await apiClient.batchTriggerV3( { items: await Promise.all( - items.map(async (item) => { + items.map(async (item, index) => { const taskMetadata = taskCatalog.getTask(item.task.id); const parsedPayload = taskMetadata?.fns.parsePayload @@ -933,6 +920,10 @@ export async function batchTriggerTasks( const payloadPacket = await stringifyIO(parsedPayload); + const batchItemIdempotencyKey = await makeIdempotencyKey( + flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) + ); + return { task: item.task.id, payload: payloadPacket.data, @@ -941,15 +932,15 @@ export async function batchTriggerTasks( concurrencyKey: item.options?.concurrencyKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, - idempotencyKey: await makeIdempotencyKey(item.options?.idempotencyKey), - idempotencyKeyTTL: item.options?.idempotencyKeyTTL, delay: item.options?.delay, ttl: item.options?.ttl, tags: item.options?.tags, maxAttempts: item.options?.maxAttempts, - parentAttempt: taskContext.ctx?.attempt.id, metadata: item.options?.metadata, maxDuration: item.options?.maxDuration, + idempotencyKey: + (await makeIdempotencyKey(item.options?.idempotencyKey)) ?? batchItemIdempotencyKey, + idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, }, }; }) @@ -958,8 +949,6 @@ export async function batchTriggerTasks( }, { spanParentAsLink: true, - idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey), - idempotencyKeyTTL: options?.idempotencyKeyTTL, processingStrategy: options?.triggerSequentially ? "sequential" : undefined, }, { @@ -972,20 +961,8 @@ export async function batchTriggerTasks( span.setAttribute("batchId", body.id); } - if ("runs" in body && Array.isArray(body.runs)) { - span.setAttribute("runCount", body.runs.length); - } - - if ("isCached" in body && typeof body.isCached === "boolean") { - if (body.isCached) { - console.warn(`Result is a cached response because the request was idempotent.`); - } - - span.setAttribute("isCached", body.isCached); - } - - if ("idempotencyKey" in body && typeof body.idempotencyKey === "string") { - span.setAttribute("idempotencyKey", body.idempotencyKey); + if ("runCount" in body && typeof body.runCount === "number") { + span.setAttribute("runCount", body.runCount); } } }, @@ -995,9 +972,7 @@ export async function batchTriggerTasks( const handle = { batchId: response.id, - isCached: response.isCached, - idempotencyKey: response.idempotencyKey, - runs: response.runs, + runCount: response.runCount, publicAccessToken: response.publicAccessToken, }; @@ -1099,10 +1074,10 @@ export async function batchTriggerAndWaitTasks { - const response = await apiClient.batchTriggerV2( + const response = await apiClient.batchTriggerV3( { items: await Promise.all( - items.map(async (item) => { + items.map(async (item, index) => { const taskMetadata = taskCatalog.getTask(item.task.id); const parsedPayload = taskMetadata?.fns.parsePayload @@ -1111,6 +1086,10 @@ export async function batchTriggerAndWaitTasks run.id), + runCount: response.runCount, ctx, }); @@ -1203,7 +1176,6 @@ async function trigger_internal( ttl: options?.ttl, tags: options?.tags, maxAttempts: options?.maxAttempts, - parentAttempt: taskContext.ctx?.attempt.id, metadata: options?.metadata, maxDuration: options?.maxDuration, parentRunId: taskContext.ctx?.run.id, @@ -1243,14 +1215,18 @@ async function batchTrigger_internal( const ctx = taskContext.ctx; - const response = await apiClient.batchTriggerV2( + const response = await apiClient.batchTriggerV3( { items: await Promise.all( - items.map(async (item) => { + items.map(async (item, index) => { const parsedPayload = parsePayload ? await parsePayload(item.payload) : item.payload; const payloadPacket = await stringifyIO(parsedPayload); + const batchItemIdempotencyKey = await makeIdempotencyKey( + flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) + ); + return { task: taskIdentifier, payload: payloadPacket.data, @@ -1259,25 +1235,24 @@ async function batchTrigger_internal( concurrencyKey: item.options?.concurrencyKey, test: taskContext.ctx?.run.isTest, payloadType: payloadPacket.dataType, - idempotencyKey: await makeIdempotencyKey(item.options?.idempotencyKey), - idempotencyKeyTTL: item.options?.idempotencyKeyTTL, delay: item.options?.delay, ttl: item.options?.ttl, tags: item.options?.tags, maxAttempts: item.options?.maxAttempts, - parentAttempt: taskContext.ctx?.attempt.id, metadata: item.options?.metadata, maxDuration: item.options?.maxDuration, parentRunId: ctx?.run.id, + idempotencyKey: + (await makeIdempotencyKey(item.options?.idempotencyKey)) ?? batchItemIdempotencyKey, + idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, }, }; }) ), + parentRunId: ctx?.run.id, }, { spanParentAsLink: true, - idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey), - idempotencyKeyTTL: options?.idempotencyKeyTTL, processingStrategy: options?.triggerSequentially ? "sequential" : undefined, }, { @@ -1290,20 +1265,8 @@ async function batchTrigger_internal( span.setAttribute("batchId", body.id); } - if ("runs" in body && Array.isArray(body.runs)) { - span.setAttribute("runCount", body.runs.length); - } - - if ("isCached" in body && typeof body.isCached === "boolean") { - if (body.isCached) { - console.warn(`Result is a cached response because the request was idempotent.`); - } - - span.setAttribute("isCached", body.isCached); - } - - if ("idempotencyKey" in body && typeof body.idempotencyKey === "string") { - span.setAttribute("idempotencyKey", body.idempotencyKey); + if ("runCount" in body && Array.isArray(body.runCount)) { + span.setAttribute("runCount", body.runCount); } } }, @@ -1313,9 +1276,7 @@ async function batchTrigger_internal( const handle = { batchId: response.id, - isCached: response.isCached, - idempotencyKey: response.idempotencyKey, - runs: response.runs, + runCount: response.runCount, publicAccessToken: response.publicAccessToken, }; @@ -1364,6 +1325,8 @@ async function triggerAndWait_internal { - const response = await apiClient.batchTriggerV2( + const response = await apiClient.batchTriggerV3( { items: await Promise.all( - items.map(async (item) => { + items.map(async (item, index) => { const parsedPayload = parsePayload ? await parsePayload(item.payload) : item.payload; const payloadPacket = await stringifyIO(parsedPayload); + const batchItemIdempotencyKey = await makeIdempotencyKey( + flattenIdempotencyKey([options?.idempotencyKey, `${index}`]) + ); + return { task: id, payload: payloadPacket.data, @@ -1440,11 +1407,14 @@ async function batchTriggerAndWait_internal run.id), + runCount: response.runCount, ctx, }); diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 18cb388b25..abedf0b1e3 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -50,7 +50,10 @@ export const batchParentTask = task({ logger.log("Results 3", { results3 }); const results4 = await batch.triggerByTaskAndWait([ - { task: childTask, payload: { message: "Hello, world !" } }, + { + task: childTask, + payload: { message: "Hello, world !" }, + }, { task: childTask, payload: { message: "Hello, world 2!" } }, ]); logger.log("Results 4", { results4 }); diff --git a/references/hello-world/src/trigger/idempotency.ts b/references/hello-world/src/trigger/idempotency.ts index 9136399cc4..6ea7c1c5ea 100644 --- a/references/hello-world/src/trigger/idempotency.ts +++ b/references/hello-world/src/trigger/idempotency.ts @@ -4,73 +4,281 @@ import { childTask } from "./example.js"; export const idempotency = task({ id: "idempotency", + maxDuration: 60, run: async (payload: any, { ctx }) => { logger.log("Hello, world from the parent", { payload }); - const child1Key = await idempotencyKeys.create("a", { scope: "global" }); + const successfulKey = await idempotencyKeys.create("a", { scope: "global" }); const child1 = await childTask.triggerAndWait( - { message: "Hello, world!", duration: 10_000 }, - { idempotencyKey: child1Key, idempotencyKeyTTL: "60s" } + { message: "Hello, world!", duration: 500, failureChance: 0 }, + { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" } ); logger.log("Child 1", { child1 }); - - ctx.attempt.id; - const child2 = await childTask.triggerAndWait( - { message: "Hello, world!", duration: 10_000 }, - { idempotencyKey: child1Key, idempotencyKeyTTL: "60s" } + { message: "Hello, world!", duration: 500 }, + { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" } ); logger.log("Child 2", { child2 }); + await childTask.trigger( + { message: "Hello, world!", duration: 500, failureChance: 0 }, + { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" } + ); + + const failureKey = await idempotencyKeys.create("b", { scope: "global" }); + + const child3 = await childTask.triggerAndWait( + { message: "Hello, world!", duration: 500, failureChance: 1 }, + { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" } + ); + logger.log("Child 3", { child3 }); + const child4 = await childTask.triggerAndWait( + { message: "Hello, world!", duration: 500, failureChance: 1 }, + { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" } + ); + logger.log("Child 4", { child4 }); + + const anotherKey = await idempotencyKeys.create("c", { scope: "global" }); + + const batch1 = await childTask.batchTriggerAndWait([ + { + payload: { message: "Hello, world!" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 3", duration: 500, failureChance: 0 }, + options: { idempotencyKey: anotherKey, idempotencyKeyTTL: "120s" }, + }, + ]); + logger.log("Batch 1", { batch1 }); + + await childTask.batchTrigger([ + { + payload: { message: "Hello, world!" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" }, + }, + ]); + + const results2 = await batch.triggerAndWait([ + { + id: "child", + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + id: "child", + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 2", { results2 }); + + const results3 = await batch.triggerByTask([ + { + task: childTask, + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + task: childTask, + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 3", { results3 }); + + const results4 = await batch.triggerByTaskAndWait([ + { + task: childTask, + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + task: childTask, + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 4", { results4 }); + }, +}); + +export const idempotencyBatch = task({ + id: "idempotency-batch", + maxDuration: 60, + run: async ({ additionalItems }: { additionalItems?: 2 }) => { + const successfulKey = await idempotencyKeys.create("a", { scope: "global" }); + const failureKey = await idempotencyKeys.create("b", { scope: "global" }); + const anotherKey = await idempotencyKeys.create("c", { scope: "global" }); + const batchKey = await idempotencyKeys.create("batch", { scope: "global" }); + + const moreItems = Array.from({ length: additionalItems ?? 0 }, (_, i) => ({ + payload: { message: `Hello, world ${i}!` }, + options: { idempotencyKey: `key-${i}`, idempotencyKeyTTL: "120s" }, + })); + + const batch1 = await childTask.batchTriggerAndWait( + [ + { + payload: { message: "Hello, world!" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 3", duration: 500, failureChance: 0 }, + }, + // Include runs in the same batch with the same idempotencyKeys + // I'm sure people will do this, even though it doesn't make sense + { + payload: { message: "Hello, world!" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" }, + }, + ...moreItems, + ], + { + idempotencyKey: batchKey, + idempotencyKeyTTL: "120s", + } + ); + logger.log("Batch 1", { batch1 }); + + const b = await batch.retrieve(batch1.id); + logger.log("Batch retrieve", { ...b }); + + const batch2 = await childTask.batchTriggerAndWait( + [ + { + payload: { message: "Hello, world!" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 3", duration: 500, failureChance: 0 }, + }, + ...moreItems, + ], + { + idempotencyKey: batchKey, + idempotencyKeyTTL: "120s", + } + ); + logger.log("Batch 1", { batch1 }); + + await childTask.batchTrigger([ + { + payload: { message: "Hello, world!" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "120s" }, + }, + { + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "120s" }, + }, + ]); + + await childTask.batchTrigger([ + { + payload: { message: "Hello, world!" }, + }, + { + payload: { message: "Hello, world 2!" }, + }, + ]); + + const results2 = await batch.triggerAndWait([ + { + id: "child", + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + id: "child", + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 2", { results2 }); + + const results3 = await batch.triggerByTask([ + { + task: childTask, + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + task: childTask, + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 3", { results3 }); + + const results4 = await batch.triggerByTaskAndWait([ + { + task: childTask, + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + task: childTask, + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 4", { results4 }); + }, +}); + +export const idempotencyTriggerByTaskAndWait = task({ + id: "idempotency-trigger-by-task-and-wait", + maxDuration: 60, + run: async () => { + const successfulKey = await idempotencyKeys.create("a", { scope: "global" }); + const failureKey = await idempotencyKeys.create("b", { scope: "global" }); + + const results1 = await batch.triggerByTaskAndWait([ + { + task: childTask, + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + task: childTask, + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 1", { results1 }); - // const results = await childTask.batchTriggerAndWait([ - // { - // payload: { message: "Hello, world!" }, - // //@ts-ignore - // options: { idempotencyKey: "1", idempotencyKeyTTL: "60s" }, - // }, - // { - // payload: { message: "Hello, world 2!" }, - // //@ts-ignore - // options: { idempotencyKey: "2", idempotencyKeyTTL: "60s" }, - // }, - // ]); - // logger.log("Results", { results }); - - // const results2 = await batch.triggerAndWait([ - // { - // id: "child", - // payload: { message: "Hello, world !" }, - // //@ts-ignore - // options: { idempotencyKey: "1", idempotencyKeyTTL: "60s" }, - // }, - // { - // id: "child", - // payload: { message: "Hello, world 2!" }, - // //@ts-ignore - // options: { idempotencyKey: "2", idempotencyKeyTTL: "60s" }, - // }, - // ]); - // logger.log("Results 2", { results2 }); - - // const results3 = await batch.triggerByTask([ - // { - // task: childTask, - // payload: { message: "Hello, world !" }, - // options: { idempotencyKey: "1", idempotencyKeyTTL: "60s" }, - // }, - // { - // task: childTask, - // payload: { message: "Hello, world 2!" }, - // options: { idempotencyKey: "2", idempotencyKeyTTL: "60s" }, - // }, - // ]); - // logger.log("Results 3", { results3 }); - - // const results4 = await batch.triggerByTaskAndWait([ - // { task: childTask, payload: { message: "Hello, world !" } }, - // { task: childTask, payload: { message: "Hello, world 2!" } }, - // ]); - // logger.log("Results 4", { results4 }); + const results2 = await batch.triggerByTaskAndWait([ + { + task: childTask, + payload: { message: "Hello, world !" }, + options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" }, + }, + { + task: childTask, + payload: { message: "Hello, world 2!" }, + options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" }, + }, + ]); + logger.log("Results 2", { results2 }); }, });