Skip to content

Commit 9c08764

Browse files
authored
fix(engine): prevent race condition that prevents triggerAndWait runs from resuming by atomically creating associated waitpoint records (#2519)
1 parent 691903c commit 9c08764

File tree

6 files changed

+464
-44
lines changed

6 files changed

+464
-44
lines changed

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,19 @@ import type {
3737
QueueManager,
3838
RunNumberIncrementer,
3939
TraceEventConcern,
40+
TriggerRacepoints,
41+
TriggerRacepointSystem,
4042
TriggerTaskRequest,
4143
TriggerTaskValidator,
4244
} from "../types";
4345
import { ServiceValidationError } from "~/v3/services/common.server";
4446

47+
class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
48+
async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
49+
return;
50+
}
51+
}
52+
4553
export class RunEngineTriggerTaskService {
4654
private readonly queueConcern: QueueManager;
4755
private readonly validator: TriggerTaskValidator;
@@ -52,6 +60,7 @@ export class RunEngineTriggerTaskService {
5260
private readonly engine: RunEngine;
5361
private readonly tracer: Tracer;
5462
private readonly traceEventConcern: TraceEventConcern;
63+
private readonly triggerRacepointSystem: TriggerRacepointSystem;
5564
private readonly metadataMaximumSize: number;
5665

5766
constructor(opts: {
@@ -65,6 +74,7 @@ export class RunEngineTriggerTaskService {
6574
traceEventConcern: TraceEventConcern;
6675
tracer: Tracer;
6776
metadataMaximumSize: number;
77+
triggerRacepointSystem?: TriggerRacepointSystem;
6878
}) {
6979
this.prisma = opts.prisma;
7080
this.engine = opts.engine;
@@ -76,6 +86,7 @@ export class RunEngineTriggerTaskService {
7686
this.tracer = opts.tracer;
7787
this.traceEventConcern = opts.traceEventConcern;
7888
this.metadataMaximumSize = opts.metadataMaximumSize;
89+
this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem();
7990
}
8091

8192
public async call({
@@ -196,19 +207,16 @@ export class RunEngineTriggerTaskService {
196207

197208
const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult;
198209

210+
if (idempotencyKey) {
211+
await this.triggerRacepointSystem.waitForRacepoint({
212+
racepoint: "idempotencyKey",
213+
id: idempotencyKey,
214+
});
215+
}
216+
199217
if (!options.skipChecks) {
200218
const queueSizeGuard = await this.queueConcern.validateQueueLimits(environment);
201219

202-
logger.debug("Queue size guard result", {
203-
queueSizeGuard,
204-
environment: {
205-
id: environment.id,
206-
type: environment.type,
207-
organization: environment.organization,
208-
project: environment.project,
209-
},
210-
});
211-
212220
if (!queueSizeGuard.ok) {
213221
throw new ServiceValidationError(
214222
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`

apps/webapp/app/runEngine/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,9 @@ export interface TraceEventConcern {
156156
callback: (span: TracedEventSpan) => Promise<T>
157157
): Promise<T>;
158158
}
159+
160+
export type TriggerRacepoints = "idempotencyKey";
161+
162+
export interface TriggerRacepointSystem {
163+
waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void>;
164+
}

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 250 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ vi.mock("~/services/platform.v3.server", async (importOriginal) => {
1616

1717
import { RunEngine } from "@internal/run-engine";
1818
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "@internal/run-engine/tests";
19-
import { containerTest } from "@internal/testcontainers";
19+
import { assertNonNullable, containerTest } from "@internal/testcontainers";
2020
import { trace } from "@opentelemetry/api";
2121
import { IOPacket } from "@trigger.dev/core/v3";
2222
import { TaskRun } from "@trigger.dev/database";
@@ -31,11 +31,15 @@ import {
3131
TagValidationParams,
3232
TracedEventSpan,
3333
TraceEventConcern,
34+
TriggerRacepoints,
35+
TriggerRacepointSystem,
3436
TriggerTaskRequest,
3537
TriggerTaskValidator,
3638
ValidationResult,
3739
} from "~/runEngine/types";
3840
import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server";
41+
import { promiseWithResolvers } from "@trigger.dev/core";
42+
import { setTimeout } from "node:timers/promises";
3943

4044
vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout
4145

@@ -108,6 +112,29 @@ class MockTraceEventConcern implements TraceEventConcern {
108112
}
109113
}
110114

115+
type TriggerRacepoint = { promise: Promise<void>; resolve: (value: void) => void };
116+
117+
class MockTriggerRacepointSystem implements TriggerRacepointSystem {
118+
private racepoints: Record<string, TriggerRacepoint | undefined> = {};
119+
120+
async waitForRacepoint({ id }: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
121+
const racepoint = this.racepoints[id];
122+
123+
if (racepoint) {
124+
return racepoint.promise;
125+
}
126+
127+
return Promise.resolve();
128+
}
129+
130+
registerRacepoint(racepoint: TriggerRacepoints, id: string): TriggerRacepoint {
131+
const { promise, resolve } = promiseWithResolvers<void>();
132+
this.racepoints[id] = { promise, resolve };
133+
134+
return { promise, resolve };
135+
}
136+
}
137+
111138
describe("RunEngineTriggerTaskService", () => {
112139
containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => {
113140
const engine = new RunEngine({
@@ -312,6 +339,228 @@ describe("RunEngineTriggerTaskService", () => {
312339
await engine.quit();
313340
});
314341

342+
containerTest(
343+
"should handle idempotency keys when the engine throws an RunDuplicateIdempotencyKeyError",
344+
async ({ prisma, redisOptions }) => {
345+
const engine = new RunEngine({
346+
prisma,
347+
worker: {
348+
redis: redisOptions,
349+
workers: 1,
350+
tasksPerWorker: 10,
351+
pollIntervalMs: 100,
352+
},
353+
queue: {
354+
redis: redisOptions,
355+
},
356+
runLock: {
357+
redis: redisOptions,
358+
},
359+
machines: {
360+
defaultMachine: "small-1x",
361+
machines: {
362+
"small-1x": {
363+
name: "small-1x" as const,
364+
cpu: 0.5,
365+
memory: 0.5,
366+
centsPerMs: 0.0001,
367+
},
368+
},
369+
baseCostInCents: 0.0005,
370+
},
371+
tracer: trace.getTracer("test", "0.0.0"),
372+
logLevel: "debug",
373+
});
374+
375+
const parentTask = "parent-task";
376+
377+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
378+
379+
const taskIdentifier = "test-task";
380+
381+
//create background worker
382+
await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, taskIdentifier]);
383+
384+
const parentRun1 = await engine.trigger(
385+
{
386+
number: 1,
387+
friendlyId: "run_p1",
388+
environment: authenticatedEnvironment,
389+
taskIdentifier: parentTask,
390+
payload: "{}",
391+
payloadType: "application/json",
392+
context: {},
393+
traceContext: {},
394+
traceId: "t12345",
395+
spanId: "s12345",
396+
queue: `task/${parentTask}`,
397+
isTest: false,
398+
tags: [],
399+
workerQueue: "main",
400+
},
401+
prisma
402+
);
403+
404+
//dequeue parent and create the attempt
405+
await setTimeout(500);
406+
const dequeued = await engine.dequeueFromWorkerQueue({
407+
consumerId: "test_12345",
408+
workerQueue: "main",
409+
});
410+
await engine.startRunAttempt({
411+
runId: parentRun1.id,
412+
snapshotId: dequeued[0].snapshot.id,
413+
});
414+
415+
const parentRun2 = await engine.trigger(
416+
{
417+
number: 2,
418+
friendlyId: "run_p2",
419+
environment: authenticatedEnvironment,
420+
taskIdentifier: parentTask,
421+
payload: "{}",
422+
payloadType: "application/json",
423+
context: {},
424+
traceContext: {},
425+
traceId: "t12346",
426+
spanId: "s12346",
427+
queue: `task/${parentTask}`,
428+
isTest: false,
429+
tags: [],
430+
workerQueue: "main",
431+
},
432+
prisma
433+
);
434+
435+
await setTimeout(500);
436+
const dequeued2 = await engine.dequeueFromWorkerQueue({
437+
consumerId: "test_12345",
438+
workerQueue: "main",
439+
});
440+
await engine.startRunAttempt({
441+
runId: parentRun2.id,
442+
snapshotId: dequeued2[0].snapshot.id,
443+
});
444+
445+
const queuesManager = new DefaultQueueManager(prisma, engine);
446+
447+
const idempotencyKeyConcern = new IdempotencyKeyConcern(
448+
prisma,
449+
engine,
450+
new MockTraceEventConcern()
451+
);
452+
453+
const triggerRacepointSystem = new MockTriggerRacepointSystem();
454+
455+
const triggerTaskService = new RunEngineTriggerTaskService({
456+
engine,
457+
prisma,
458+
runNumberIncrementer: new MockRunNumberIncrementer(),
459+
payloadProcessor: new MockPayloadProcessor(),
460+
queueConcern: queuesManager,
461+
idempotencyKeyConcern,
462+
validator: new MockTriggerTaskValidator(),
463+
traceEventConcern: new MockTraceEventConcern(),
464+
tracer: trace.getTracer("test", "0.0.0"),
465+
metadataMaximumSize: 1024 * 1024 * 1, // 1MB
466+
triggerRacepointSystem,
467+
});
468+
469+
const idempotencyKey = "test-idempotency-key";
470+
471+
const racepoint = triggerRacepointSystem.registerRacepoint("idempotencyKey", idempotencyKey);
472+
473+
const childTriggerPromise1 = triggerTaskService.call({
474+
taskId: taskIdentifier,
475+
environment: authenticatedEnvironment,
476+
body: {
477+
payload: { test: "test" },
478+
options: {
479+
idempotencyKey,
480+
parentRunId: parentRun1.friendlyId,
481+
resumeParentOnCompletion: true,
482+
},
483+
},
484+
});
485+
486+
const childTriggerPromise2 = triggerTaskService.call({
487+
taskId: taskIdentifier,
488+
environment: authenticatedEnvironment,
489+
body: {
490+
payload: { test: "test" },
491+
options: {
492+
idempotencyKey,
493+
parentRunId: parentRun2.friendlyId,
494+
resumeParentOnCompletion: true,
495+
},
496+
},
497+
});
498+
499+
await setTimeout(500);
500+
501+
// Now we can resolve the racepoint
502+
racepoint.resolve();
503+
504+
const result = await childTriggerPromise1;
505+
const result2 = await childTriggerPromise2;
506+
507+
expect(result).toBeDefined();
508+
expect(result?.run.friendlyId).toBeDefined();
509+
expect(result?.run.status).toBe("PENDING");
510+
511+
const run = await prisma.taskRun.findUnique({
512+
where: {
513+
id: result?.run.id,
514+
},
515+
});
516+
517+
expect(run).toBeDefined();
518+
expect(run?.friendlyId).toBe(result?.run.friendlyId);
519+
expect(run?.engine).toBe("V2");
520+
expect(run?.queuedAt).toBeDefined();
521+
expect(run?.queue).toBe(`task/${taskIdentifier}`);
522+
523+
expect(result2).toBeDefined();
524+
expect(result2?.run.friendlyId).toBe(result?.run.friendlyId);
525+
526+
const parent1ExecutionData = await engine.getRunExecutionData({ runId: parentRun1.id });
527+
assertNonNullable(parent1ExecutionData);
528+
expect(parent1ExecutionData.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
529+
530+
const parent2ExecutionData = await engine.getRunExecutionData({ runId: parentRun2.id });
531+
assertNonNullable(parent2ExecutionData);
532+
expect(parent2ExecutionData.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
533+
534+
const parent1RunWaitpoint = await prisma.taskRunWaitpoint.findFirst({
535+
where: {
536+
taskRunId: parentRun1.id,
537+
},
538+
include: {
539+
waitpoint: true,
540+
},
541+
});
542+
543+
assertNonNullable(parent1RunWaitpoint);
544+
expect(parent1RunWaitpoint.waitpoint.type).toBe("RUN");
545+
expect(parent1RunWaitpoint.waitpoint.completedByTaskRunId).toBe(result?.run.id);
546+
547+
const parent2RunWaitpoint = await prisma.taskRunWaitpoint.findFirst({
548+
where: {
549+
taskRunId: parentRun2.id,
550+
},
551+
include: {
552+
waitpoint: true,
553+
},
554+
});
555+
556+
assertNonNullable(parent2RunWaitpoint);
557+
expect(parent2RunWaitpoint.waitpoint.type).toBe("RUN");
558+
expect(parent2RunWaitpoint.waitpoint.completedByTaskRunId).toBe(result2?.run.id);
559+
560+
await engine.quit();
561+
}
562+
);
563+
315564
containerTest(
316565
"should resolve queue names correctly when locked to version",
317566
async ({ prisma, redisOptions }) => {

0 commit comments

Comments
 (0)