diff --git a/apps/webapp/app/v3/scheduleEngine.server.ts b/apps/webapp/app/v3/scheduleEngine.server.ts index cc7bb3bd0e..ef3cabe64d 100644 --- a/apps/webapp/app/v3/scheduleEngine.server.ts +++ b/apps/webapp/app/v3/scheduleEngine.server.ts @@ -7,6 +7,7 @@ import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { TriggerTaskService } from "./services/triggerTask.server"; import { meter, tracer } from "./tracer.server"; +import { workerQueue } from "~/services/worker.server"; export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine); @@ -117,7 +118,24 @@ function createScheduleEngine() { } }, isDevEnvironmentConnectedHandler: isDevEnvironmentConnectedHandler, + onRegisterScheduleInstance: removeDeprecatedWorkerQueueItem, }); return engine; } + +async function removeDeprecatedWorkerQueueItem(instanceId: string) { + // We need to dequeue the instance from the existing workerQueue + try { + await workerQueue.dequeue(`scheduled-task-instance:${instanceId}`); + + logger.debug("Removed deprecated worker queue item", { + instanceId, + }); + } catch (error) { + logger.error("Error dequeuing scheduled task instance from deprecated queue", { + instanceId, + error: error instanceof Error ? error.message : String(error), + }); + } +} diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index 3efafe73ca..47dc1e31e1 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -8,7 +8,7 @@ import { Tracer, } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; -import { PrismaClient, TaskSchedule, TaskScheduleInstance } from "@trigger.dev/database"; +import { PrismaClient } from "@trigger.dev/database"; import { Worker, type JobHandlerParams } from "@trigger.dev/redis-worker"; import { calculateDistributedExecutionTime } from "./distributedScheduling.js"; import { calculateNextScheduledTimestamp, nextScheduledTimestamps } from "./scheduleCalculation.js"; @@ -122,6 +122,19 @@ export class ScheduleEngine { return startSpan(this.tracer, "registerNextTaskScheduleInstance", async (span) => { const startTime = Date.now(); + if (this.options.onRegisterScheduleInstance) { + const [registerError] = await tryCatch( + this.options.onRegisterScheduleInstance(params.instanceId) + ); + + if (registerError) { + this.logger.error("Error calling the onRegisterScheduleInstance callback", { + instanceId: params.instanceId, + error: registerError, + }); + } + } + span.setAttribute("instanceId", params.instanceId); this.logger.debug("Starting schedule registration", { @@ -382,10 +395,11 @@ export class ScheduleEngine { span.setAttribute("skip_reason", skipReason); } - if (shouldTrigger) { - const scheduleTimestamp = - params.exactScheduleTime ?? instance.nextScheduledTimestamp ?? new Date(); + // Calculate the schedule timestamp that will be used (regardless of whether we trigger or not) + const scheduleTimestamp = + params.exactScheduleTime ?? instance.nextScheduledTimestamp ?? new Date(); + if (shouldTrigger) { const payload = { scheduleId: instance.taskSchedule.friendlyId, type: instance.taskSchedule.type as "DECLARATIVE" | "IMPERATIVE", @@ -427,7 +441,7 @@ export class ScheduleEngine { payload, scheduleInstanceId: instance.id, scheduleId: instance.taskSchedule.id, - exactScheduleTime: params.exactScheduleTime, + exactScheduleTime: scheduleTimestamp, }) ); @@ -523,7 +537,7 @@ export class ScheduleEngine { id: params.instanceId, }, data: { - lastScheduledTimestamp: instance.nextScheduledTimestamp, + lastScheduledTimestamp: scheduleTimestamp, }, }); diff --git a/internal-packages/schedule-engine/src/engine/types.ts b/internal-packages/schedule-engine/src/engine/types.ts index 6b62fd4175..8cad80c303 100644 --- a/internal-packages/schedule-engine/src/engine/types.ts +++ b/internal-packages/schedule-engine/src/engine/types.ts @@ -48,6 +48,7 @@ export interface ScheduleEngineOptions { meter?: Meter; onTriggerScheduledTask: TriggerScheduledTaskCallback; isDevEnvironmentConnectedHandler: (environmentId: string) => Promise; + onRegisterScheduleInstance?: (instanceId: string) => Promise; } export interface UpsertScheduleParams {