Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
tokebe committed Oct 11, 2024
2 parents 82a4546 + cc22b21 commit 96a8634
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 24 deletions.
1 change: 1 addition & 0 deletions config/smartapi_overrides.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
config:
only_overrides: false
apis: {
"1d288b3a3caf75d541ffaae3aab386c8": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/refs/heads/semmeddb_publication_refactor/semmeddb/smartapi.yaml",
"8f08d1446e0bb9c2b323713ce83e2bd3": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/refs/heads/remove-clinical-trials/mychem.info/openapi_full.yml",
"1138c3297e8e403b6ac10cff5609b319": "https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/refs/heads/remove-clinical-trials/repodb/smartapi.yaml"
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@
"@bull-board/api": "^5.9.1",
"@bull-board/express": "^5.9.1",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/auto-instrumentations-node": "^0.44.0",
"@opentelemetry/exporter-jaeger": "^1.19.0",
"@opentelemetry/exporter-metrics-otlp-proto": "^0.50.0",
"@opentelemetry/exporter-trace-otlp-proto": "^0.50.0",
"@opentelemetry/instrumentation-express": "^0.43.0",
"@opentelemetry/instrumentation-http": "^0.53.0",
"@opentelemetry/resources": "^1.18.1",
"@opentelemetry/sdk-metrics": "^1.18.1",
"@opentelemetry/sdk-node": "^0.50.0",
"@opentelemetry/sdk-trace-node": "^1.18.1",
"@opentelemetry/semantic-conventions": "^1.27.0",
"@sentry/node": "^7.74.1",
"@sentry/profiling-node": "^1.2.1",
"axios": "^0.27.2",
Expand Down
9 changes: 8 additions & 1 deletion src/controllers/async/asyncquery.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import axios, { AxiosError, AxiosResponse } from "axios";
import { context, propagation } from "@opentelemetry/api";
import { customAlphabet } from "nanoid";
import * as utils from "../../utils/common";
import { redisClient } from "@biothings-explorer/utils";
Expand Down Expand Up @@ -42,8 +43,14 @@ export async function asyncquery(
}
const url = `${req.protocol}://${req.header("host")}/v1/asyncquery_status/${jobId}`;

// add OTel trace context
const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
propagation.inject(context.active(), otelData);
const { traceparent, tracestate } = otelData;
queueData = { ...queueData, traceparent, tracestate };

const job = await queryQueue.add(
{ ...queueData, url: url.replace("status", "response") },
{ ...queueData, traceparent: traceparent, tracestate: tracestate, url: url.replace("status", "response") },
{
jobId: jobId,
timeout: parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()),
Expand Down
35 changes: 27 additions & 8 deletions src/controllers/opentelemetry.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
import { NodeSDK } from "@opentelemetry/sdk-node";
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-node";
import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express";
import { Resource } from "@opentelemetry/resources";
import Debug from "debug";
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
const debug = Debug("bte:biothings-explorer:otel-init");
import { JaegerExporter } from "@opentelemetry/exporter-jaeger";
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';

const jaegerHost = process.env.JAEGER_HOST ?? 'jaeger-otel-collector.sri';
const jaegerPort = process.env.JAEGER_PORT ?? 4318;
const jaegerResName = process.env.JAEGER_RES_NAME ?? '/v1/traces';

const traceExporter = new OTLPTraceExporter({
url: `http://${jaegerHost}:${jaegerPort}${jaegerResName}`
});
const spanProcessor = new BatchSpanProcessor(traceExporter);

debug("Initializing Opentelemetry instrumentation...");
const sdk = new NodeSDK({
traceExporter: new JaegerExporter({
host: process.env.JAEGER_HOST ?? "jaeger-otel-agent.sri",
port: parseInt(process.env.JAEGER_PORT ?? "6832"),
}),
instrumentations: [getNodeAutoInstrumentations()],
// metrics, if needed, shall be exported on a different endpoint
// trace a subset of instrumentations to avoid performance overhead
instrumentations: [new HttpInstrumentation(), new ExpressInstrumentation()],
resource: new Resource({
"service.name": "biothings-explorer",
[ATTR_SERVICE_NAME]: "biothings-explorer",
}),
// @ts-ignore because MetinSeylan/Nestjs-OpenTelemetry#63
spanProcessors: [spanProcessor],
});
debug(`OTel URL http://${jaegerHost}:${jaegerPort}${jaegerResName}`);
sdk.start();
debug("Opentelemetry instrumentation initialized.");

export async function flushRemainingSpans(): Promise<void> {
// avoid losing any spans in the buffer when taskHandler exits
debug("Flushing remaining spans...");
await spanProcessor.forceFlush();
}
20 changes: 12 additions & 8 deletions src/controllers/threading/taskHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import { tasks } from "../../routes/index";
import { getQueryQueue } from "../async/asyncquery_queue";
import * as Sentry from "@sentry/node";
import { ProfilingIntegration } from "@sentry/profiling-node";
import OpenTelemetry, { Span } from "@opentelemetry/api";
import { Span, trace, context, propagation, Context, Tracer } from "@opentelemetry/api";
import { Telemetry } from "@biothings-explorer/utils";
import { InnerTaskData } from "@biothings-explorer/types";
import { flushRemainingSpans } from "../opentelemetry";

// use SENTRY_DSN environment variable
try {
Expand Down Expand Up @@ -90,13 +91,15 @@ async function runTask({
scope.setSpan(transaction);
});

span = OpenTelemetry.trace
.getTracer("biothings-explorer-thread")
.startSpan(
routeNames[route],
undefined,
OpenTelemetry.propagation.extract(OpenTelemetry.context.active(), { traceparent, tracestate }),
);
let activeContext: Context = propagation.extract(context.active(), { traceparent, tracestate });
debug(`OTel task context: ${traceparent} and ${tracestate}`);
let tracer: Tracer = trace.getTracer("biothings-explorer-thread")
span = tracer.startSpan(
routeNames[route],
{kind: 1}, // specifies internal span
activeContext,
);

span.setAttribute("bte.requestData", JSON.stringify(req.data.queryGraph));
Telemetry.setOtelSpan(span);
} catch (error) {
Expand All @@ -111,6 +114,7 @@ async function runTask({
transaction.finish();
span.end();
Telemetry.removeOtelSpan();
await flushRemainingSpans();
} catch (error) {
debug("Sentry/OpenTelemetry transaction finish error. This does not affect execution.");
debug(error);
Expand Down
20 changes: 14 additions & 6 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MessageChannel, threadId } from "worker_threads";
import Debug from "debug";
import { context, propagation } from "@opentelemetry/api";
import { context, propagation, trace, Context, Span } from "@opentelemetry/api";
const debug = Debug("bte:biothings-explorer-trapi:threading");
import path from "path";
import { redisClient } from "@biothings-explorer/utils";
Expand Down Expand Up @@ -104,11 +104,8 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
const abortController = new AbortController();
const { port1: toWorker, port2: fromWorker } = new MessageChannel();

// get otel context

const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
propagation.inject(context.active(), otelData);
const { traceparent, tracestate } = otelData;
const traceparent: string = taskInfo.data.traceparent;
const tracestate: string = taskInfo.data.tracestate;

const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: toWorker };
taskData.req.data.options = {...taskData.req.data.options, metakg: global.metakg?.ops, smartapi: global.smartapi} as QueryHandlerOptions;
Expand Down Expand Up @@ -219,6 +216,11 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri

export async function runTask(req: Request, res: Response, route: string, useBullSync = true): Promise<TrapiResponse> {
const queryQueue: Queue = global.queryQueue.bte_sync_query_queue;

const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
propagation.inject(context.active(), otelData);
const { traceparent, tracestate } = otelData;

const taskInfo: TaskInfo = {
data: {
route,
Expand All @@ -233,19 +235,24 @@ export async function runTask(req: Request, res: Response, route: string, useBul
},
params: req.params,
endpoint: req.originalUrl,
traceparent: traceparent,
tracestate: tracestate,
},
};

if ((req.body as TrapiQuery)?.bypass_cache) {
taskInfo.data.options.caching = false;
}

debug(`OTel ${traceparent} and ${tracestate}`);
if (process.env.USE_THREADING === "false") {
// Threading disabled, just use the provided function in main event loop
debug("OTel enter no threading")
const response = (await tasks[route](taskInfo)) as TrapiResponse;
return response;
} else if (!(queryQueue && useBullSync)) {
// Redis unavailable or query not to sync queue such as asyncquery_status
debug("OTel enter queueTaskToWorkers")
const response = await queueTaskToWorkers(
useBullSync ? global.threadpool.sync : global.threadpool.misc,
taskInfo,
Expand Down Expand Up @@ -294,6 +301,7 @@ export async function runTask(req: Request, res: Response, route: string, useBul
throw new ServerOverloadedError(message, expectedWaitTime);
}

debug("OTel enter queryQueue.add")
const job = await queryQueue.add(taskInfo.data, jobOpts);
try {
const response: TrapiResponse = await (job.finished() as Promise<TrapiResponse>);
Expand Down

0 comments on commit 96a8634

Please sign in to comment.