diff --git a/.changeset/thick-poets-yawn.md b/.changeset/thick-poets-yawn.md new file mode 100644 index 0000000000..56f1151b54 --- /dev/null +++ b/.changeset/thick-poets-yawn.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Specify a region override when triggering a run diff --git a/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts b/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts index 7f69298774..c304597bb1 100644 --- a/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts @@ -30,7 +30,7 @@ export class RegionsPresenter extends BasePresenter { id: true, organizationId: true, defaultWorkerGroupId: true, - allowedMasterQueues: true, + allowedWorkerQueues: true, }, where: { slug: projectSlug, @@ -70,9 +70,9 @@ export class RegionsPresenter extends BasePresenter { where: isAdmin ? undefined : // Hide hidden unless they're allowed to use them - project.allowedMasterQueues.length > 0 + project.allowedWorkerQueues.length > 0 ? { - masterQueue: { in: project.allowedMasterQueues }, + masterQueue: { in: project.allowedWorkerQueues }, } : { hidden: false, diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index a3a5747846..a00ffa3f3c 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -1,11 +1,11 @@ import { - MachinePreset, + type MachinePreset, prettyPrintPacket, SemanticInternalAttributes, - TaskRunContext, + type TaskRunContext, TaskRunError, TriggerTraceContext, - V3TaskRunContext, + type V3TaskRunContext, } from "@trigger.dev/core/v3"; import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic"; import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; @@ -176,6 +176,22 @@ export class SpanPresenter extends BasePresenter { const externalTraceId = this.#getExternalTraceId(run.traceContext); + let region: { name: string; location: string | null } | null = null; + + if (run.runtimeEnvironment.type !== "DEVELOPMENT" && run.engine !== "V1") { + const workerGroup = await this._replica.workerInstanceGroup.findFirst({ + select: { + name: true, + location: true, + }, + where: { + masterQueue: run.workerQueue, + }, + }); + + region = workerGroup ?? null; + } + return { id: run.id, friendlyId: run.friendlyId, @@ -233,6 +249,7 @@ export class SpanPresenter extends BasePresenter { maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds), batch: run.batch ? { friendlyId: run.batch.friendlyId } : undefined, engine: run.engine, + region, workerQueue: run.workerQueue, spanId: run.spanId, isCached: !!span.originalRun, diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts index 1584d1accc..0897c30c21 100644 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts +++ b/apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts @@ -59,7 +59,12 @@ export async function action({ request }: ActionFunctionArgs) { throw new Error("Runs replication instance not found"); } - await runsReplicationInstance.backfill(runs); + await runsReplicationInstance.backfill( + runs.map((run) => ({ + ...run, + masterQueue: run.workerQueue, + })) + ); logger.info("Backfilled runs", { runs }); diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 4e232b888f..904f5c508b 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -76,6 +76,7 @@ import { } from "~/utils/pathBuilder"; import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents"; import { CompleteWaitpointForm } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.waitpoints.$waitpointFriendlyId.complete/route"; +import { FlagIcon } from "~/assets/icons/RegionIcons"; export const loader = async ({ request, params }: LoaderFunctionArgs) => { const { projectParam, organizationSlug, envParam, runParam, spanParam } = @@ -701,6 +702,19 @@ function RunBody({ + {run.region && ( + + Region + + + {run.region.location ? ( + + ) : null} + {run.region.name} + + + + )} Run invocation cost diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index c8bda40d22..60cf20b14f 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -14,6 +14,7 @@ import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.serv import type { RunEngine } from "~/v3/runEngine.server"; import { env } from "~/env.server"; import { EngineServiceValidationError } from "./errors"; +import { tryCatch } from "@trigger.dev/core/v3"; export class DefaultQueueManager implements QueueManager { constructor( @@ -196,7 +197,10 @@ export class DefaultQueueManager implements QueueManager { }; } - async getWorkerQueue(environment: AuthenticatedEnvironment): Promise { + async getWorkerQueue( + environment: AuthenticatedEnvironment, + regionOverride?: string + ): Promise { if (environment.type === "DEVELOPMENT") { return environment.id; } @@ -206,9 +210,16 @@ export class DefaultQueueManager implements QueueManager { engine: this.engine, }); - const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({ - projectId: environment.projectId, - }); + const [error, workerGroup] = await tryCatch( + workerGroupService.getDefaultWorkerGroupForProject({ + projectId: environment.projectId, + regionOverride, + }) + ); + + if (error) { + throw new EngineServiceValidationError(error.message); + } if (!workerGroup) { throw new EngineServiceValidationError("No worker group found"); diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index ea41c12c79..5ec3f29dd8 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService { const depth = parentRun ? parentRun.depth + 1 : 0; - const workerQueue = await this.queueConcern.getWorkerQueue(environment); + const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region); try { return await this.traceEventConcern.traceRun(triggerRequest, async (event) => { diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index 3564a5d717..40a70678e0 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -67,7 +67,10 @@ export interface QueueManager { ): Promise; getQueueName(request: TriggerTaskRequest): Promise; validateQueueLimits(env: AuthenticatedEnvironment): Promise; - getWorkerQueue(env: AuthenticatedEnvironment): Promise; + getWorkerQueue( + env: AuthenticatedEnvironment, + regionOverride?: string + ): Promise; } export interface PayloadProcessor { diff --git a/apps/webapp/app/services/runsBackfiller.server.ts b/apps/webapp/app/services/runsBackfiller.server.ts index a566b44bb3..7fc824f3d3 100644 --- a/apps/webapp/app/services/runsBackfiller.server.ts +++ b/apps/webapp/app/services/runsBackfiller.server.ts @@ -73,7 +73,12 @@ export class RunsBackfillerService { lastCreatedAt: runs[runs.length - 1].createdAt, }); - await this.runsReplicationInstance.backfill(runs); + await this.runsReplicationInstance.backfill( + runs.map((run) => ({ + ...run, + masterQueue: run.workerQueue, + })) + ); const lastRun = runs[runs.length - 1]; diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 60badb2ebc..b9eeeab256 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -59,7 +59,13 @@ export type RunsReplicationServiceOptions = { insertMaxDelayMs?: number; }; -type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" }; +type PostgresTaskRun = TaskRun & { masterQueue: string }; + +type TaskRunInsert = { + _version: bigint; + run: PostgresTaskRun; + event: "insert" | "update" | "delete"; +}; export type RunsReplicationServiceEvents = { message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }]; @@ -243,7 +249,7 @@ export class RunsReplicationService { } } - async backfill(runs: TaskRun[]) { + async backfill(runs: PostgresTaskRun[]) { // divide into batches of 50 to get data from Postgres const flushId = nanoid(); // Use current timestamp as LSN (high enough to be above existing data) @@ -352,7 +358,7 @@ export class RunsReplicationService { const replicationLagMs = Date.now() - Number(message.commitTime / 1000n); this._currentTransaction.commitEndLsn = message.commitEndLsn; this._currentTransaction.replicationLagMs = replicationLagMs; - const transaction = this._currentTransaction as Transaction; + const transaction = this._currentTransaction as Transaction; this._currentTransaction = null; if (transaction.commitEndLsn) { @@ -370,7 +376,7 @@ export class RunsReplicationService { } } - #handleTransaction(transaction: Transaction) { + #handleTransaction(transaction: Transaction) { if (this._isShutDownComplete) return; if (this._isShuttingDown) { @@ -764,7 +770,7 @@ export class RunsReplicationService { } async #prepareTaskRunInsert( - run: TaskRun, + run: PostgresTaskRun, organizationId: string, environmentType: string, event: "insert" | "update" | "delete", @@ -814,6 +820,7 @@ export class RunsReplicationService { output, concurrency_key: run.concurrencyKey ?? "", bulk_action_group_ids: run.bulkActionGroupIds ?? [], + worker_queue: run.masterQueue, _version: _version.toString(), _is_deleted: event === "delete" ? 1 : 0, }; diff --git a/apps/webapp/app/v3/services/setDefaultRegion.server.ts b/apps/webapp/app/v3/services/setDefaultRegion.server.ts index 8e4eb5b258..cada819452 100644 --- a/apps/webapp/app/v3/services/setDefaultRegion.server.ts +++ b/apps/webapp/app/v3/services/setDefaultRegion.server.ts @@ -32,8 +32,8 @@ export class SetDefaultRegionService extends BaseService { // If their project is restricted, only allow them to set default regions that are allowed if (!isAdmin) { - if (project.allowedMasterQueues.length > 0) { - if (!project.allowedMasterQueues.includes(workerGroup.masterQueue)) { + if (project.allowedWorkerQueues.length > 0) { + if (!project.allowedWorkerQueues.includes(workerGroup.masterQueue)) { throw new ServiceValidationError("You're not allowed to set this region as default"); } } else if (workerGroup.hidden) { diff --git a/apps/webapp/app/v3/services/worker/workerGroupService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupService.server.ts index e33c3056fe..f05c8783ec 100644 --- a/apps/webapp/app/v3/services/worker/workerGroupService.server.ts +++ b/apps/webapp/app/v3/services/worker/workerGroupService.server.ts @@ -195,10 +195,12 @@ export class WorkerGroupService extends WithRunEngine { async getDefaultWorkerGroupForProject({ projectId, + regionOverride, }: { projectId: string; + regionOverride?: string; }): Promise { - const project = await this._prisma.project.findUnique({ + const project = await this._prisma.project.findFirst({ where: { id: projectId, }, @@ -208,8 +210,39 @@ export class WorkerGroupService extends WithRunEngine { }); if (!project) { - logger.error("[WorkerGroupService] Project not found", { projectId }); - return; + throw new Error("Project not found."); + } + + // If they've specified a region, we need to check they have access to it + if (regionOverride) { + const workerGroup = await this._prisma.workerInstanceGroup.findFirst({ + where: { + masterQueue: regionOverride, + }, + }); + + if (!workerGroup) { + throw new Error(`The region you specified doesn't exist ("${regionOverride}").`); + } + + // If they're restricted, check they have access + if (project.allowedWorkerQueues.length > 0) { + if (project.allowedWorkerQueues.includes(workerGroup.masterQueue)) { + return workerGroup; + } + + throw new Error( + `You don't have access to this region ("${regionOverride}"). You can use the following regions: ${project.allowedWorkerQueues.join( + ", " + )}.` + ); + } + + if (workerGroup.hidden) { + throw new Error(`The region you specified isn't available to you ("${regionOverride}").`); + } + + return workerGroup; } if (project.defaultWorkerGroup) { diff --git a/docs/triggering.mdx b/docs/triggering.mdx index 235383796f..cd263eac97 100644 --- a/docs/triggering.mdx +++ b/docs/triggering.mdx @@ -980,6 +980,18 @@ View our [metadata doc](/runs/metadata) for more information. View our [maxDuration doc](/runs/max-duration) for more information. +### `region` + +You can override the default region when you trigger a run: + +```ts +await yourTask.trigger(payload, { region: "eu-central-1" }); +``` + +If you don't specify a region it will use the default for your project. Go to the "Regions" page in the dashboard to see available regions or switch your default. + +The region is where your runs are executed, it does not change where the run payload, output, tags, logs, or are any other data is stored. + ## Large Payloads We recommend keeping your task payloads as small as possible. We currently have a hard limit on task payloads above 10MB. diff --git a/internal-packages/clickhouse/schema/006_add_task_runs_v2_workerqueue.sql b/internal-packages/clickhouse/schema/006_add_task_runs_v2_workerqueue.sql new file mode 100644 index 0000000000..840ab78575 --- /dev/null +++ b/internal-packages/clickhouse/schema/006_add_task_runs_v2_workerqueue.sql @@ -0,0 +1,10 @@ +-- +goose Up +/* +Add worker_queue column. + */ +ALTER TABLE trigger_dev.task_runs_v2 +ADD COLUMN worker_queue String DEFAULT ''; + +-- +goose Down +ALTER TABLE trigger_dev.task_runs_v2 +DROP COLUMN worker_queue; \ No newline at end of file diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 1d11477208..2363c691fd 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -44,6 +44,7 @@ export const TaskRunV2 = z.object({ is_test: z.boolean().default(false), concurrency_key: z.string().default(""), bulk_action_group_ids: z.array(z.string()).default([]), + worker_queue: z.string().default(""), _version: z.string(), _is_deleted: z.number().int().default(0), }); diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 11018e20a4..ba69f0f04f 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -336,7 +336,7 @@ model Project { defaultWorkerGroupId String? /// The master queues they are allowed to use (impacts what they can set as default and trigger runs with) - allowedMasterQueues String[] @default([]) + allowedWorkerQueues String[] @default([]) @map("allowedMasterQueues") environments RuntimeEnvironment[] backgroundWorkers BackgroundWorker[] diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index ce0f8abe4d..08bc64b8f0 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -210,6 +210,7 @@ export class RunAttemptSystem { parentTaskRunId: true, rootTaskRunId: true, batchId: true, + workerQueue: true, }, }); @@ -261,6 +262,7 @@ export class RunAttemptSystem { priority: run.priorityMs === 0 ? undefined : run.priorityMs / 1_000, parentTaskRunId: run.parentTaskRunId ? RunId.toFriendlyId(run.parentTaskRunId) : undefined, rootTaskRunId: run.rootTaskRunId ? RunId.toFriendlyId(run.rootTaskRunId) : undefined, + region: run.runtimeEnvironment.type !== "DEVELOPMENT" ? run.workerQueue : undefined, }, attempt: { number: run.attemptNumber ?? 1, @@ -428,6 +430,7 @@ export class RunAttemptSystem { }, parentTaskRunId: true, rootTaskRunId: true, + workerQueue: true, }, }); @@ -574,6 +577,10 @@ export class RunAttemptSystem { rootTaskRunId: updatedRun.rootTaskRunId ? RunId.toFriendlyId(updatedRun.rootTaskRunId) : undefined, + region: + updatedRun.runtimeEnvironment.type !== "DEVELOPMENT" + ? updatedRun.workerQueue + : undefined, }, task, queue, diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index cdf455cbfe..7fde77c41c 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -134,6 +134,7 @@ export const TriggerTaskRequestBody = z.object({ ttl: z.string().or(z.number().nonnegative().int()).optional(), priority: z.number().optional(), bulkActionId: z.string().optional(), + region: z.string().optional(), }) .optional(), }); @@ -181,6 +182,7 @@ export const BatchTriggerTaskItem = z.object({ test: z.boolean().optional(), ttl: z.string().or(z.number().nonnegative().int()).optional(), priority: z.number().optional(), + region: z.string().optional(), }) .optional(), }); diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index 2928995606..41a095648f 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -229,6 +229,8 @@ export const TaskRun = z.object({ // These are only used during execution, not in run.ctx durationMs: z.number().optional(), costInCents: z.number().optional(), + + region: z.string().optional(), }); export type TaskRun = z.infer; diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index 66c9d98d5f..5a527b9471 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -855,6 +855,21 @@ export type TriggerOptions = { * to the same version as the parent task that is triggering the child tasks. */ version?: string; + + /** + * Specify the region to run the task in. This overrides the default region set for your project in the dashboard. + * + * Check the Regions page in the dashboard for regions that are available to you. + * + * In DEV this won't do anything, so it's fine to set it in your code. + * + * @example + * + * ```ts + * await myTask.trigger({ foo: "bar" }, { region: "us-east-1" }); + * ``` + */ + region?: string; }; export type TriggerAndWaitOptions = Omit; diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 05300dc4f9..1a05d766cb 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -627,6 +627,7 @@ export async function batchTriggerById( idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, machine: item.options?.machine, priority: item.options?.priority, + region: item.options?.region, lockToVersion: item.options?.version ?? getEnvVar("TRIGGER_VERSION"), }, } satisfies BatchTriggerTaskV2RequestBody["items"][0]; @@ -796,6 +797,7 @@ export async function batchTriggerByIdAndWait( idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, machine: item.options?.machine, priority: item.options?.priority, + region: item.options?.region, }, } satisfies BatchTriggerTaskV2RequestBody["items"][0]; }) @@ -955,6 +957,7 @@ export async function batchTriggerTasks( idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, machine: item.options?.machine, priority: item.options?.priority, + region: item.options?.region, lockToVersion: item.options?.version ?? getEnvVar("TRIGGER_VERSION"), }, } satisfies BatchTriggerTaskV2RequestBody["items"][0]; @@ -1126,6 +1129,7 @@ export async function batchTriggerAndWaitTasks( parentRunId: taskContext.ctx?.run.id, machine: options?.machine, priority: options?.priority, + region: options?.region, lockToVersion: options?.version ?? getEnvVar("TRIGGER_VERSION"), }, }, @@ -1270,6 +1275,7 @@ async function batchTrigger_internal( idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL, machine: item.options?.machine, priority: item.options?.priority, + region: item.options?.region, lockToVersion: item.options?.version ?? getEnvVar("TRIGGER_VERSION"), }, } satisfies BatchTriggerTaskV2RequestBody["items"][0]; @@ -1354,6 +1360,7 @@ async function triggerAndWait_internal { + await fixedLengthTask.triggerAndWait({ waitSeconds: 1 }, { region }); + }, +});