Skip to content

Commit bec58eb

Browse files
committed
Try to nack; if that fails, fail the run
1 parent 775a078 commit bec58eb

File tree

1 file changed

+79
-51
lines changed

1 file changed

+79
-51
lines changed

apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts

Lines changed: 79 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -39,85 +39,113 @@ export class TaskRunHeartbeatFailedService extends BaseService {
3939
});
4040

4141
if (!taskRun) {
42-
logger.error("[RequeueTaskRunService] Task run not found", {
42+
logger.error("[TaskRunHeartbeatFailedService] Task run not found", {
4343
runId,
4444
});
4545

4646
return;
4747
}
4848

49+
const service = new FailedTaskRunService();
50+
4951
switch (taskRun.status) {
5052
case "PENDING": {
51-
if (taskRun.lockedAt) {
52-
if (taskRun._count.attempts === 0) {
53-
//no attempts, so we can requeue
54-
logger.debug("[RequeueTaskRunService] Requeueing task run, there were no attempts.", {
53+
const backInQueue = await marqs?.nackMessage(taskRun.id);
54+
55+
if (backInQueue) {
56+
logger.debug(
57+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`,
58+
{
5559
taskRun,
56-
});
57-
58-
await marqs?.nackMessage(taskRun.id);
59-
} else {
60-
logger.debug(
61-
"[RequeueTaskRunService] Failing task run because the heartbeat failed, it's PENDING, locked, and has attempts",
62-
{ taskRun }
63-
);
64-
65-
const service = new FailedTaskRunService();
66-
67-
await service.call(taskRun.friendlyId, {
68-
ok: false,
69-
id: taskRun.friendlyId,
70-
retry: undefined,
71-
error: {
72-
type: "INTERNAL_ERROR",
73-
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
74-
message: "Did not receive a heartbeat from the worker in time",
75-
},
76-
});
77-
}
60+
}
61+
);
7862
} else {
79-
logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun });
80-
81-
await marqs?.nackMessage(taskRun.id);
63+
logger.debug(
64+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`,
65+
{ taskRun }
66+
);
67+
await service.call(taskRun.friendlyId, {
68+
ok: false,
69+
id: taskRun.friendlyId,
70+
retry: undefined,
71+
error: {
72+
type: "INTERNAL_ERROR",
73+
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
74+
message: "Did not receive a heartbeat from the worker in time",
75+
},
76+
});
8277
}
8378

8479
break;
8580
}
8681
case "EXECUTING":
8782
case "RETRYING_AFTER_FAILURE": {
88-
logger.debug("[RequeueTaskRunService] Failing task run", { taskRun });
89-
90-
const service = new FailedTaskRunService();
91-
92-
await service.call(taskRun.friendlyId, {
93-
ok: false,
94-
id: taskRun.friendlyId,
95-
retry: undefined,
96-
error: {
97-
type: "INTERNAL_ERROR",
98-
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
99-
message: "Did not receive a heartbeat from the worker in time",
100-
},
101-
});
83+
const backInQueue = await marqs?.nackMessage(taskRun.id);
84+
85+
if (backInQueue) {
86+
logger.debug(
87+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`,
88+
{
89+
taskRun,
90+
}
91+
);
92+
} else {
93+
logger.debug(
94+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`,
95+
{ taskRun }
96+
);
97+
await service.call(taskRun.friendlyId, {
98+
ok: false,
99+
id: taskRun.friendlyId,
100+
retry: undefined,
101+
error: {
102+
type: "INTERNAL_ERROR",
103+
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
104+
message: "Did not receive a heartbeat from the worker in time",
105+
},
106+
});
107+
}
102108

103109
break;
104110
}
105111
case "DELAYED":
106112
case "WAITING_FOR_DEPLOY": {
107-
logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun });
113+
logger.debug("[TaskRunHeartbeatFailedService] Removing task run from queue", { taskRun });
108114

109115
await marqs?.acknowledgeMessage(
110116
taskRun.id,
111-
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService"
117+
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in TaskRunHeartbeatFailedService"
112118
);
113119

114120
break;
115121
}
116122
case "WAITING_TO_RESUME":
117123
case "PAUSED": {
118-
logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun });
124+
const backInQueue = await marqs?.nackMessage(taskRun.id);
119125

120-
await marqs?.nackMessage(taskRun.id);
126+
if (backInQueue) {
127+
logger.debug(
128+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`,
129+
{
130+
taskRun,
131+
}
132+
);
133+
} else {
134+
logger.debug(
135+
`[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`,
136+
{ taskRun }
137+
);
138+
await service.call(taskRun.friendlyId, {
139+
ok: false,
140+
id: taskRun.friendlyId,
141+
retry: undefined,
142+
error: {
143+
type: "INTERNAL_ERROR",
144+
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
145+
message: "Did not receive a heartbeat from the worker in time",
146+
},
147+
});
148+
}
121149

122150
break;
123151
}
@@ -129,11 +157,11 @@ export class TaskRunHeartbeatFailedService extends BaseService {
129157
case "EXPIRED":
130158
case "TIMED_OUT":
131159
case "CANCELED": {
132-
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });
160+
logger.debug("[TaskRunHeartbeatFailedService] Task run is completed", { taskRun });
133161

134162
await marqs?.acknowledgeMessage(
135163
taskRun.id,
136-
"Task run is already completed in RequeueTaskRunService"
164+
"Task run is already completed in TaskRunHeartbeatFailedService"
137165
);
138166

139167
try {
@@ -149,7 +177,7 @@ export class TaskRunHeartbeatFailedService extends BaseService {
149177
delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined,
150178
});
151179
} catch (error) {
152-
logger.error("[RequeueTaskRunService] Error signaling run cancellation", {
180+
logger.error("[TaskRunHeartbeatFailedService] Error signaling run cancellation", {
153181
runId: taskRun.id,
154182
error: error instanceof Error ? error.message : error,
155183
});

0 commit comments

Comments
 (0)