Skip to content

Run engine: Using root queue timestamp to prioritize completing runs #1818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions apps/webapp/app/models/taskRunTag.server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { prisma } from "~/db.server";
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
import { PrismaClientOrTransaction } from "@trigger.dev/database";

export const MAX_TAGS_PER_RUN = 10;

export async function createTag({ tag, projectId }: { tag: string; projectId: string }) {
export async function createTag(
{ tag, projectId }: { tag: string; projectId: string },
prismaClient: PrismaClientOrTransaction = prisma
) {
if (tag.trim().length === 0) return;
return prisma.taskRunTag.upsert({
return prismaClient.taskRunTag.upsert({
where: {
projectId_name: {
projectId: projectId,
Expand All @@ -21,6 +25,48 @@ export async function createTag({ tag, projectId }: { tag: string; projectId: st
});
}

export type TagRecord = {
id: string;
name: string;
};

export async function createTags(
{
tags,
projectId,
}: {
tags: string | string[] | undefined;
projectId: string;
},
prismaClient: PrismaClientOrTransaction = prisma
): Promise<TagRecord[]> {
if (!tags) {
return [];
}

const tagsArray = typeof tags === "string" ? [tags] : tags;

if (tagsArray.length === 0) {
return [];
}

const tagRecords: TagRecord[] = [];
for (const tag of tagsArray) {
const tagRecord = await createTag(
{
tag,
projectId,
},
prismaClient
);
if (tagRecord) {
tagRecords.push({ id: tagRecord.id, name: tagRecord.name });
}
}

return tagRecords;
}

export async function getTagsForRunId({
friendlyId,
environmentId,
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/routes/api.v2.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
import { BatchTriggerV4Service } from "~/v3/services/batchTriggerV4.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";

const { action, loader } = createActionApiRoute(
{
Expand Down Expand Up @@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute(
? { traceparent, tracestate }
: undefined;

const service = new BatchTriggerV4Service(batchProcessingStrategy ?? undefined);
const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { getEntitlement } from "~/services/platform.v3.server";
import { workerQueue } from "~/services/worker.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server";
import { startActiveSpan } from "../tracer.server";
import { ServiceValidationError, WithRunEngine } from "./baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
import { startActiveSpan } from "../../v3/tracer.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server";

const PROCESSING_BATCH_SIZE = 50;
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;
Expand Down Expand Up @@ -49,7 +49,7 @@ export type BatchTriggerTaskServiceOptions = {
/**
* Larger batches, used in Run Engine v2
*/
export class BatchTriggerV4Service extends WithRunEngine {
export class RunEngineBatchTriggerService extends WithRunEngine {
private _batchProcessingStrategy: BatchProcessingStrategy;

constructor(
Expand Down Expand Up @@ -643,7 +643,7 @@ export class BatchTriggerV4Service extends WithRunEngine {
}

async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
await workerQueue.enqueue("v3.processBatchTaskRunV3", options, {
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
tx,
jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`,
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RunEngine, RunDuplicateIdempotencyKeyError } from "@internal/run-engine";
import { RunDuplicateIdempotencyKeyError, RunEngine } from "@internal/run-engine";
import {
IOPacket,
packetRequiresOffloading,
Expand All @@ -14,34 +14,33 @@ import {
sanitizeQueueName,
stringifyDuration,
} from "@trigger.dev/core/v3/isomorphic";
import { Prisma, TaskRun } from "@trigger.dev/database";
import { Prisma } from "@trigger.dev/database";
import { env } from "~/env.server";
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { createTags, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
import { logger } from "~/services/logger.server";
import { getEntitlement } from "~/services/platform.v3.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { handleMetadataPacket } from "~/utils/packets";
import { eventRepository } from "../eventRepository.server";
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
import { uploadPacketToObjectStore } from "../r2.server";
import { isFinalRunStatus } from "../taskStatus";
import { startActiveSpan } from "../tracer.server";
import { clampMaxDuration } from "../utils/maxDuration";
import { ServiceValidationError, WithRunEngine } from "./baseService.server";
import { eventRepository } from "../../v3/eventRepository.server";
import { findCurrentWorkerFromEnvironment } from "../../v3/models/workerDeployment.server";
import { uploadPacketToObjectStore } from "../../v3/r2.server";
import { getTaskEventStore } from "../../v3/taskEventStore.server";
import { isFinalRunStatus } from "../../v3/taskStatus";
import { startActiveSpan } from "../../v3/tracer.server";
import { clampMaxDuration } from "../../v3/utils/maxDuration";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import {
MAX_ATTEMPTS,
OutOfEntitlementError,
TriggerTaskServiceOptions,
TriggerTaskServiceResult,
} from "./triggerTask.server";
import { WorkerGroupService } from "./worker/workerGroupService.server";
import { getTaskEventStore } from "../taskEventStore.server";
} from "../../v3/services/triggerTask.server";
import { WorkerGroupService } from "../../v3/services/worker/workerGroupService.server";

/** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */
export class TriggerTaskServiceV2 extends WithRunEngine {
export class RunEngineTriggerTaskService extends WithRunEngine {
public async call({
taskId,
environment,
Expand Down Expand Up @@ -299,20 +298,13 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
span.setAttribute("queueName", queueName);

//upsert tags
let tags: { id: string; name: string }[] = [];
const bodyTags =
typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags;
if (bodyTags && bodyTags.length > 0) {
for (const tag of bodyTags) {
const tagRecord = await createTag({
tag,
projectId: environment.projectId,
});
if (tagRecord) {
tags.push(tagRecord);
}
}
}
const tags = await createTags(
{
tags: body.options?.tags,
projectId: environment.projectId,
},
this._prisma
);

const depth = parentRun ? parentRun.depth + 1 : 0;

Expand Down Expand Up @@ -372,6 +364,10 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
machine: body.options?.machine,
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
releaseConcurrency: body.options?.releaseConcurrency,
queueTimestamp:
parentRun && body.options?.resumeParentOnCompletion
? parentRun.queueTimestamp ?? undefined
: undefined,
},
this._prisma
);
Expand Down
12 changes: 6 additions & 6 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import { reportInvocationUsage } from "./platform.v3.server";
import { logger } from "./logger.server";
import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
import {
BatchProcessingOptions as BatchProcessingOptionsV4,
BatchTriggerV4Service,
} from "~/v3/services/batchTriggerV4.server";
BatchProcessingOptions as RunEngineBatchProcessingOptions,
RunEngineBatchTriggerService,
} from "~/runEngine/services/batchTrigger.server";

const workerCatalog = {
scheduleEmail: DeliverEmailSchema,
Expand Down Expand Up @@ -99,7 +99,7 @@ const workerCatalog = {
}),
"v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions,
"v3.processBatchTaskRun": BatchProcessingOptions,
"v3.processBatchTaskRunV3": BatchProcessingOptionsV4,
"runengine.processBatchTaskRun": RunEngineBatchProcessingOptions,
};

let workerQueue: ZodWorker<typeof workerCatalog>;
Expand Down Expand Up @@ -341,11 +341,11 @@ function getWorkerQueue() {
await service.processBatchTaskRun(payload);
},
},
"v3.processBatchTaskRunV3": {
"runengine.processBatchTaskRun": {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new BatchTriggerV4Service(payload.strategy);
const service = new RunEngineBatchTriggerService(payload.strategy);

await service.processBatchTaskRun(payload);
},
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { determineEngineVersion } from "../engineVersion.server";
import { WithRunEngine } from "./baseService.server";
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
import { TriggerTaskServiceV2 } from "./triggerTaskV2.server";
import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.server";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Updated task service to support queue timestamp prioritization

The code has been updated to use the new RunEngineTriggerTaskService instead of the previous TriggerTaskServiceV2, incorporating the system for using queue timestamps to prioritize runs. The service is now initialized with both Prisma and the run engine instance, providing better integration with the run engine.

Also applies to: 81-84


🏁 Script executed:

#!/bin/bash
# Check to ensure there are no references to the old TriggerTaskServiceV2 that might need updating
rg "TriggerTaskServiceV2" --type ts

Length of output: 145


Action Required: Resolve Remaining Reference to Deprecated Service

  • The service update in apps/webapp/app/v3/services/triggerTask.server.ts correctly imports and uses RunEngineTriggerTaskService for queue timestamp prioritization.
  • However, the search output shows that TriggerTaskServiceV2 still exists in apps/webapp/app/v3/services/triggerTaskV2.server.ts. Please verify whether this file should be updated or removed.
  • Also, ensure that any usages of TriggerTaskServiceV2 in the referenced code (e.g., lines 81-84) are also updated to the new service implementation.


export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -78,7 +78,7 @@ export class TriggerTaskService extends WithRunEngine {
body: TriggerTaskRequestBody,
options: TriggerTaskServiceOptions = {}
): Promise<TriggerTaskServiceResult | undefined> {
const service = new TriggerTaskServiceV2({
const service = new RunEngineTriggerTaskService({
prisma: this._prisma,
engine: this._engine,
});
Expand Down
3 changes: 2 additions & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ export class RunEngine {
maxAttempts,
taskEventStore,
priorityMs,
queueTimestamp,
ttl,
tags,
parentTaskRunId,
Expand Down Expand Up @@ -414,6 +415,7 @@ export class RunEngine {
maxAttempts,
taskEventStore,
priorityMs,
queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(),
ttl,
tags:
tags.length === 0
Expand Down Expand Up @@ -520,7 +522,6 @@ export class RunEngine {
await this.enqueueSystem.enqueueRun({
run: taskRun,
env: environment,
timestamp: Date.now() - taskRun.priorityMs,
workerId,
runnerId,
tx: prisma,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ export class CheckpointSystem {
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
timestamp: run.createdAt.getTime() - run.priorityMs,
snapshot: {
status: "QUEUED",
description:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ export class DelayedRunSystem {
await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
timestamp: run.createdAt.getTime() - run.priorityMs,
batchId: run.batchId ?? undefined,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ export class EnqueueSystem {
public async enqueueRun({
run,
env,
timestamp,
tx,
snapshot,
previousSnapshotId,
Expand All @@ -37,7 +36,6 @@ export class EnqueueSystem {
}: {
run: TaskRun;
env: MinimalAuthenticatedEnvironment;
timestamp: number;
tx?: PrismaClientOrTransaction;
snapshot?: {
status?: Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;
Expand Down Expand Up @@ -81,6 +79,8 @@ export class EnqueueSystem {
masterQueues.push(run.secondaryMasterQueue);
}

const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;

await this.$.runQueue.enqueueMessage({
env,
masterQueues,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ export class PendingVersionSystem {
await this.enqueueSystem.enqueueRun({
run: updatedRun,
env: backgroundWorker.runtimeEnvironment,
//add to the queue using the original run created time
//this should ensure they're in the correct order in the queue
timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs,
tx,
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,6 @@ export class WaitpointSystem {
await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
timestamp: run.createdAt.getTime() - run.priorityMs,
snapshot: {
status: "QUEUED_EXECUTING",
description: "Run can continue, but is waiting for concurrency",
Expand All @@ -564,7 +563,6 @@ export class WaitpointSystem {
await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
timestamp: run.createdAt.getTime() - run.priorityMs,
snapshot: {
description: "Run was QUEUED, because all waitpoints are completed",
},
Expand Down
Loading