diff --git a/.changeset/short-tomatoes-beam.md b/.changeset/short-tomatoes-beam.md new file mode 100644 index 0000000000..6760f79ad8 --- /dev/null +++ b/.changeset/short-tomatoes-beam.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Add otel propagation headers "below" the API fetch span, to attribute the child runs with the proper parent span ID diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 7dd0260496..c9c2e8e231 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -835,7 +835,8 @@ export class EventRepository { options: TraceEventOptions & { incomplete?: boolean }, callback: ( e: EventBuilder, - traceContext: Record + traceContext: Record, + traceparent?: { traceId: string; spanId: string } ) => Promise ): Promise { const propagatedContext = extractContextFromCarrier(options.context ?? {}); @@ -892,7 +893,7 @@ export class EventRepository { }, }; - const result = await callback(eventBuilder, traceContext); + const result = await callback(eventBuilder, traceContext, propagatedContext?.traceparent); const duration = process.hrtime.bigint() - start; diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 01cdbed3be..177e1279ac 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -240,7 +240,7 @@ export class TriggerTaskService extends BaseService { incomplete: true, immediate: true, }, - async (event, traceContext) => { + async (event, traceContext, traceparent) => { const run = await autoIncrementCounter.incrementInTransaction( `v3-run:${environment.id}:${taskId}`, async (num, tx) => { @@ -307,6 +307,8 @@ export class TriggerTaskService extends BaseService { traceContext: traceContext, traceId: event.traceId, spanId: event.spanId, + parentSpanId: + options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId, lockedToVersionId: lockedToBackgroundWorker?.id, concurrencyKey: body.options?.concurrencyKey, queue: queueName, diff --git a/packages/cli-v3/src/config.ts b/packages/cli-v3/src/config.ts index 6cab7dd45e..fc52cc3d9a 100644 --- a/packages/cli-v3/src/config.ts +++ b/packages/cli-v3/src/config.ts @@ -228,7 +228,7 @@ function validateConfig(config: TriggerConfig, warn = true) { if (config.additionalFiles && config.additionalFiles.length > 0) { warn && prettyWarning( - `The "additionalFiles" option is deprecated and will be removed. Use the "additionalFiles" build extension instead. See https://trigger.dev/docs/guides/new-build-system-preview#additionalfiles for more information.` + `The "additionalFiles" option is deprecated and will be removed. Use the "additionalFiles" build extension instead. See https://trigger.dev/docs/config/config-file#additionalfiles for more information.` ); config.build ??= {}; @@ -239,7 +239,7 @@ function validateConfig(config: TriggerConfig, warn = true) { if (config.additionalPackages && config.additionalPackages.length > 0) { warn && prettyWarning( - `The "additionalPackages" option is deprecated and will be removed. Use the "additionalPackages" build extension instead. See https://trigger.dev/docs/guides/new-build-system-preview#additionalpackages for more information.` + `The "additionalPackages" option is deprecated and will be removed. Use the "additionalPackages" build extension instead. See https://trigger.dev/docs/config/config-file#additionalpackages for more information.` ); config.build ??= {}; @@ -275,7 +275,7 @@ function validateConfig(config: TriggerConfig, warn = true) { if ("resolveEnvVars" in config && typeof config.resolveEnvVars === "function") { warn && prettyWarning( - `The "resolveEnvVars" option is deprecated and will be removed. Use the "syncEnvVars" build extension instead. See https://trigger.dev/docs/guides/new-build-system-preview#resolveenvvars for more information.` + `The "resolveEnvVars" option is deprecated and will be removed. Use the "syncEnvVars" build extension instead. See https://trigger.dev/docs/config/config-file#syncenvvars for more information.` ); const resolveEnvVarsFn = config.resolveEnvVars as ResolveEnvironmentVariablesFunction; diff --git a/packages/core/src/v3/apiClient/core.ts b/packages/core/src/v3/apiClient/core.ts index 888edcad89..e1c88c051b 100644 --- a/packages/core/src/v3/apiClient/core.ts +++ b/packages/core/src/v3/apiClient/core.ts @@ -4,7 +4,7 @@ import { RetryOptions } from "../schemas/index.js"; import { calculateNextRetryDelay } from "../utils/retries.js"; import { ApiConnectionError, ApiError, ApiSchemaValidationError } from "./errors.js"; -import { Attributes, Span } from "@opentelemetry/api"; +import { Attributes, Span, context, propagation } from "@opentelemetry/api"; import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { TriggerTracer } from "../tracer.js"; import { accessoryAttributes } from "../utils/styleAttributes.js"; @@ -184,9 +184,11 @@ async function _doZodFetch( requestInit?: PromiseOrValue, options?: ZodFetchOptions ): Promise>> { - const $requestInit = await requestInit; + let $requestInit = await requestInit; return traceZodFetch({ url, requestInit: $requestInit, options }, async (span) => { + $requestInit = injectPropagationHeadersIfInWorker($requestInit); + const result = await _doZodFetchWithRetries(schema, url, $requestInit, options); if (options?.onResponseBody && span) { @@ -577,3 +579,23 @@ export function isEmptyObj(obj: Object | null | undefined): boolean { export function hasOwn(obj: Object, key: string): boolean { return Object.prototype.hasOwnProperty.call(obj, key); } + +// If the requestInit has a header x-trigger-worker = true, then we will do +// propagation.inject(context.active(), headers); +// and return the new requestInit. +function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestInit | undefined { + const headers = new Headers(requestInit?.headers); + + if (headers.get("x-trigger-worker") !== "true") { + return requestInit; + } + + const headersObject = Object.fromEntries(headers.entries()); + + propagation.inject(context.active(), headersObject); + + return { + ...requestInit, + headers: new Headers(headersObject), + }; +} diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index aafc5b7ed0..7a91915684 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -1,4 +1,3 @@ -import { context, propagation } from "@opentelemetry/api"; import { z } from "zod"; import { AddTagsRequestBody, @@ -509,7 +508,6 @@ export class ApiClient { // Only inject the context if we are inside a task if (taskContext.isInsideTask) { headers["x-trigger-worker"] = "true"; - propagation.inject(context.active(), headers); if (spanParentAsLink) { headers["x-trigger-span-parent-as-link"] = "1"; diff --git a/packages/database/README.md b/packages/database/README.md new file mode 100644 index 0000000000..8f15e7ebb1 --- /dev/null +++ b/packages/database/README.md @@ -0,0 +1,38 @@ +## @trigger.dev/database + +This is the internal database package for the Trigger.dev project. It exports a generated prisma client that can be instantiated with a connection string. + +### How to add a new index on a large table + +1. Modify the Prisma.schema with a single index change (no other changes, just one index at a time) +2. Create a Prisma migration using `cd packages/database && pnpm run db:migrate:dev --create-only` +3. Modify the SQL file: add IF NOT EXISTS to it and CONCURRENTLY: + +```sql +CREATE INDEX CONCURRENTLY IF NOT EXISTS "JobRun_eventId_idx" ON "JobRun" ("eventId"); +``` + +4. Don’t apply the Prisma migration locally yet. This is a good opportunity to test the flow. +5. Manually apply the index to your database, by running the index command. +6. Then locally run `pnpm run db:migrate:deploy` + +#### Before deploying + +Run the index creation before deploying + +```sql +CREATE INDEX CONCURRENTLY IF NOT EXISTS "JobRun_eventId_idx" ON "JobRun" ("eventId"); +``` + +These commands are useful: + +```sql +-- creates an index safely, this can take a long time (2 mins maybe) +CREATE INDEX CONCURRENTLY IF NOT EXISTS "JobRun_eventId_idx" ON "JobRun" ("eventId"); +-- checks the status of an index +SELECT * FROM pg_stat_progress_create_index WHERE relid = '"JobRun"'::regclass; +-- checks if the index is there +SELECT * FROM pg_indexes WHERE tablename = 'JobRun' AND indexname = 'JobRun_eventId_idx'; +``` + +Now, when you deploy and prisma runs the migration, it will skip the index creation because it already exists. If you don't do this, there will be pain. diff --git a/packages/database/prisma/migrations/20240924125845_add_root_task_run_id_index/migration.sql b/packages/database/prisma/migrations/20240924125845_add_root_task_run_id_index/migration.sql new file mode 100644 index 0000000000..e5bbdcd20c --- /dev/null +++ b/packages/database/prisma/migrations/20240924125845_add_root_task_run_id_index/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE INDEX CONCURRENTLY IF NOT EXISTS "TaskRun_rootTaskRunId_idx" ON "TaskRun"("rootTaskRunId"); \ No newline at end of file diff --git a/packages/database/prisma/migrations/20240924130558_add_parent_span_id_to_task_run/migration.sql b/packages/database/prisma/migrations/20240924130558_add_parent_span_id_to_task_run/migration.sql new file mode 100644 index 0000000000..6b0239b7a3 --- /dev/null +++ b/packages/database/prisma/migrations/20240924130558_add_parent_span_id_to_task_run/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRun" ADD COLUMN "parentSpanId" TEXT; diff --git a/packages/database/prisma/schema.prisma b/packages/database/prisma/schema.prisma index 1a3db9e451..9bfafe3ee3 100644 --- a/packages/database/prisma/schema.prisma +++ b/packages/database/prisma/schema.prisma @@ -1748,9 +1748,14 @@ model TaskRun { /// The depth of this task run in the task run hierarchy depth Int @default(0) + /// The span ID of the "trigger" span in the parent task run + parentSpanId String? + @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) // Finding child runs @@index([parentTaskRunId]) + // Finding ancestor runs + @@index([rootTaskRunId]) // Task activity graph @@index([projectId, createdAt, taskIdentifier]) //Runs list