Skip to content

Commit 6da5e7a

Browse files
authored
Various fixes for run engine v1 (#1643)
* Various fixes for run engine v1
  - Make sure there are connected providers before sending a scheduled attempt message; nack and retry if there are none
  - Fail runs that fail task heartbeats when pending and locked
  - More and better logging around the shared queue consumer
  - Fix bug when failing a task run with no attempt
* Prevent findUnique from bringing down our database
1 parent 75753ec commit 6da5e7a

8 files changed

+163
-46
lines changed

apps/webapp/app/v3/failedTaskRun.server.ts

+13 -12
Original file line number | Diff line number | Diff line change
@@ -14,20 +14,21 @@ import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.ser
1414
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
1515
import * as semver from "semver";
1616

17-
const includeAttempts = {
18-
attempts: {
19-
orderBy: {
20-
createdAt: "desc",
17+
const FailedTaskRunRetryGetPayload = {
18+
select: {
19+
id: true,
20+
attempts: {
21+
orderBy: {
22+
createdAt: "desc",
23+
},
24+
take: 1,
2125
},
22-
take: 1,
26+
lockedById: true, // task
27+
lockedToVersionId: true, // worker
2328
},
24-
lockedBy: true, // task
25-
lockedToVersion: true, // worker
26-
} satisfies Prisma.TaskRunInclude;
29+
} as const;
2730

28-
type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{
29-
include: typeof includeAttempts;
30-
}>;
31+
type TaskRunWithAttempts = Prisma.TaskRunGetPayload<typeof FailedTaskRunRetryGetPayload>;
3132

3233
export class FailedTaskRunService extends BaseService {
3334
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
@@ -92,7 +93,7 @@ export class FailedTaskRunRetryHelper extends BaseService {
9293
where: {
9394
id: runId,
9495
},
95-
include: includeAttempts,
96+
...FailedTaskRunRetryGetPayload,
9697
});
9798

9899
if (!taskRun) {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

+60 -29
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@ import {
1313
MachinePreset,
1414
ProdTaskRunExecution,
1515
ProdTaskRunExecutionPayload,
16-
QueueOptions,
1716
TaskRunError,
1817
TaskRunErrorCodes,
1918
TaskRunExecution,
@@ -29,13 +28,13 @@ import {
2928
BackgroundWorker,
3029
BackgroundWorkerTask,
3130
Prisma,
32-
TaskQueue,
3331
TaskRunStatus,
3432
} from "@trigger.dev/database";
3533
import { z } from "zod";
3634
import { $replica, prisma } from "~/db.server";
3735
import { env } from "~/env.server";
3836
import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
37+
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
3938
import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
4039
import { logger } from "~/services/logger.server";
4140
import { singleton } from "~/utils/singleton";
@@ -67,7 +66,6 @@ import {
6766
import { tracer } from "../tracer.server";
6867
import { getMaxDuration } from "../utils/maxDuration";
6968
import { MessagePayload } from "./types";
70-
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
7169

7270
const WithTraceContext = z.object({
7371
traceparent: z.string().optional(),
@@ -323,6 +321,14 @@ export class SharedQueueConsumer {
323321
ROOT_CONTEXT
324322
);
325323

324+
logger.debug("SharedQueueConsumer starting new trace", {
325+
reasonStats: this._reasonStats,
326+
actionStats: this._actionStats,
327+
outcomeStats: this._outcomeStats,
328+
iterationCount: this._iterationsCount,
329+
consumerId: this._id,
330+
});
331+
326332
// Get the span trace context
327333
this._currentSpanContext = trace.setSpan(ROOT_CONTEXT, this._currentSpan);
328334

@@ -351,6 +357,10 @@ export class SharedQueueConsumer {
351357
try {
352358
const result = await this.#doWorkInternal();
353359

360+
if (result.reason !== "no_message_dequeued") {
361+
logger.debug("SharedQueueConsumer doWorkInternal result", { result });
362+
}
363+
354364
this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1;
355365
this._outcomeStats[result.outcome] = (this._outcomeStats[result.outcome] ?? 0) + 1;
356366

@@ -371,6 +381,9 @@ export class SharedQueueConsumer {
371381
if (result.error) {
372382
span.recordException(result.error);
373383
span.setStatus({ code: SpanStatusCode.ERROR });
384+
this._currentSpan?.recordException(result.error);
385+
this._currentSpan?.setStatus({ code: SpanStatusCode.ERROR });
386+
this._endSpanInNextIteration = true;
374387
}
375388

376389
if (typeof result.interval === "number") {
@@ -755,7 +768,7 @@ export class SharedQueueConsumer {
755768
);
756769

757770
if (!queue) {
758-
logger.debug("SharedQueueConsumer queue not found, so nacking message", {
771+
logger.debug("SharedQueueConsumer queue not found, so acking message", {
759772
queueMessage: message,
760773
taskRunQueue: lockedTaskRun.queue,
761774
runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId,
@@ -876,33 +889,49 @@ export class SharedQueueConsumer {
876889
machinePresetFromRun(lockedTaskRun) ??
877890
machinePresetFromConfig(lockedTaskRun.lockedBy?.machineConfig ?? {});
878891

879-
await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => {
880-
await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", {
881-
backgroundWorkerId: worker.friendlyId,
882-
data: {
883-
type: "SCHEDULE_ATTEMPT",
884-
image: imageReference,
885-
version: deployment.version,
886-
machine,
887-
nextAttemptNumber,
888-
// identifiers
889-
id: "placeholder", // TODO: Remove this completely in a future release
890-
envId: lockedTaskRun.runtimeEnvironment.id,
891-
envType: lockedTaskRun.runtimeEnvironment.type,
892-
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
893-
projectId: lockedTaskRun.runtimeEnvironment.projectId,
894-
runId: lockedTaskRun.id,
895-
},
892+
return await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => {
893+
span.setAttributes({
894+
run_id: lockedTaskRun.id,
896895
});
897-
});
898896

899-
return {
900-
action: "noop",
901-
reason: "scheduled_attempt",
902-
attrs: {
903-
next_attempt_number: nextAttemptNumber,
904-
},
905-
};
897+
if (await this._providerSender.validateCanSendMessage()) {
898+
await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", {
899+
backgroundWorkerId: worker.friendlyId,
900+
data: {
901+
type: "SCHEDULE_ATTEMPT",
902+
image: imageReference,
903+
version: deployment.version,
904+
machine,
905+
nextAttemptNumber,
906+
// identifiers
907+
id: "placeholder", // TODO: Remove this completely in a future release
908+
envId: lockedTaskRun.runtimeEnvironment.id,
909+
envType: lockedTaskRun.runtimeEnvironment.type,
910+
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
911+
projectId: lockedTaskRun.runtimeEnvironment.projectId,
912+
runId: lockedTaskRun.id,
913+
},
914+
});
915+
916+
return {
917+
action: "noop",
918+
reason: "scheduled_attempt",
919+
attrs: {
920+
next_attempt_number: nextAttemptNumber,
921+
},
922+
};
923+
} else {
924+
return {
925+
action: "nack_and_do_more_work",
926+
reason: "provider_not_connected",
927+
attrs: {
928+
run_id: lockedTaskRun.id,
929+
},
930+
interval: this._options.nextTickInterval,
931+
retryInMs: 5_000,
932+
};
933+
}
934+
});
906935
}
907936
} catch (e) {
908937
// We now need to unlock the task run and delete the task run attempt
@@ -929,6 +958,8 @@ export class SharedQueueConsumer {
929958
action: "nack_and_do_more_work",
930959
reason: "failed_to_schedule_attempt",
931960
error: e instanceof Error ? e : String(e),
961+
interval: this._options.nextTickInterval,
962+
retryInMs: 5_000,
932963
};
933964
}
934965
}

apps/webapp/app/v3/requeueTaskRun.server.ts

+23 -2
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ export class RequeueTaskRunService extends BaseService {
1919
id: true,
2020
friendlyId: true,
2121
status: true,
22+
lockedAt: true,
2223
runtimeEnvironment: {
2324
select: {
2425
type: true,
@@ -42,9 +43,29 @@ export class RequeueTaskRunService extends BaseService {
4243

4344
switch (taskRun.status) {
4445
case "PENDING": {
45-
logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun });
46+
if (taskRun.lockedAt) {
47+
logger.debug(
48+
"[RequeueTaskRunService] Failing task run because the heartbeat failed and it's PENDING but locked",
49+
{ taskRun }
50+
);
51+
52+
const service = new FailedTaskRunService();
53+
54+
await service.call(taskRun.friendlyId, {
55+
ok: false,
56+
id: taskRun.friendlyId,
57+
retry: undefined,
58+
error: {
59+
type: "INTERNAL_ERROR",
60+
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
61+
message: "Did not receive a heartbeat from the worker in time",
62+
},
63+
});
64+
} else {
65+
logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun });
4666

47-
await marqs?.nackMessage(taskRun.id);
67+
await marqs?.nackMessage(taskRun.id);
68+
}
4869

4970
break;
5071
}

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

+5 -2
Original file line number | Diff line number | Diff line change
@@ -254,9 +254,12 @@ async function getAuthenticatedEnvironmentFromRun(
254254
friendlyId: string,
255255
prismaClient?: PrismaClientOrTransaction
256256
) {
257-
const taskRun = await (prismaClient ?? prisma).taskRun.findUnique({
257+
const isFriendlyId = friendlyId.startsWith("run_");
258+
259+
const taskRun = await (prismaClient ?? prisma).taskRun.findFirst({
258260
where: {
259-
friendlyId,
261+
id: !isFriendlyId ? friendlyId : undefined,
262+
friendlyId: isFriendlyId ? friendlyId : undefined,
260263
},
261264
include: {
262265
runtimeEnvironment: {

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

+1 -1
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ import { logger } from "~/services/logger.server";
66

77
export class ResumeTaskDependencyService extends BaseService {
88
public async call(dependencyId: string, sourceTaskAttemptId: string) {
9-
const dependency = await this._prisma.taskRunDependency.findUnique({
9+
const dependency = await this._prisma.taskRunDependency.findFirst({
1010
where: { id: dependencyId },
1111
include: {
1212
taskRun: {

apps/webapp/app/v3/sharedSocketConnection.ts

+8
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,14 @@ export class SharedSocketConnection {
8080
}
8181
});
8282
},
83+
canSendMessage() {
84+
// Return true if there is at least 1 connected socket on the namespace
85+
if (opts.namespace.sockets.size === 0) {
86+
return false;
87+
}
88+
89+
return Array.from(opts.namespace.sockets.values()).some((socket) => socket.connected);
90+
},
8391
});
8492

8593
logger.debug("Starting SharedQueueConsumer pool", {

packages/core/src/logger.ts

+43
Original file line number | Diff line number | Diff line change
@@ -96,12 +96,17 @@ export class Logger {
9696
// Get the current context from trace if it exists
9797
const currentSpan = trace.getSpan(context.active());
9898

99+
const structuredError = extractStructuredErrorFromArgs(...args);
100+
const structuredMessage = extractStructuredMessageFromArgs(...args);
101+
99102
const structuredLog = {
100103
...structureArgs(safeJsonClone(args) as Record<string, unknown>[], this.#filteredKeys),
101104
...this.#additionalFields(),
105+
...(structuredError ? { error: structuredError } : {}),
102106
timestamp: new Date(),
103107
name: this.#name,
104108
message,
109+
...(structuredMessage ? { $message: structuredMessage } : {}),
105110
level,
106111
traceId:
107112
currentSpan && currentSpan.isRecording() ? currentSpan?.spanContext().traceId : undefined,
@@ -118,6 +123,44 @@ export class Logger {
118123
}
119124
}
120125

126+
// Detect if args is an error object
127+
// Or if args contains an error object at the "error" key
128+
// In both cases, return the error object as a structured error
129+
function extractStructuredErrorFromArgs(...args: Array<Record<string, unknown> | undefined>) {
130+
const error = args.find((arg) => arg instanceof Error) as Error | undefined;
131+
132+
if (error) {
133+
return {
134+
message: error.message,
135+
stack: error.stack,
136+
name: error.name,
137+
};
138+
}
139+
140+
const structuredError = args.find((arg) => arg?.error);
141+
142+
if (structuredError && structuredError.error instanceof Error) {
143+
return {
144+
message: structuredError.error.message,
145+
stack: structuredError.error.stack,
146+
name: structuredError.error.name,
147+
};
148+
}
149+
150+
return;
151+
}
152+
153+
function extractStructuredMessageFromArgs(...args: Array<Record<string, unknown> | undefined>) {
154+
// Check to see if there is a `message` key in the args, and if so, return it
155+
const structuredMessage = args.find((arg) => arg?.message);
156+
157+
if (structuredMessage) {
158+
return structuredMessage.message;
159+
}
160+
161+
return;
162+
}
163+
121164
function createReplacer(replacer?: (key: string, value: unknown) => unknown) {
122165
return (key: string, value: unknown) => {
123166
if (typeof value === "bigint") {

packages/core/src/v3/zodMessageHandler.ts

+10
Original file line number | Diff line number | Diff line change
@@ -239,15 +239,25 @@ type ZodMessageSenderCallback<TMessageCatalog extends ZodMessageCatalogSchema> =
239239
export type ZodMessageSenderOptions<TMessageCatalog extends ZodMessageCatalogSchema> = {
240240
schema: TMessageCatalog;
241241
sender: ZodMessageSenderCallback<TMessageCatalog>;
242+
canSendMessage?: () => Promise<boolean> | boolean;
242243
};
243244

244245
export class ZodMessageSender<TMessageCatalog extends ZodMessageCatalogSchema> {
245246
#schema: TMessageCatalog;
246247
#sender: ZodMessageSenderCallback<TMessageCatalog>;
248+
#canSendMessage?: ZodMessageSenderOptions<TMessageCatalog>["canSendMessage"];
247249

248250
constructor(options: ZodMessageSenderOptions<TMessageCatalog>) {
249251
this.#schema = options.schema;
250252
this.#sender = options.sender;
253+
this.#canSendMessage = options.canSendMessage;
254+
}
255+
256+
public async validateCanSendMessage(): Promise<boolean> {
257+
if (!this.#canSendMessage) {
258+
return true;
259+
}
260+
return await this.#canSendMessage();
251261
}
252262

253263
public async send<K extends keyof TMessageCatalog>(

0 commit comments

Comments
 (0)