Skip to content

Commit e8a5785

Browse files
committed
Heartbeats should only reschedule existing heartbeat jobs
1 parent f62cd8f commit e8a5785

File tree

9 files changed

+302
-199
lines changed

9 files changed

+302
-199
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ export class MarQS {
499499
return;
500500
}
501501

502-
await this.options.visibilityTimeoutStrategy.heartbeat(
502+
await this.options.visibilityTimeoutStrategy.startHeartbeat(
503503
messageData.messageId,
504504
this.visibilityTimeoutInMs
505505
);
@@ -588,7 +588,7 @@ export class MarQS {
588588

589589
await this.options.subscriber?.messageDequeued(message);
590590

591-
await this.options.visibilityTimeoutStrategy.heartbeat(
591+
await this.options.visibilityTimeoutStrategy.startHeartbeat(
592592
messageData.messageId,
593593
this.visibilityTimeoutInMs
594594
);

apps/webapp/app/v3/marqs/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ export interface MessageQueueSubscriber {
9999
}
100100

101101
/**
 * Contract for tracking a message's visibility timeout via heartbeats.
 * Every method is keyed by `messageId`.
 */
export interface VisibilityTimeoutStrategy {
102+
/**
 * Begins heartbeat tracking for a message with an initial timeout of
 * `timeoutInMs` milliseconds from now.
 */
startHeartbeat(messageId: string, timeoutInMs: number): Promise<void>;
102103
/**
 * Extends the timeout of a message to `timeoutInMs` milliseconds from now.
 * NOTE(review): presumably only meaningful after `startHeartbeat` — confirm per implementation.
 */
heartbeat(messageId: string, timeoutInMs: number): Promise<void>;
103104
/** Stops heartbeat tracking for a message. */
cancelHeartbeat(messageId: string): Promise<void>;
104105
}

apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ import { TaskRunHeartbeatFailedService } from "../taskRunHeartbeatFailed.server"
33
import { VisibilityTimeoutStrategy } from "./types";
44

55
export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy {
6+
/**
 * Starts the heartbeat by enqueueing `TaskRunHeartbeatFailedService` for
 * `messageId`, scheduled `timeoutInMs` milliseconds from now.
 */
async startHeartbeat(messageId: string, timeoutInMs: number): Promise<void> {
7+
await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
8+
}
9+
610
/**
 * Extends the heartbeat by enqueueing `TaskRunHeartbeatFailedService` for
 * `messageId`, scheduled `timeoutInMs` milliseconds from now.
 * This strategy performs the same enqueue call as `startHeartbeat`.
 */
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
711
await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
812
}
@@ -13,7 +17,7 @@ export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy {
1317
}
1418

1519
export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy {
16-
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
20+
async startHeartbeat(messageId: string, timeoutInMs: number): Promise<void> {
1721
await legacyRunEngineWorker.enqueue({
1822
id: `heartbeat:${messageId}`,
1923
job: "runHeartbeat",
@@ -22,6 +26,13 @@ export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeo
2226
});
2327
}
2428

29+
/**
 * Extends the heartbeat by rescheduling the worker job with id
 * `heartbeat:${messageId}` to `timeoutInMs` milliseconds from now.
 * Uses `reschedule` rather than `enqueue`.
 */
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
30+
await legacyRunEngineWorker.reschedule(
31+
`heartbeat:${messageId}`,
32+
new Date(Date.now() + timeoutInMs)
33+
);
34+
}
35+
2536
/**
 * Cancels the heartbeat by acking the worker job with id
 * `heartbeat:${messageId}`.
 */
async cancelHeartbeat(messageId: string): Promise<void> {
2637
await legacyRunEngineWorker.ack(`heartbeat:${messageId}`);
2738
}

internal-packages/redis-worker/src/queue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
213213
}
214214
}
215215

216+
/**
 * Moves an already-queued item to a new availability time.
 *
 * @param id - Member id in the `queue` sorted set.
 * @param availableAt - New availability time; stored as the score in epoch milliseconds.
 */
async reschedule(id: string, availableAt: Date): Promise<void> {
217+
// "XX" makes ZADD update only members that already exist, so an id that is
// not in the sorted set is never added by this call.
await this.redis.zadd(`queue`, "XX", availableAt.getTime(), id);
218+
}
219+
216220
async size({ includeFuture = false }: { includeFuture?: boolean } = {}): Promise<number> {
217221
try {
218222
if (includeFuture) {

internal-packages/redis-worker/src/worker.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,26 @@ class Worker<TCatalog extends WorkerCatalog> {
163163
);
164164
}
165165

166+
/**
167+
* Reschedules an existing job to a new available date.
* Delegates to the underlying queue's `reschedule` inside a "reschedule"
* span of kind PRODUCER, tagged with the `job_id` attribute.
168+
* If the job isn't in the queue, it will be ignored.
*
* @param id - Id of the job to reschedule.
* @param availableAt - New date at which the job becomes available.
* @returns Whatever `startSpan` returns for the wrapped queue call.
169+
*/
170+
reschedule(id: string, availableAt: Date) {
171+
return startSpan(
172+
this.tracer,
173+
"reschedule",
174+
// The `span` argument is not used by this callback.
async (span) => {
175+
return this.queue.reschedule(id, availableAt);
176+
},
177+
{
178+
kind: SpanKind.PRODUCER,
179+
attributes: {
180+
job_id: id,
181+
},
182+
}
183+
);
184+
}
185+
166186
ack(id: string) {
167187
return startSpan(
168188
this.tracer,
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
import { BatchResult, queue, task, wait } from "@trigger.dev/sdk/v3";
2+
3+
/**
 * Task that triggers itself with `depth - 1` until `depth` reaches 0.
 *
 * Each level waits `delayMs` milliseconds, then triggers one child run and
 * waits for it, using `batchTriggerAndWait` when `useBatch` is true and
 * `triggerAndWait` otherwise. Returns `{ ok }` taken from the child result,
 * or nothing at depth 0.
 *
 * Configured with a queue concurrency limit of 1 and a single attempt.
 */
export const recursiveTask = task({
4+
id: "recursive-task",
5+
queue: {
6+
concurrencyLimit: 1,
7+
},
8+
retry: {
9+
maxAttempts: 1,
10+
},
11+
run: async (
12+
{ delayMs, depth, useBatch = false }: { delayMs: number; depth: number; useBatch: boolean },
13+
{ ctx }
14+
) => {
15+
// Base case. NOTE(review): only an exact 0 stops the recursion; a negative
// or fractional `depth` never satisfies this check.
if (depth === 0) {
16+
return;
17+
}
18+
19+
await new Promise((resolve) => setTimeout(resolve, delayMs));
20+
21+
if (useBatch) {
22+
// Batch of exactly one child run, tagged "recursive".
const batchResult = await recursiveTask.batchTriggerAndWait([
23+
{
24+
payload: { delayMs, depth: depth - 1, useBatch },
25+
options: { tags: ["recursive"] },
26+
},
27+
]);
28+
29+
// Cast to `any` so `ok` can be read without narrowing the result type.
const firstRun = batchResult.runs[0] as any;
30+
31+
return {
32+
ok: firstRun.ok,
33+
};
34+
} else {
35+
// Unlike the batch branch, this child run is triggered without tags.
const result = (await recursiveTask.triggerAndWait({
36+
delayMs,
37+
depth: depth - 1,
38+
useBatch,
39+
})) as any;
40+
41+
return {
42+
ok: result.ok,
43+
};
44+
}
45+
},
46+
});
47+
48+
/** Queue named "single-queue" with a concurrency limit of 1. */
export const singleQueue = queue({
49+
name: "single-queue",
50+
concurrencyLimit: 1,
51+
});
52+
53+
/**
 * Task that resolves after `payload.delayMs` milliseconds.
 * Configured with a single attempt.
 */
export const delayTask = task({
54+
id: "delay-task",
55+
retry: {
56+
maxAttempts: 1,
57+
},
58+
run: async (payload: { delayMs: number }, { ctx }) => {
59+
await new Promise((resolve) => setTimeout(resolve, payload.delayMs));
60+
},
61+
});
62+
63+
/**
 * Task for exercising retries. It runs on `singleQueue` with at most 2 attempts.
 *
 * `run` waits `delayMs` milliseconds, then throws when `throwError` is set and
 * the current attempt number is at most `failureCount`.
 *
 * `handleError` skips retrying when `throwError` is false; otherwise it
 * returns `retryDelayMs` from the payload as the retry delay.
 */
export const retryTask = task({
64+
id: "retry-task",
65+
queue: singleQueue,
66+
retry: {
67+
maxAttempts: 2,
68+
},
69+
run: async (
70+
payload: { delayMs: number; throwError: boolean; failureCount: number; retryDelayMs?: number },
71+
{ ctx }
72+
) => {
73+
await new Promise((resolve) => setTimeout(resolve, payload.delayMs));
74+
75+
// Fail attempts 1..failureCount; later attempts complete normally.
if (payload.throwError && ctx.attempt.number <= payload.failureCount) {
76+
throw new Error("Error");
77+
}
78+
},
79+
handleError: async (payload, error, { ctx }) => {
80+
if (!payload.throwError) {
81+
return {
82+
skipRetrying: true,
83+
};
84+
} else {
85+
// `retryDelayMs` is optional in the payload type, so this may be undefined.
return {
86+
retryDelayInMs: payload.retryDelayMs,
87+
};
88+
}
89+
},
90+
});
91+
92+
/**
 * Task that pauses for `waitDurationInSeconds` seconds (default 5).
 *
 * When `doWait` is true (the default) it pauses with the SDK's `wait.for`;
 * otherwise it pauses with a plain `setTimeout` promise.
 * Configured with a queue concurrency limit of 1.
 */
export const durationWaitTask = task({
93+
id: "duration-wait-task",
94+
queue: {
95+
concurrencyLimit: 1,
96+
},
97+
run: async (
98+
{
99+
waitDurationInSeconds = 5,
100+
doWait = true,
101+
}: { waitDurationInSeconds: number; doWait: boolean },
102+
{ ctx }
103+
) => {
104+
if (doWait) {
105+
await wait.for({ seconds: waitDurationInSeconds });
106+
} else {
107+
// Same duration, converted to milliseconds for setTimeout.
await new Promise((resolve) => setTimeout(resolve, waitDurationInSeconds * 1000));
108+
}
109+
},
110+
});
111+
112+
/**
 * Parent task that either waits on a `resumeChildTask` run or just sleeps.
 *
 * When `triggerChildTask` is true it triggers `resumeChildTask` with
 * `delayMs` (default 5000) and waits for it, as a one-item batch when
 * `useBatch` is true or as a single run otherwise. A failed child makes this
 * task throw. When `triggerChildTask` is false it sleeps for `delayMs`.
 *
 * Configured with a queue concurrency limit of 1. Returns nothing.
 */
export const resumeParentTask = task({
113+
id: "resume-parent-task",
114+
queue: {
115+
concurrencyLimit: 1,
116+
},
117+
run: async (
118+
{
119+
delayMs = 5_000,
120+
triggerChildTask,
121+
useBatch = false,
122+
}: { delayMs: number; triggerChildTask: boolean; useBatch: boolean },
123+
{ ctx }
124+
) => {
125+
if (triggerChildTask) {
126+
if (useBatch) {
127+
const batchResult = await resumeChildTask.batchTriggerAndWait([
128+
{
129+
payload: { delayMs },
130+
options: { tags: ["resume-child"] },
131+
},
132+
]);
133+
134+
// Called only for its throw-on-failure effect; the returned runs are discarded.
unwrapBatchResult(batchResult);
135+
} else {
136+
await resumeChildTask.triggerAndWait({ delayMs }, { tags: ["resume-child"] }).unwrap();
137+
}
138+
} else {
139+
await new Promise((resolve) => setTimeout(resolve, delayMs));
140+
}
141+
},
142+
});
143+
144+
/**
 * Child task triggered by `resumeParentTask`; resolves after
 * `payload.delayMs` milliseconds.
 */
export const resumeChildTask = task({
145+
id: "resume-child-task",
146+
run: async (payload: { delayMs: number }, { ctx }) => {
147+
await new Promise((resolve) => setTimeout(resolve, payload.delayMs));
148+
},
149+
});
150+
151+
/**
 * Parent task that either waits on a `genericChildTask` run or just sleeps.
 *
 * When `triggerChildTask` is true it triggers `genericChildTask` with
 * `delayMs` (default 5000) and waits for it, as a one-item batch when
 * `useBatch` is true or as a single run otherwise. A failed child makes this
 * task throw. When `triggerChildTask` is false it sleeps for `delayMs`.
 *
 * Only the batch branch returns a value (the batch's runs); the other
 * branches return nothing. No queue options are set on this task.
 */
export const genericParentTask = task({
152+
id: "generic-parent-task",
153+
run: async (
154+
{
155+
delayMs = 5_000,
156+
triggerChildTask,
157+
useBatch = false,
158+
}: { delayMs: number; triggerChildTask: boolean; useBatch: boolean },
159+
{ ctx }
160+
) => {
161+
if (triggerChildTask) {
162+
if (useBatch) {
163+
// NOTE(review): the tag is "resume-child" even though the child is
// genericChildTask — confirm this is intentional.
const batchResult = await genericChildTask.batchTriggerAndWait([
164+
{
165+
payload: { delayMs },
166+
options: { tags: ["resume-child"] },
167+
},
168+
]);
169+
170+
return unwrapBatchResult(batchResult);
171+
} else {
172+
await genericChildTask.triggerAndWait({ delayMs }, { tags: ["resume-child"] }).unwrap();
173+
}
174+
} else {
175+
await new Promise((resolve) => setTimeout(resolve, delayMs));
176+
}
177+
},
178+
});
179+
180+
/**
 * Returns the runs of a batch result, throwing if any run failed.
 *
 * @param batchResult - Result of a `batchTriggerAndWait` call.
 * @returns The batch's `runs` array, unchanged, when every run has `ok` set.
 * @throws Error with message `Child task failed: <reason>` describing the
 *   first run whose `ok` is falsy.
 */
function unwrapBatchResult(batchResult: BatchResult<string, any>) {
  // Single pass: find the first failed run instead of scanning with `some`
  // and then scanning again with `find`.
  const failedRun = batchResult.runs.find((run) => !run.ok);

  if (failedRun) {
    throw new Error(`Child task failed: ${formatBatchRunError(failedRun.error)}`);
  }

  return batchResult.runs;
}

/**
 * Renders an unknown error value as readable text. Interpolating a plain
 * object into a template string yields "[object Object]", which hides the
 * actual failure reason.
 */
function formatBatchRunError(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }

  if (typeof error === "string") {
    return error;
  }

  try {
    // JSON.stringify returns undefined for values such as `undefined` or functions.
    return JSON.stringify(error) ?? String(error);
  } catch {
    // Circular structures and BigInt values make JSON.stringify throw.
    return String(error);
  }
}
187+
188+
/**
 * Child task triggered by `genericParentTask` and the heartbeat test;
 * resolves after `payload.delayMs` milliseconds.
 */
export const genericChildTask = task({
189+
id: "generic-child-task",
190+
run: async (payload: { delayMs: number }, { ctx }) => {
191+
await new Promise((resolve) => setTimeout(resolve, payload.delayMs));
192+
},
193+
});
194+
195+
/**
 * Task that spins in a synchronous loop until `delayMs` milliseconds have
 * elapsed. The loop contains no `await`, so it never yields while spinning.
 */
export const eventLoopLagTask = task({
196+
id: "event-loop-lag-task",
197+
run: async ({ delayMs }: { delayMs: number }, { ctx }) => {
198+
const start = Date.now();
199+
// Intentional busy-wait: the empty body keeps the thread occupied.
while (Date.now() - start < delayMs) {
200+
// Do nothing
201+
}
202+
},
203+
});
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { waitForRunStatus } from "@/utils.js";
2+
import { logger, task } from "@trigger.dev/sdk/v3";
3+
import assert from "assert";
4+
import { genericChildTask } from "./helpers.js";
5+
6+
/**
 * Entry-point task for the heartbeat test. It triggers `testHeartbeats` with
 * `visibilityTimeoutSeconds` (default 100), waits for it, and unwraps the
 * result. Configured with a single attempt.
 */
export const describeHeartbeats = task({
7+
id: "describe/heartbeats",
8+
retry: {
9+
maxAttempts: 1,
10+
},
11+
run: async (
12+
{ visibilityTimeoutSeconds = 100 }: { visibilityTimeoutSeconds?: number },
13+
{ ctx }
14+
) => {
15+
await testHeartbeats.triggerAndWait({ visibilityTimeoutSeconds }).unwrap();
16+
},
17+
});
18+
19+
/**
 * Checks that a child run lasting longer than the visibility timeout still
 * completes.
 *
 * Triggers `genericChildTask` with a delay of `visibilityTimeoutSeconds`
 * (default 100) plus 5 seconds, waits for it to reach EXECUTING, then waits
 * for a terminal status and asserts that status is COMPLETED.
 * Configured with a single attempt.
 */
export const testHeartbeats = task({
20+
id: "test/heartbeats",
21+
retry: {
22+
maxAttempts: 1,
23+
},
24+
run: async (
25+
{ visibilityTimeoutSeconds = 100 }: { visibilityTimeoutSeconds?: number },
26+
{ ctx }
27+
) => {
28+
// The child outlives the visibility timeout by 5 seconds.
const run = await genericChildTask.trigger({
29+
delayMs: visibilityTimeoutSeconds * 1_000 + 5 * 1000,
30+
});
31+
32+
await waitForRunStatus(run.id, ["EXECUTING"]);
33+
34+
logger.info("Heartbeat test: run is executing");
35+
36+
// NOTE(review): the last two arguments are presumably a timeout in seconds
// and a poll interval in milliseconds — confirm against `waitForRunStatus`.
const completedRun = await waitForRunStatus(
37+
run.id,
38+
["COMPLETED", "FAILED", "SYSTEM_FAILURE", "CRASHED"],
39+
visibilityTimeoutSeconds + 30,
40+
5_000
41+
);
42+
43+
assert(completedRun.status === "COMPLETED", `Run failed with status ${completedRun.status}`);
44+
45+
logger.info("Heartbeat test: run is completed");
46+
},
47+
});

0 commit comments

Comments
 (0)