Skip to content

Add custom telemetry exporter support #1602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/rich-trainers-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Add otel exporter support
6 changes: 4 additions & 2 deletions packages/cli-v3/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,10 @@ function adaptResolveEnvVarsToSyncEnvVarsExtension(
function getInstrumentedPackageNames(config: ResolvedConfig): Array<string> {
const packageNames = [];

if (config.instrumentations) {
for (const instrumentation of config.instrumentations) {
if (config.instrumentations ?? config.telemetry?.instrumentations) {
for (const instrumentation of config.telemetry?.instrumentations ??
config.instrumentations ??
[]) {
const moduleDefinitions = (
instrumentation as any
).getModuleDefinitions?.() as Array<InstrumentationModuleDefinition>;
Expand Down
3 changes: 2 additions & 1 deletion packages/cli-v3/src/entryPoints/deploy-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ async function bootstrap() {

const tracingSDK = new TracingSDK({
url: env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318",
instrumentations: config.instrumentations ?? [],
instrumentations: config.telemetry?.instrumentations ?? config.instrumentations ?? [],
exporters: config.telemetry?.exporters ?? [],
diagLogLevel: (env.OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none",
forceFlushTimeoutMillis: 30_000,
});
Expand Down
3 changes: 2 additions & 1 deletion packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ async function bootstrap() {

const tracingSDK = new TracingSDK({
url: env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318",
instrumentations: config.instrumentations ?? [],
instrumentations: config.telemetry?.instrumentations ?? config.instrumentations ?? [],
exporters: config.telemetry?.exporters ?? [],
diagLogLevel: (env.OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none",
forceFlushTimeoutMillis: 30_000,
});
Expand Down
19 changes: 19 additions & 0 deletions packages/core/src/v3/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Instrumentation } from "@opentelemetry/instrumentation";
import type { SpanExporter } from "@opentelemetry/sdk-trace-base";
import type { BuildExtension } from "./build/extensions.js";
import type { MachinePresetName } from "./schemas/common.js";
import type { LogLevel } from "./logger/taskLogger.js";
Expand Down Expand Up @@ -53,9 +54,27 @@ export type TriggerConfig = {
* Instrumentations to use for OpenTelemetry. This is useful if you want to add custom instrumentations to your tasks.
*
* @see https://trigger.dev/docs/config/config-file#instrumentations
*
* @deprecated Use the `telemetry.instrumentations` option instead.
*/
instrumentations?: Array<Instrumentation>;

telemetry?: {
/**
* Instrumentations to use for OpenTelemetry. This is useful if you want to add custom instrumentations to your tasks.
*
* @see https://trigger.dev/docs/config/config-file#instrumentations
*/
instrumentations?: Array<Instrumentation>;

/**
* Exporters to use for OpenTelemetry. This is useful if you want to add custom exporters to your tasks.
*
* @see https://trigger.dev/docs/config/config-file#exporters
*/
exporters?: Array<SpanExporter>;
};

/**
* Specify a custom path to your tsconfig file. This is useful if you have a custom tsconfig file that you want to use.
*/
Expand Down
69 changes: 69 additions & 0 deletions packages/core/src/v3/otel/tracingSDK.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
import {
BatchSpanProcessor,
NodeTracerProvider,
ReadableSpan,
SimpleSpanProcessor,
SpanExporter,
} from "@opentelemetry/sdk-trace-node";
Expand Down Expand Up @@ -85,6 +86,7 @@ export type TracingSDKConfig = {
forceFlushTimeoutMillis?: number;
resource?: IResource;
instrumentations?: Instrumentation[];
exporters?: SpanExporter[];
diagLogLevel?: TracingDiagnosticLogLevel;
};

Expand All @@ -111,6 +113,8 @@ export class TracingSDK {
.merge(
new Resource({
[SemanticResourceAttributes.CLOUD_PROVIDER]: "trigger.dev",
[SemanticResourceAttributes.SERVICE_NAME]:
getEnvVar("OTEL_SERVICE_NAME") ?? "trigger.dev",
[SemanticInternalAttributes.TRIGGER]: true,
[SemanticInternalAttributes.CLI_VERSION]: VERSION,
})
Expand Down Expand Up @@ -153,6 +157,25 @@ export class TracingSDK {
)
);

const externalTraceId = crypto.randomUUID();

for (const exporter of config.exporters ?? []) {
traceProvider.addSpanProcessor(
getEnvVar("OTEL_BATCH_PROCESSING_ENABLED") === "1"
? new BatchSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId), {
maxExportBatchSize: parseInt(getEnvVar("OTEL_SPAN_MAX_EXPORT_BATCH_SIZE") ?? "64"),
scheduledDelayMillis: parseInt(
getEnvVar("OTEL_SPAN_SCHEDULED_DELAY_MILLIS") ?? "200"
),
exportTimeoutMillis: parseInt(
getEnvVar("OTEL_SPAN_EXPORT_TIMEOUT_MILLIS") ?? "30000"
),
maxQueueSize: parseInt(getEnvVar("OTEL_SPAN_MAX_QUEUE_SIZE") ?? "512"),
})
: new SimpleSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId))
);
}
Comment on lines +160 to +177
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add validation and error handling for exporters configuration

The exporter initialization has several potential issues:

  1. Missing validation for the exporters array
  2. No error handling for invalid environment variable values
  3. Potential memory pressure from unbounded queue sizes

Consider these improvements:

+    // Validate exporters
+    if (config.exporters?.length) {
+      if (!Array.isArray(config.exporters)) {
+        throw new Error("exporters must be an array");
+      }
+      
+      for (const exporter of config.exporters) {
+        if (!exporter.export || typeof exporter.export !== "function") {
+          throw new Error("Invalid exporter: missing export function");
+        }
+      }
+    }

+    // Parse and validate env vars with defaults
+    const batchSize = Math.min(
+      parseInt(getEnvVar("OTEL_SPAN_MAX_EXPORT_BATCH_SIZE") ?? "64"),
+      100 // Maximum safe batch size
+    );
+    const queueSize = Math.min(
+      parseInt(getEnvVar("OTEL_SPAN_MAX_QUEUE_SIZE") ?? "512"),
+      1000 // Maximum safe queue size
+    );

     const externalTraceId = crypto.randomUUID();

     for (const exporter of config.exporters ?? []) {
       traceProvider.addSpanProcessor(
         getEnvVar("OTEL_BATCH_PROCESSING_ENABLED") === "1"
           ? new BatchSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId), {
-              maxExportBatchSize: parseInt(getEnvVar("OTEL_SPAN_MAX_EXPORT_BATCH_SIZE") ?? "64"),
+              maxExportBatchSize: batchSize,
               scheduledDelayMillis: parseInt(
                 getEnvVar("OTEL_SPAN_SCHEDULED_DELAY_MILLIS") ?? "200"
               ),
               exportTimeoutMillis: parseInt(
                 getEnvVar("OTEL_SPAN_EXPORT_TIMEOUT_MILLIS") ?? "30000"
               ),
-              maxQueueSize: parseInt(getEnvVar("OTEL_SPAN_MAX_QUEUE_SIZE") ?? "512"),
+              maxQueueSize: queueSize,
             })
           : new SimpleSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId))
       );
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const externalTraceId = crypto.randomUUID();
for (const exporter of config.exporters ?? []) {
traceProvider.addSpanProcessor(
getEnvVar("OTEL_BATCH_PROCESSING_ENABLED") === "1"
? new BatchSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId), {
maxExportBatchSize: parseInt(getEnvVar("OTEL_SPAN_MAX_EXPORT_BATCH_SIZE") ?? "64"),
scheduledDelayMillis: parseInt(
getEnvVar("OTEL_SPAN_SCHEDULED_DELAY_MILLIS") ?? "200"
),
exportTimeoutMillis: parseInt(
getEnvVar("OTEL_SPAN_EXPORT_TIMEOUT_MILLIS") ?? "30000"
),
maxQueueSize: parseInt(getEnvVar("OTEL_SPAN_MAX_QUEUE_SIZE") ?? "512"),
})
: new SimpleSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId))
);
}
// Validate exporters
if (config.exporters?.length) {
if (!Array.isArray(config.exporters)) {
throw new Error("exporters must be an array");
}
for (const exporter of config.exporters) {
if (!exporter.export || typeof exporter.export !== "function") {
throw new Error("Invalid exporter: missing export function");
}
}
}
// Parse and validate env vars with defaults
const batchSize = Math.min(
parseInt(getEnvVar("OTEL_SPAN_MAX_EXPORT_BATCH_SIZE") ?? "64"),
100 // Maximum safe batch size
);
const queueSize = Math.min(
parseInt(getEnvVar("OTEL_SPAN_MAX_QUEUE_SIZE") ?? "512"),
1000 // Maximum safe queue size
);
const externalTraceId = crypto.randomUUID();
for (const exporter of config.exporters ?? []) {
traceProvider.addSpanProcessor(
getEnvVar("OTEL_BATCH_PROCESSING_ENABLED") === "1"
? new BatchSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId), {
maxExportBatchSize: batchSize,
scheduledDelayMillis: parseInt(
getEnvVar("OTEL_SPAN_SCHEDULED_DELAY_MILLIS") ?? "200"
),
exportTimeoutMillis: parseInt(
getEnvVar("OTEL_SPAN_EXPORT_TIMEOUT_MILLIS") ?? "30000"
),
maxQueueSize: queueSize,
})
: new SimpleSpanProcessor(new ExternalSpanExporterWrapper(exporter, externalTraceId))
);
}


traceProvider.register();

registerInstrumentations({
Expand Down Expand Up @@ -236,3 +259,49 @@ function setLogLevel(level: TracingDiagnosticLogLevel) {

diag.setLogger(new DiagConsoleLogger(), diagLogLevel);
}

class ExternalSpanExporterWrapper {
constructor(
private underlyingExporter: SpanExporter,
private externalTraceId: string
) {}

private transformSpan(span: ReadableSpan): ReadableSpan | undefined {
if (span.attributes[SemanticInternalAttributes.SPAN_PARTIAL]) {
// Skip partial spans
return;
}

const spanContext = span.spanContext();

return {
...span,
spanContext: () => ({ ...spanContext, traceId: this.externalTraceId }),
parentSpanId: span.attributes[SemanticInternalAttributes.SPAN_ATTEMPT]
? undefined
: span.parentSpanId,
};
}

export(spans: any[], resultCallback: (result: any) => void): void {
try {
const modifiedSpans = spans.map(this.transformSpan.bind(this));
this.underlyingExporter.export(
modifiedSpans.filter(Boolean) as ReadableSpan[],
resultCallback
);
} catch (e) {
console.error(e);
}
}

shutdown(): Promise<void> {
return this.underlyingExporter.shutdown();
}

forceFlush?(): Promise<void> {
return this.underlyingExporter.forceFlush
? this.underlyingExporter.forceFlush()
: Promise.resolve();
}
}
Comment on lines +263 to +307
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve type safety and error handling in ExternalSpanExporterWrapper

The wrapper implementation has several areas for improvement:

  1. Uses any[] type instead of proper span types
  2. Basic error handling that only logs errors
  3. Missing documentation explaining the wrapper's purpose

Consider these improvements:

+/**
+ * Wraps a SpanExporter to transform spans before export:
+ * - Replaces trace IDs with a consistent external ID
+ * - Filters out partial spans
+ * - Handles parent span relationships
+ */
 class ExternalSpanExporterWrapper {
   constructor(
     private underlyingExporter: SpanExporter,
     private externalTraceId: string
   ) {}

   private transformSpan(span: ReadableSpan): ReadableSpan | undefined {
     if (span.attributes[SemanticInternalAttributes.SPAN_PARTIAL]) {
       // Skip partial spans
       return;
     }

     const spanContext = span.spanContext();

     return {
       ...span,
       spanContext: () => ({ ...spanContext, traceId: this.externalTraceId }),
       parentSpanId: span.attributes[SemanticInternalAttributes.SPAN_ATTEMPT]
         ? undefined
         : span.parentSpanId,
     };
   }

-  export(spans: any[], resultCallback: (result: any) => void): void {
+  export(
+    spans: ReadableSpan[],
+    resultCallback: (result: ExportResult) => void
+  ): void {
     try {
       const modifiedSpans = spans.map(this.transformSpan.bind(this));
       this.underlyingExporter.export(
         modifiedSpans.filter(Boolean) as ReadableSpan[],
         resultCallback
       );
     } catch (e) {
-      console.error(e);
+      console.error("Error in ExternalSpanExporterWrapper.export:", e);
+      resultCallback({ code: ExportResultCode.FAILED, error: e as Error });
     }
   }

   shutdown(): Promise<void> {
-    return this.underlyingExporter.shutdown();
+    return this.underlyingExporter.shutdown().catch((e) => {
+      console.error("Error in ExternalSpanExporterWrapper.shutdown:", e);
+      throw e;
+    });
   }

   forceFlush?(): Promise<void> {
-    return this.underlyingExporter.forceFlush
+    return this.underlyingExporter.forceFlush?.()
+      .catch((e) => {
+        console.error("Error in ExternalSpanExporterWrapper.forceFlush:", e);
+        throw e;
+      })
       ? this.underlyingExporter.forceFlush()
       : Promise.resolve();
   }
 }

Committable suggestion skipped: line range outside the PR's diff.

1 change: 1 addition & 0 deletions packages/core/src/v3/semanticInternalAttributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ export const SemanticInternalAttributes = {
RATE_LIMIT_LIMIT: "response.rateLimit.limit",
RATE_LIMIT_REMAINING: "response.rateLimit.remaining",
RATE_LIMIT_RESET: "response.rateLimit.reset",
SPAN_ATTEMPT: "$span.attempt",
};
1 change: 1 addition & 0 deletions packages/core/src/v3/workers/taskExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ export class TaskExecutor {
kind: SpanKind.CONSUMER,
attributes: {
[SemanticInternalAttributes.STYLE_ICON]: "attempt",
[SemanticInternalAttributes.SPAN_ATTEMPT]: true,
},
},
this._tracer.extractContext(traceContext),
Expand Down
Loading
Loading