Skip to content

Run Engine 2.0 trigger idempotency #1613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f1d5481
Return isCached from the trigger API endpoint
matt-aitken Jan 3, 2025
8642497
Fix for the wrong type when blocking a run
matt-aitken Jan 3, 2025
8d882d8
Render the idempotent run in the inspector
matt-aitken Jan 3, 2025
67c9085
Event repository for idempotency
matt-aitken Jan 3, 2025
0e958e7
Debug events off by default, added an admin toggle to show them
matt-aitken Jan 3, 2025
c175ed9
triggerAndWait idempotency span
matt-aitken Jan 3, 2025
cf83a53
Some improvements to the reference idempotency task
matt-aitken Jan 3, 2025
3064c2a
Merge remote-tracking branch 'origin/run-engine-2' into engine-2-idem…
matt-aitken Jan 3, 2025
d3043da
Removed the cached tracing from the SDK
matt-aitken Jan 6, 2025
1149c6e
Server-side creating cached span
matt-aitken Jan 6, 2025
4f9344a
Improved idempotency test task
matt-aitken Jan 6, 2025
59b30eb
Create cached task spans in a better way
matt-aitken Jan 6, 2025
93c3d23
Idempotency span support inc batch trigger
matt-aitken Jan 7, 2025
fdfd064
Simplified how the spans are done, using more of the existing code
matt-aitken Jan 7, 2025
8a8aaac
Improved the idempotency test task
matt-aitken Jan 9, 2025
2d4c67c
Added Waitpoint Batch type, add to TaskRunWaitpoint with order
matt-aitken Jan 9, 2025
4e9f8ca
Pass batch ids through to the run engine when triggering
matt-aitken Jan 9, 2025
bf6946a
Added batchIndex
matt-aitken Jan 9, 2025
7038f4c
Better batch support in the run engine
matt-aitken Jan 9, 2025
041cd87
Added settings to batch trigger service, before major overhaul
matt-aitken Jan 10, 2025
641edd2
Allow the longer run/batch ids in the filters
matt-aitken Jan 13, 2025
151a50a
Changed how batching works, includes breaking changes in CLI
matt-aitken Jan 13, 2025
efc83c9
Removed batch idempotency because it gets put on the runs instead
matt-aitken Jan 13, 2025
1230871
Added `runs` to the batch.retrieve call/API
matt-aitken Jan 13, 2025
5f0d45a
Set firstAttemptStartedAt when creating the first attempt
matt-aitken Jan 14, 2025
509438d
Do nothing when receiving a BATCH waitpoint
matt-aitken Jan 14, 2025
7ec2726
Some fixes in the new batch trigger service… mostly just passing miss…
matt-aitken Jan 14, 2025
5dfa932
Tweaked the idempotency test task for more situations
matt-aitken Jan 14, 2025
bd6ee2c
Only block with a batch if it’s a batchTriggerAndWait… 🤦‍♂️
matt-aitken Jan 14, 2025
b73ca8b
Added another case to the idempotency test task: multiple of the same…
matt-aitken Jan 14, 2025
6d0927e
Support for the same run multiple times in the same batch
matt-aitken Jan 14, 2025
5ac9673
Small tweaks
matt-aitken Jan 14, 2025
84ee4d3
Make sure to complete batches, even if they’re not andWait ones
matt-aitken Jan 14, 2025
67060db
Merge remote-tracking branch 'origin/run-engine-2' into engine-2-idem…
matt-aitken Jan 15, 2025
9bf52cc
Export RunDuplicateIdempotencyKeyError from the run engine
matt-aitken Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions apps/webapp/app/assets/icons/TaskCachedIcon.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
export function TaskCachedIcon({ className }: { className?: string }) {
return (
<svg
className={className}
width="16"
height="16"
viewBox="0 0 16 16"
fill="none"
xmlns="http://www.w3.org/2000/svg"
>
<g clipPath="url(#clip0_15584_76102)">
<path
d="M0.5 3.5L0.5 2.5C0.5 1.39543 1.39543 0.5 2.5 0.5H3.5"
stroke="#3B82F6"
strokeLinecap="square"
strokeLinejoin="round"
/>
<path
d="M15.5 12.5L15.5 13.5C15.5 14.6046 14.6046 15.5 13.5 15.5L12.5 15.5"
stroke="#3B82F6"
strokeLinecap="square"
strokeLinejoin="round"
/>
<path
d="M12.5 0.5L13.5 0.5C14.6046 0.5 15.5 1.39543 15.5 2.5L15.5 3.5"
stroke="#3B82F6"
strokeLinecap="square"
strokeLinejoin="round"
/>
<path
d="M3.5 15.5L2.5 15.5C1.39543 15.5 0.5 14.6046 0.5 13.5L0.5 12.5"
stroke="#3B82F6"
strokeLinecap="square"
strokeLinejoin="round"
/>
<path d="M11.1799 4.19V5.598H8.8479V12H7.1649V5.598H4.8219V4.19H11.1799Z" fill="#3B82F6" />
<line x1="6" y1="15.5" x2="10" y2="15.5" stroke="#3B82F6" />
<line x1="6" y1="0.5" x2="10" y2="0.5" stroke="#3B82F6" />
<line x1="15.5" y1="6" x2="15.5" y2="10" stroke="#3B82F6" />
<line x1="0.5" y1="6" x2="0.5" y2="10" stroke="#3B82F6" />
</g>
<defs>
<clipPath id="clip0_15584_76102">
<rect width="16" height="16" fill="white" />
</clipPath>
</defs>
</svg>
);
}
4 changes: 2 additions & 2 deletions apps/webapp/app/components/runs/v3/BatchFilters.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ function BatchIdDropdown({
if (batchId) {
if (!batchId.startsWith("batch_")) {
error = "Batch IDs start with 'batch_'";
} else if (batchId.length !== 27) {
error = "Batch IDs are 27 characters long";
} else if (batchId.length !== 27 && batchId.length !== 31) {
error = "Batch IDs are 27/32 characters long";
}
}

Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/components/runs/v3/RunFilters.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,8 @@ function RunIdDropdown({
if (runId) {
if (!runId.startsWith("run_")) {
error = "Run IDs start with 'run_'";
} else if (runId.length !== 25) {
error = "Run IDs are 25 characters long";
} else if (runId.length !== 25 && runId.length !== 29) {
error = "Run IDs are 25/30 characters long";
}
}

Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/components/runs/v3/RunIcon.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from "@heroicons/react/20/solid";
import { AttemptIcon } from "~/assets/icons/AttemptIcon";
import { TaskIcon } from "~/assets/icons/TaskIcon";
import { TaskCachedIcon } from "~/assets/icons/TaskCachedIcon";
import { NamedIcon } from "~/components/primitives/NamedIcon";
import { cn } from "~/utils/cn";

Expand Down Expand Up @@ -41,6 +42,8 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
switch (name) {
case "task":
return <TaskIcon className={cn(className, "text-blue-500")} />;
case "task-cached":
return <TaskCachedIcon className={cn(className, "text-blue-500")} />;
case "scheduled":
return <ClockIcon className={cn(className, "text-sun-500")} />;
case "attempt":
Expand Down
65 changes: 42 additions & 23 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
MachinePresetName,
parsePacket,
prettyPrintPacket,
SemanticInternalAttributes,
TaskRunError,
} from "@trigger.dev/core/v3";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
Expand Down Expand Up @@ -39,7 +40,22 @@ export class SpanPresenter extends BasePresenter {
throw new Error("Project not found");
}

const run = await this.getRun(spanId);
const parentRun = await this._prisma.taskRun.findFirst({
select: {
traceId: true,
},
where: {
friendlyId: runFriendlyId,
},
});

if (!parentRun) {
return;
}

const { traceId } = parentRun;

const run = await this.getRun(traceId, spanId);
if (run) {
return {
type: "run" as const,
Expand All @@ -48,7 +64,7 @@ export class SpanPresenter extends BasePresenter {
}

//get the run
const span = await this.getSpan(runFriendlyId, spanId);
const span = await this.getSpan(traceId, spanId);

if (!span) {
throw new Error("Span not found");
Expand All @@ -60,10 +76,17 @@ export class SpanPresenter extends BasePresenter {
};
}

async getRun(spanId: string) {
async getRun(traceId: string, spanId: string) {
const span = await eventRepository.getSpan(spanId, traceId);

if (!span) {
return;
}

const run = await this._replica.taskRun.findFirst({
select: {
id: true,
spanId: true,
traceId: true,
//metadata
number: true,
Expand Down Expand Up @@ -92,13 +115,15 @@ export class SpanPresenter extends BasePresenter {
//status + duration
status: true,
startedAt: true,
firstAttemptStartedAt: true,
createdAt: true,
updatedAt: true,
queuedAt: true,
completedAt: true,
logsDeletedAt: true,
//idempotency
idempotencyKey: true,
idempotencyKeyExpiresAt: true,
//delayed
delayUntil: true,
//ttl
Expand Down Expand Up @@ -161,9 +186,13 @@ export class SpanPresenter extends BasePresenter {
},
},
},
where: {
spanId,
},
where: span.originalRun
? {
friendlyId: span.originalRun,
}
: {
spanId,
},
});

if (!run) {
Expand Down Expand Up @@ -238,8 +267,6 @@ export class SpanPresenter extends BasePresenter {
}
}

const span = await eventRepository.getSpan(spanId, run.traceId);

const metadata = run.metadata
? await prettyPrintPacket(run.metadata, run.metadataType, {
filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
Expand Down Expand Up @@ -296,6 +323,7 @@ export class SpanPresenter extends BasePresenter {
status: run.status,
createdAt: run.createdAt,
startedAt: run.startedAt,
firstAttemptStartedAt: run.firstAttemptStartedAt,
updatedAt: run.updatedAt,
delayUntil: run.delayUntil,
expiredAt: run.expiredAt,
Expand All @@ -307,6 +335,8 @@ export class SpanPresenter extends BasePresenter {
sdkVersion: run.lockedToVersion?.sdkVersion,
isTest: run.isTest,
environmentId: run.runtimeEnvironment.id,
idempotencyKey: run.idempotencyKey,
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
schedule: run.schedule
? {
friendlyId: run.schedule.friendlyId,
Expand Down Expand Up @@ -349,24 +379,13 @@ export class SpanPresenter extends BasePresenter {
engine: run.engine,
masterQueue: run.masterQueue,
secondaryMasterQueue: run.secondaryMasterQueue,
spanId: run.spanId,
isCached: !!span.originalRun,
};
}

async getSpan(runFriendlyId: string, spanId: string) {
const run = await this._prisma.taskRun.findFirst({
select: {
traceId: true,
},
where: {
friendlyId: runFriendlyId,
},
});

if (!run) {
return;
}

const span = await eventRepository.getSpan(spanId, run.traceId);
async getSpan(traceId: string, spanId: string) {
const span = await eventRepository.getSpan(spanId, traceId);

if (!span) {
return;
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const { action, loader } = createActionApiRoute(
return json(
{
id: run.friendlyId,
isCached: run.isCached,
},
{
headers: $responseHeaders,
Expand Down
12 changes: 1 addition & 11 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { determineEngineVersion } from "~/v3/engineVersion.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";

Expand Down Expand Up @@ -88,15 +86,7 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const version = await determineEngineVersion({
environment: authentication.environment,
version: engineVersion ?? undefined,
});

const service =
version === "V1"
? new BatchTriggerV2Service(batchProcessingStrategy ?? undefined)
: new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
40 changes: 40 additions & 0 deletions apps/webapp/app/routes/api.v2.batches.$batchId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
batchId: z.string(),
});

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
findResource: (params, auth) => {
return $replica.batchTaskRun.findFirst({
where: {
friendlyId: params.batchId,
runtimeEnvironmentId: auth.environment.id,
},
});
},
authorization: {
action: "read",
resource: (batch) => ({ batch: batch.friendlyId }),
superScopes: ["read:runs", "read:all", "admin"],
},
},
async ({ resource: batch }) => {
return json({
id: batch.friendlyId,
status: batch.status,
idempotencyKey: batch.idempotencyKey ?? undefined,
createdAt: batch.createdAt,
updatedAt: batch.updatedAt,
runCount: batch.runCount,
runs: batch.runIds,
});
}
);
Loading