diff --git a/config/smartapi_overrides.yaml b/config/smartapi_overrides.yaml
index 3ac7e57..1e3f2c3 100644
--- a/config/smartapi_overrides.yaml
+++ b/config/smartapi_overrides.yaml
@@ -2,6 +2,7 @@ config:
   only_overrides: false
   apis: {
+    "1d288b3a3caf75d541ffaae3aab386c8": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/refs/heads/semmeddb_publication_refactor/semmeddb/smartapi.yaml",
     "8f08d1446e0bb9c2b323713ce83e2bd3": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/refs/heads/remove-clinical-trials/mychem.info/openapi_full.yml",
     "1138c3297e8e403b6ac10cff5609b319": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/refs/heads/remove-clinical-trials/repodb/smartapi.yaml"
   }
diff --git a/package.json b/package.json
index 6c10459..3d50019 100644
--- a/package.json
+++ b/package.json
@@ -81,14 +81,16 @@
     "@bull-board/api": "^5.9.1",
     "@bull-board/express": "^5.9.1",
     "@opentelemetry/api": "^1.7.0",
-    "@opentelemetry/auto-instrumentations-node": "^0.44.0",
     "@opentelemetry/exporter-jaeger": "^1.19.0",
     "@opentelemetry/exporter-metrics-otlp-proto": "^0.50.0",
     "@opentelemetry/exporter-trace-otlp-proto": "^0.50.0",
+    "@opentelemetry/instrumentation-express": "^0.43.0",
+    "@opentelemetry/instrumentation-http": "^0.53.0",
     "@opentelemetry/resources": "^1.18.1",
     "@opentelemetry/sdk-metrics": "^1.18.1",
     "@opentelemetry/sdk-node": "^0.50.0",
     "@opentelemetry/sdk-trace-node": "^1.18.1",
+    "@opentelemetry/semantic-conventions": "^1.27.0",
     "@sentry/node": "^7.74.1",
     "@sentry/profiling-node": "^1.2.1",
     "axios": "^0.27.2",
diff --git a/src/controllers/async/asyncquery.ts b/src/controllers/async/asyncquery.ts
index f8703cd..4bfdc26 100644
--- a/src/controllers/async/asyncquery.ts
+++ b/src/controllers/async/asyncquery.ts
@@ -1,4 +1,5 @@
 import axios, { AxiosError, AxiosResponse } from "axios";
+import { context, propagation } from "@opentelemetry/api";
 import { customAlphabet } from "nanoid";
 import * as utils from "../../utils/common";
 import { redisClient } from "@biothings-explorer/utils";
@@ -42,8 +43,14 @@ export async function asyncquery(
   }
   const url = `${req.protocol}://${req.header("host")}/v1/asyncquery_status/${jobId}`;
 
+  // add OTel trace context
+  const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
+  propagation.inject(context.active(), otelData);
+  const { traceparent, tracestate } = otelData;
+  queueData = { ...queueData, traceparent, tracestate };
+
   const job = await queryQueue.add(
-    { ...queueData, url: url.replace("status", "response") },
+    { ...queueData, traceparent: traceparent, tracestate: tracestate, url: url.replace("status", "response") },
     {
       jobId: jobId,
       timeout: parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()),
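Note on the asyncquery.ts change above (not part of the patch): `propagation.inject()` serializes the currently active span context into plain `traceparent`/`tracestate` strings, which is what lets the trace survive the Redis/Bull round trip as ordinary job data. A minimal TypeScript sketch of that producer-side step; the tracer and span names are illustrative:

```ts
// Sketch only: how propagation.inject() yields the traceparent/tracestate strings
// that the patch spreads into queueData. Names here are illustrative.
import { context, propagation, trace } from "@opentelemetry/api";
import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";

// register() installs the W3C trace-context propagator by default;
// in BTE the NodeSDK configured in opentelemetry.ts plays this role instead.
new NodeTracerProvider().register();

trace.getTracer("example").startActiveSpan("enqueue-job", span => {
  // The carrier is any plain object; inject() writes W3C header fields into it.
  const carrier: Partial<{ traceparent: string; tracestate: string }> = {};
  propagation.inject(context.active(), carrier);
  // carrier.traceparent now looks like "00-<traceId>-<spanId>-01" and can be
  // stored in job data and later fed to propagation.extract() by the consumer.
  span.end();
});
```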
diff --git a/src/controllers/opentelemetry.ts b/src/controllers/opentelemetry.ts
index 2acf1e9..557a5c0 100644
--- a/src/controllers/opentelemetry.ts
+++ b/src/controllers/opentelemetry.ts
@@ -1,20 +1,39 @@
 import { NodeSDK } from "@opentelemetry/sdk-node";
-import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
+import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-node";
+import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
+import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express";
 import { Resource } from "@opentelemetry/resources";
 import Debug from "debug";
+import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
 const debug = Debug("bte:biothings-explorer:otel-init");
-import { JaegerExporter } from "@opentelemetry/exporter-jaeger";
+import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
+
+const jaegerHost = process.env.JAEGER_HOST ?? 'jaeger-otel-collector.sri';
+const jaegerPort = process.env.JAEGER_PORT ?? 4318;
+const jaegerResName = process.env.JAEGER_RES_NAME ?? '/v1/traces';
+
+const traceExporter = new OTLPTraceExporter({
+  url: `http://${jaegerHost}:${jaegerPort}${jaegerResName}`
+});
+const spanProcessor = new BatchSpanProcessor(traceExporter);
 
 debug("Initializing Opentelemetry instrumentation...");
 const sdk = new NodeSDK({
-  traceExporter: new JaegerExporter({
-    host: process.env.JAEGER_HOST ?? "jaeger-otel-agent.sri",
-    port: parseInt(process.env.JAEGER_PORT ?? "6832"),
-  }),
-  instrumentations: [getNodeAutoInstrumentations()],
+  // metrics, if needed, shall be exported on a different endpoint
+  // trace a subset of instrumentations to avoid performance overhead
+  instrumentations: [new HttpInstrumentation(), new ExpressInstrumentation()],
   resource: new Resource({
-    "service.name": "biothings-explorer",
+    [ATTR_SERVICE_NAME]: "biothings-explorer",
   }),
+  // @ts-ignore because MetinSeylan/Nestjs-OpenTelemetry#63
+  spanProcessors: [spanProcessor],
 });
+debug(`OTel URL http://${jaegerHost}:${jaegerPort}${jaegerResName}`);
 sdk.start();
 debug("Opentelemetry instrumentation initialized.");
+
+export async function flushRemainingSpans(): Promise<void> {
+  // avoid losing any spans in the buffer when taskHandler exits
+  debug("Flushing remaining spans...");
+  await spanProcessor.forceFlush();
+}
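Note on opentelemetry.ts (not part of the patch): the OTLP exporter now sits behind a BatchSpanProcessor, which buffers spans in memory and exports them on a timer; that buffering is why the patch adds flushRemainingSpans() and calls it before a worker finishes. A rough sketch of the same wiring with the batching knobs spelled out; the endpoint, tuning values, and shutdown hook are illustrative:

```ts
// Sketch only: equivalent wiring via NodeTracerProvider, making the batching explicit.
import { BatchSpanProcessor, NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto";

const exporter = new OTLPTraceExporter({
  url: "http://jaeger-otel-collector.sri:4318/v1/traces", // OTLP/HTTP traces endpoint
});
const processor = new BatchSpanProcessor(exporter, {
  maxQueueSize: 2048,         // spans held in memory before new ones are dropped
  scheduledDelayMillis: 5000, // export interval; spans wait in the buffer up to this long
});

const provider = new NodeTracerProvider();
provider.addSpanProcessor(processor); // same role as `spanProcessors: [spanProcessor]` above
provider.register();

// Spans still sitting in the buffer are lost if the process (or worker thread)
// exits before the next scheduled export, hence the explicit forceFlush in the patch.
process.on("SIGTERM", async () => {
  await processor.forceFlush();
  await provider.shutdown();
});
```

The switch from the Jaeger agent's UDP port 6832 to `4318` with the `/v1/traces` path follows the OTLP-over-HTTP convention, so the collector endpoint is plain HTTP rather than Thrift/UDP.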
diff --git a/src/controllers/threading/taskHandler.ts b/src/controllers/threading/taskHandler.ts
index 9fcc41b..b22ae41 100644
--- a/src/controllers/threading/taskHandler.ts
+++ b/src/controllers/threading/taskHandler.ts
@@ -14,9 +14,10 @@ import { tasks } from "../../routes/index";
 import { getQueryQueue } from "../async/asyncquery_queue";
 import * as Sentry from "@sentry/node";
 import { ProfilingIntegration } from "@sentry/profiling-node";
-import OpenTelemetry, { Span } from "@opentelemetry/api";
+import { Span, trace, context, propagation, Context, Tracer } from "@opentelemetry/api";
 import { Telemetry } from "@biothings-explorer/utils";
 import { InnerTaskData } from "@biothings-explorer/types";
+import { flushRemainingSpans } from "../opentelemetry";
 
 // use SENTRY_DSN environment variable
 try {
@@ -90,13 +91,15 @@ async function runTask({
       scope.setSpan(transaction);
     });
 
-    span = OpenTelemetry.trace
-      .getTracer("biothings-explorer-thread")
-      .startSpan(
-        routeNames[route],
-        undefined,
-        OpenTelemetry.propagation.extract(OpenTelemetry.context.active(), { traceparent, tracestate }),
-      );
+    let activeContext: Context = propagation.extract(context.active(), { traceparent, tracestate });
+    debug(`OTel task context: ${traceparent} and ${tracestate}`);
+    let tracer: Tracer = trace.getTracer("biothings-explorer-thread")
+    span = tracer.startSpan(
+      routeNames[route],
+      { kind: 1 }, // 1 = SpanKind.SERVER
+      activeContext,
+    );
+    span.setAttribute("bte.requestData", JSON.stringify(req.data.queryGraph));
 
     Telemetry.setOtelSpan(span);
   } catch (error) {
@@ -111,6 +114,7 @@
     transaction.finish();
     span.end();
     Telemetry.removeOtelSpan();
+    await flushRemainingSpans();
   } catch (error) {
     debug("Sentry/OpenTelemetry transaction finish error. This does not affect execution.");
     debug(error);
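Note on taskHandler.ts (not part of the patch): the worker thread now rebuilds the parent context from the traceparent/tracestate strings it received and starts its span against that context, so thread-level spans attach to the incoming request's trace instead of opening a new one. A small sketch of that consumer-side step; startWorkerSpan, the route name, and the env-var usage are hypothetical:

```ts
// Sketch only: the consumer half of the handoff done in runTask() above.
import { context, propagation, trace, SpanKind } from "@opentelemetry/api";

function startWorkerSpan(route: string, traceparent?: string, tracestate?: string) {
  // extract() returns a Context whose span context becomes the remote parent.
  const parentCtx = propagation.extract(context.active(), { traceparent, tracestate });
  const tracer = trace.getTracer("biothings-explorer-thread");
  // The patch passes { kind: 1 }; the enum form avoids the magic number (SpanKind.SERVER === 1).
  return tracer.startSpan(route, { kind: SpanKind.SERVER }, parentCtx);
}

// Hypothetical usage: the caller ends the span and then flushes the processor.
const span = startWorkerSpan("query_v1", process.env.TRACEPARENT);
span.setAttribute("bte.requestData", "{}");
span.end();
```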
diff --git a/src/controllers/threading/threadHandler.ts b/src/controllers/threading/threadHandler.ts
index 8bf711e..6a6916a 100644
--- a/src/controllers/threading/threadHandler.ts
+++ b/src/controllers/threading/threadHandler.ts
@@ -1,6 +1,6 @@
 import { MessageChannel, threadId } from "worker_threads";
 import Debug from "debug";
-import { context, propagation } from "@opentelemetry/api";
+import { context, propagation, trace, Context, Span } from "@opentelemetry/api";
 const debug = Debug("bte:biothings-explorer-trapi:threading");
 import path from "path";
 import { redisClient } from "@biothings-explorer/utils";
@@ -104,11 +104,8 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
   const abortController = new AbortController();
   const { port1: toWorker, port2: fromWorker } = new MessageChannel();
 
-  // get otel context
-
-  const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
-  propagation.inject(context.active(), otelData);
-  const { traceparent, tracestate } = otelData;
+  const traceparent: string = taskInfo.data.traceparent;
+  const tracestate: string = taskInfo.data.tracestate;
 
   const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: toWorker };
   taskData.req.data.options = {...taskData.req.data.options, metakg: global.metakg?.ops, smartapi: global.smartapi} as QueryHandlerOptions;
@@ -219,6 +216,11 @@
 export async function runTask(req: Request, res: Response, route: string, useBullSync = true): Promise<TrapiResponse> {
   const queryQueue: Queue = global.queryQueue.bte_sync_query_queue;
+
+  const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
+  propagation.inject(context.active(), otelData);
+  const { traceparent, tracestate } = otelData;
+
   const taskInfo: TaskInfo = {
     data: {
       route,
@@ -233,6 +235,8 @@ export async function runTask(req: Request, res: Response, route: string, useBul
       },
       params: req.params,
       endpoint: req.originalUrl,
+      traceparent: traceparent,
+      tracestate: tracestate,
     },
   };
@@ -240,12 +244,15 @@ export async function runTask(req: Request, res: Response, route: string, useBul
     taskInfo.data.options.caching = false;
   }
 
+  debug(`OTel ${traceparent} and ${tracestate}`);
   if (process.env.USE_THREADING === "false") {
     // Threading disabled, just use the provided function in main event loop
+    debug("OTel enter no threading")
    const response = (await tasks[route](taskInfo)) as TrapiResponse;
    return response;
  } else if (!(queryQueue && useBullSync)) {
    // Redis unavailable or query not to sync queue such as asyncquery_status
+    debug("OTel enter queueTaskToWorkers")
    const response = await queueTaskToWorkers(
      useBullSync ? global.threadpool.sync : global.threadpool.misc,
      taskInfo,
@@ -294,6 +301,7 @@
     throw new ServerOverloadedError(message, expectedWaitTime);
   }
 
+  debug("OTel enter queryQueue.add")
   const job = await queryQueue.add(taskInfo.data, jobOpts);
   try {
     const response: TrapiResponse = await (job.finished() as Promise<TrapiResponse>);
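Note on threadHandler.ts (not part of the patch): inject() now happens once in runTask(), and the resulting identifiers ride inside taskInfo.data, so the worker-thread path, the sync Bull queue path, and the async queue all see the same parent context. A compact sketch of that shape; TaskData and buildTaskData are stand-ins for BTE's TaskInfo/InnerTaskData plumbing:

```ts
// Sketch only: capture the active context once and let every consumer read it
// from the task payload, mirroring the flow introduced by this patch.
import { context, propagation } from "@opentelemetry/api";

interface TaskData {
  route: string;
  traceparent?: string;
  tracestate?: string;
}

function buildTaskData(route: string): TaskData {
  const carrier: Partial<{ traceparent: string; tracestate: string }> = {};
  propagation.inject(context.active(), carrier); // no-op unless a propagator is registered
  return { route, ...carrier };
}

// Whether the payload goes to Piscina (queueTaskToWorkers) or Bull (queryQueue.add),
// the consumer reads taskData.traceparent / taskData.tracestate and calls extract().
const taskData = buildTaskData("query_v1");
console.log(taskData.traceparent); // undefined until a tracer provider/propagator is set up
```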