Skip to content

Commit 12b666b

Browse files
committed
Fixed task trigger queue update logic to only update an existing queue if the concurrency limit changes, instead of on every single trigger task call
1 parent 2bdbedc commit 12b666b

File tree

3 files changed

+58
-42
lines changed

3 files changed

+58
-42
lines changed

apps/webapp/app/v3/services/completeAttempt.server.ts

+19-21
Original file line numberDiff line numberDiff line change
@@ -131,30 +131,28 @@ export class CompleteAttemptService extends BaseService {
131131
taskRunAttempt: NonNullable<FoundAttempt>,
132132
env?: AuthenticatedEnvironment
133133
): Promise<"COMPLETED"> {
134-
await $transaction(this._prisma, async (tx) => {
135-
await tx.taskRunAttempt.update({
136-
where: { id: taskRunAttempt.id },
137-
data: {
138-
status: "COMPLETED",
139-
completedAt: new Date(),
140-
output: completion.output,
141-
outputType: completion.outputType,
142-
usageDurationMs: completion.usage?.durationMs,
143-
taskRun: {
144-
update: {
145-
output: completion.output,
146-
outputType: completion.outputType,
147-
},
134+
await this._prisma.taskRunAttempt.update({
135+
where: { id: taskRunAttempt.id },
136+
data: {
137+
status: "COMPLETED",
138+
completedAt: new Date(),
139+
output: completion.output,
140+
outputType: completion.outputType,
141+
usageDurationMs: completion.usage?.durationMs,
142+
taskRun: {
143+
update: {
144+
output: completion.output,
145+
outputType: completion.outputType,
148146
},
149147
},
150-
});
148+
},
149+
});
151150

152-
const finalizeService = new FinalizeTaskRunService(tx);
153-
await finalizeService.call({
154-
id: taskRunAttempt.taskRunId,
155-
status: "COMPLETED_SUCCESSFULLY",
156-
completedAt: new Date(),
157-
});
151+
const finalizeService = new FinalizeTaskRunService();
152+
await finalizeService.call({
153+
id: taskRunAttempt.taskRunId,
154+
status: "COMPLETED_SUCCESSFULLY",
155+
completedAt: new Date(),
158156
});
159157

160158
// Now we need to "complete" the task run event/span

apps/webapp/app/v3/services/triggerTask.server.ts

+38-20
Original file line numberDiff line numberDiff line change
@@ -414,20 +414,40 @@ export class TriggerTaskService extends BaseService {
414414
},
415415
});
416416

417+
const existingConcurrencyLimit =
418+
typeof taskQueue?.concurrencyLimit === "number"
419+
? taskQueue.concurrencyLimit
420+
: undefined;
421+
417422
if (taskQueue) {
418-
taskQueue = await tx.taskQueue.update({
419-
where: {
420-
id: taskQueue.id,
421-
},
422-
data: {
423-
concurrencyLimit,
424-
rateLimit: body.options.queue.rateLimit,
425-
},
426-
});
423+
if (existingConcurrencyLimit !== concurrencyLimit) {
424+
taskQueue = await tx.taskQueue.update({
425+
where: {
426+
id: taskQueue.id,
427+
},
428+
data: {
429+
concurrencyLimit:
430+
typeof concurrencyLimit === "number" ? concurrencyLimit : null,
431+
rateLimit: body.options.queue.rateLimit,
432+
},
433+
});
434+
435+
if (typeof taskQueue.concurrencyLimit === "number") {
436+
await marqs?.updateQueueConcurrencyLimits(
437+
environment,
438+
taskQueue.name,
439+
taskQueue.concurrencyLimit
440+
);
441+
} else {
442+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
443+
}
444+
}
427445
} else {
446+
const queueId = generateFriendlyId("queue");
447+
428448
taskQueue = await tx.taskQueue.create({
429449
data: {
430-
friendlyId: generateFriendlyId("queue"),
450+
friendlyId: queueId,
431451
name: queueName,
432452
concurrencyLimit,
433453
runtimeEnvironmentId: environment.id,
@@ -436,16 +456,14 @@ export class TriggerTaskService extends BaseService {
436456
type: "NAMED",
437457
},
438458
});
439-
}
440459

441-
if (typeof taskQueue.concurrencyLimit === "number") {
442-
await marqs?.updateQueueConcurrencyLimits(
443-
environment,
444-
taskQueue.name,
445-
taskQueue.concurrencyLimit
446-
);
447-
} else {
448-
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
460+
if (typeof taskQueue.concurrencyLimit === "number") {
461+
await marqs?.updateQueueConcurrencyLimits(
462+
environment,
463+
taskQueue.name,
464+
taskQueue.concurrencyLimit
465+
);
466+
}
449467
}
450468
}
451469

@@ -599,7 +617,7 @@ export class TriggerTaskService extends BaseService {
599617

600618
const filename = `${pathPrefix}/payload.json`;
601619

602-
await uploadToObjectStore(filename, packet.data, packet.dataType, environment);
620+
await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment);
603621

604622
return {
605623
data: filename,

references/v3-catalog/src/trigger/batch.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export const batchChildTask = task({
5555
retry: {
5656
maxAttempts: 2,
5757
},
58-
run: async (payload: string, { ctx }) => {
58+
run: async (payload: any, { ctx }) => {
5959
logger.info("Processing child task", { payload });
6060

6161
await wait.for({ seconds: 1 });

0 commit comments

Comments
 (0)