diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts
index 3a82fcf93f..f87c2a143b 100644
--- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts
+++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts
@@ -1,7 +1,9 @@
+import { nanoid } from "nanoid";
+
 export type DynamicFlushSchedulerConfig<T> = {
   batchSize: number;
   flushInterval: number;
-  callback: (batch: T[]) => Promise<void>;
+  callback: (flushId: string, batch: T[]) => Promise<void>;
 };
 
 export class DynamicFlushScheduler<T> {
@@ -10,7 +12,7 @@ export class DynamicFlushScheduler<T> {
   private readonly BATCH_SIZE: number;
   private readonly FLUSH_INTERVAL: number;
   private flushTimer: NodeJS.Timeout | null;
-  private readonly callback: (batch: T[]) => Promise<void>;
+  private readonly callback: (flushId: string, batch: T[]) => Promise<void>;
 
   constructor(config: DynamicFlushSchedulerConfig<T>) {
     this.batchQueue = [];
@@ -57,7 +59,7 @@ export class DynamicFlushScheduler<T> {
 
     const batchToFlush = this.batchQueue.shift();
     try {
-      await this.callback(batchToFlush!);
+      await this.callback(nanoid(), batchToFlush!);
       if (this.batchQueue.length > 0) {
         this.flushNextBatch();
       }
diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts
index 86dc446f38..4cfa215844 100644
--- a/apps/webapp/app/v3/eventRepository.server.ts
+++ b/apps/webapp/app/v3/eventRepository.server.ts
@@ -1,4 +1,4 @@
-import { Attributes, Link, TraceFlags } from "@opentelemetry/api";
+import { Attributes, Link, trace, TraceFlags, Tracer } from "@opentelemetry/api";
 import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base";
 import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
 import {
@@ -32,6 +32,8 @@ import { singleton } from "~/utils/singleton";
 import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
 import { startActiveSpan } from "./tracer.server";
 import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
+import { startSpan } from "./tracing.server";
+import { nanoid } from "nanoid";
 
 const MAX_FLUSH_DEPTH = 5;
 
@@ -99,6 +101,7 @@ export type EventRepoConfig = {
   batchInterval: number;
   redis: RedisWithClusterOptions;
   retentionInDays: number;
+  tracer?: Tracer;
 };
 
 export type QueryOptions = Prisma.TaskEventWhereInput;
@@ -202,6 +205,8 @@ export class EventRepository {
   private _randomIdGenerator = new RandomIdGenerator();
   private _redisPublishClient: RedisClient;
   private _subscriberCount = 0;
+  private _tracer: Tracer;
+  private _lastFlushedAt: Date | undefined;
 
   get subscriberCount() {
     return this._subscriberCount;
   }
@@ -219,6 +224,7 @@
     });
 
     this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis);
+    this._tracer = _config.tracer ?? trace.getTracer("eventRepo", "0.0.1");
   }
 
   async insert(event: CreatableEvent) {
@@ -226,7 +232,7 @@
   }
 
   async insertImmediate(event: CreatableEvent) {
-    await this.#flushBatch([event]);
+    await this.#flushBatch(nanoid(), [event]);
   }
 
   async insertMany(events: CreatableEvent[]) {
@@ -234,7 +240,7 @@
   }
 
   async insertManyImmediate(events: CreatableEvent[]) {
-    return await this.#flushBatch(events);
+    return await this.#flushBatch(nanoid(), events);
   }
 
   async completeEvent(spanId: string, options?: UpdateEventOptions) {
@@ -1019,42 +1025,56 @@
     };
   }
 
-  async #flushBatch(batch: CreatableEvent[]) {
-    const events = excludePartialEventsWithCorrespondingFullEvent(batch);
+  async #flushBatch(flushId: string, batch: CreatableEvent[]) {
+    return await startSpan(this._tracer, "flushBatch", async (span) => {
+      const events = excludePartialEventsWithCorrespondingFullEvent(batch);
 
-    const flushedEvents = await this.#doFlushBatch(events);
+      span.setAttribute("flush_id", flushId);
+      span.setAttribute("event_count", events.length);
+      span.setAttribute("partial_event_count", batch.length - events.length);
+      span.setAttribute(
+        "last_flush_in_ms",
+        this._lastFlushedAt ? new Date().getTime() - this._lastFlushedAt.getTime() : 0
+      );
 
-    if (flushedEvents.length !== events.length) {
-      logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
-        attemptCount: events.length,
-        successCount: flushedEvents.length,
-      });
-    }
+      const flushedEvents = await this.#doFlushBatch(flushId, events);
+
+      this._lastFlushedAt = new Date();
+
+      if (flushedEvents.length !== events.length) {
+        logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
+          attemptCount: events.length,
+          successCount: flushedEvents.length,
+        });
+
+        span.setAttribute("failed_event_count", events.length - flushedEvents.length);
+      }
 
-    this.#publishToRedis(flushedEvents);
+      this.#publishToRedis(flushedEvents);
+    });
   }
 
-  async #doFlushBatch(events: CreatableEvent[], depth: number = 1): Promise<CreatableEvent[]> {
-    try {
-      await this.db.taskEvent.createMany({
-        data: events as Prisma.TaskEventCreateManyInput[],
-      });
+  async #doFlushBatch(
+    flushId: string,
+    events: CreatableEvent[],
+    depth: number = 1
+  ): Promise<CreatableEvent[]> {
+    return await startSpan(this._tracer, "doFlushBatch", async (span) => {
+      try {
+        span.setAttribute("event_count", events.length);
+        span.setAttribute("depth", depth);
+        span.setAttribute("flush_id", flushId);
 
-      return events;
-    } catch (error) {
-      if (error instanceof Prisma.PrismaClientUnknownRequestError) {
-        logger.error("Failed to insert events, most likely because of null characters", {
-          error: {
-            name: error.name,
-            message: error.message,
-            stack: error.stack,
-            clientVersion: error.clientVersion,
-          },
+        await this.db.taskEvent.createMany({
+          data: events as Prisma.TaskEventCreateManyInput[],
         });
 
-        if (events.length === 1) {
-          logger.debug("Attempting to insert event individually and it failed", {
-            event: events[0],
+        span.setAttribute("inserted_event_count", events.length);
+
+        return events;
+      } catch (error) {
+        if (error instanceof Prisma.PrismaClientUnknownRequestError) {
+          logger.error("Failed to insert events, most likely because of null characters", {
             error: {
               name: error.name,
               message: error.message,
               stack: error.stack,
               clientVersion: error.clientVersion,
@@ -1063,38 +1083,62 @@
             },
           });
 
-          return [];
-        }
+          if (events.length === 1) {
+            logger.debug("Attempting to insert event individually and it failed", {
+              event: events[0],
+              error: {
+                name: error.name,
+                message: error.message,
+                stack: error.stack,
+                clientVersion: error.clientVersion,
+              },
+            });
 
-        if (depth > MAX_FLUSH_DEPTH) {
-          logger.error("Failed to insert events, reached maximum depth", {
-            error: {
-              name: error.name,
-              message: error.message,
-              stack: error.stack,
-              clientVersion: error.clientVersion,
-            },
-            depth,
-            eventsCount: events.length,
-          });
+            span.setAttribute("failed_event_count", 1);
 
-          return [];
-        }
+            return [];
+          }
 
-        // Split the events into two batches, and recursively try to insert them.
-        const middle = Math.floor(events.length / 2);
-        const [firstHalf, secondHalf] = [events.slice(0, middle), events.slice(middle)];
+          if (depth > MAX_FLUSH_DEPTH) {
+            logger.error("Failed to insert events, reached maximum depth", {
+              error: {
+                name: error.name,
+                message: error.message,
+                stack: error.stack,
+                clientVersion: error.clientVersion,
+              },
+              depth,
+              eventsCount: events.length,
+            });
 
-        const [firstHalfEvents, secondHalfEvents] = await Promise.all([
-          this.#doFlushBatch(firstHalf, depth + 1),
-          this.#doFlushBatch(secondHalf, depth + 1),
-        ]);
+            span.setAttribute("reached_max_flush_depth", true);
+            span.setAttribute("failed_event_count", events.length);
 
-        return firstHalfEvents.concat(secondHalfEvents);
-      }
+            return [];
+          }
 
-      throw error;
-    }
+          // Split the events into two batches, and recursively try to insert them.
+          const middle = Math.floor(events.length / 2);
+          const [firstHalf, secondHalf] = [events.slice(0, middle), events.slice(middle)];
+
+          return await startSpan(this._tracer, "bisectBatch", async (span) => {
+            span.setAttribute("first_half_count", firstHalf.length);
+            span.setAttribute("second_half_count", secondHalf.length);
+            span.setAttribute("depth", depth);
+            span.setAttribute("flush_id", flushId);
+
+            const [firstHalfEvents, secondHalfEvents] = await Promise.all([
+              this.#doFlushBatch(flushId, firstHalf, depth + 1),
+              this.#doFlushBatch(flushId, secondHalf, depth + 1),
+            ]);
+
+            return firstHalfEvents.concat(secondHalfEvents);
+          });
+        }
+
+        throw error;
+      }
+    });
   }
 
   async #publishToRedis(events: CreatableEvent[]) {
diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts
index e0cf474216..cfee68e88f 100644
--- a/apps/webapp/app/v3/otlpExporter.server.ts
+++ b/apps/webapp/app/v3/otlpExporter.server.ts
@@ -25,6 +25,8 @@ import {
   CreatableEventEnvironmentType,
 } from "./eventRepository.server";
 import { logger } from "~/services/logger.server";
+import { trace, Tracer } from "@opentelemetry/api";
+import { startSpan } from "./tracing.server";
 
 export type OTLPExporterConfig = {
   batchSize: number;
@@ -32,51 +34,63 @@ export type OTLPExporterConfig = {
 };
 
 class OTLPExporter {
+  private _tracer: Tracer;
+
   constructor(
     private readonly _eventRepository: EventRepository,
     private readonly _verbose: boolean
-  ) {}
+  ) {
+    this._tracer = trace.getTracer("otlp-exporter");
+  }
 
   async exportTraces(
     request: ExportTraceServiceRequest,
     immediate: boolean = false
   ): Promise<ExportTraceServiceResponse> {
-    this.#logExportTracesVerbose(request);
+    return await startSpan(this._tracer, "exportTraces", async (span) => {
+      this.#logExportTracesVerbose(request);
 
-    const events = this.#filterResourceSpans(request.resourceSpans).flatMap((resourceSpan) => {
-      return convertSpansToCreateableEvents(resourceSpan);
-    });
+      const events = this.#filterResourceSpans(request.resourceSpans).flatMap((resourceSpan) => {
+        return convertSpansToCreateableEvents(resourceSpan);
+      });
 
-    this.#logEventsVerbose(events);
+      this.#logEventsVerbose(events);
 
-    if (immediate) {
-      await this._eventRepository.insertManyImmediate(events);
-    } else {
-      await this._eventRepository.insertMany(events);
-    }
+      span.setAttribute("event_count", events.length);
+
+      if (immediate) {
+        await this._eventRepository.insertManyImmediate(events);
+      } else {
+        await this._eventRepository.insertMany(events);
+      }
 
-    return ExportTraceServiceResponse.create();
+      return ExportTraceServiceResponse.create();
+    });
   }
 
   async exportLogs(
     request: ExportLogsServiceRequest,
     immediate: boolean = false
   ): Promise<ExportLogsServiceResponse> {
-    this.#logExportLogsVerbose(request);
+    return await startSpan(this._tracer, "exportLogs", async (span) => {
+      this.#logExportLogsVerbose(request);
 
-    const events = this.#filterResourceLogs(request.resourceLogs).flatMap((resourceLog) => {
-      return convertLogsToCreateableEvents(resourceLog);
-    });
+      const events = this.#filterResourceLogs(request.resourceLogs).flatMap((resourceLog) => {
+        return convertLogsToCreateableEvents(resourceLog);
+      });
 
-    this.#logEventsVerbose(events);
+      this.#logEventsVerbose(events);
 
-    if (immediate) {
-      await this._eventRepository.insertManyImmediate(events);
-    } else {
-      await this._eventRepository.insertMany(events);
-    }
+      span.setAttribute("event_count", events.length);
 
-    return ExportLogsServiceResponse.create();
+      if (immediate) {
+        await this._eventRepository.insertManyImmediate(events);
+      } else {
+        await this._eventRepository.insertMany(events);
+      }
+
+      return ExportLogsServiceResponse.create();
+    });
   }
 
   #logEventsVerbose(events: CreatableEvent[]) {
diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts
index d69d11c7d4..7215c4348e 100644
--- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts
+++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts
@@ -122,9 +122,12 @@ export class FinalizeTaskRunService extends BaseService {
     const result = await resumeService.call({ id: run.id });
 
     if (result.success) {
-      logger.log("FinalizeTaskRunService: Resumed dependent parents", { result, run });
+      logger.log("FinalizeTaskRunService: Resumed dependent parents", { result, run: run.id });
     } else {
-      logger.error("FinalizeTaskRunService: Failed to resume dependent parents", { result, run });
+      logger.error("FinalizeTaskRunService: Failed to resume dependent parents", {
+        result,
+        run: run.id,
+      });
     }
 
     //enqueue alert
diff --git a/apps/webapp/app/v3/tracer.server.ts b/apps/webapp/app/v3/tracer.server.ts
index 7b6db8240c..c40773cdd9 100644
--- a/apps/webapp/app/v3/tracer.server.ts
+++ b/apps/webapp/app/v3/tracer.server.ts
@@ -8,6 +8,7 @@ import {
   SpanKind,
   SpanOptions,
   SpanStatusCode,
+  Tracer,
   diag,
   trace,
 } from "@opentelemetry/api";
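
Usage sketch (not part of the changeset): a minimal illustration of how a caller would construct the scheduler against the new two-argument callback. Only the DynamicFlushSchedulerConfig<T> shape and the nanoid-generated flushId come from the diff above; the LogRow type, the numeric values, and the console logging are hypothetical placeholders.

import { nanoid } from "nanoid";
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";

// Illustrative payload type; the scheduler is generic over T.
type LogRow = { id: string; message: string };

const scheduler = new DynamicFlushScheduler<LogRow>({
  batchSize: 100,
  flushInterval: 5_000,
  // The callback now receives the flushId first, so a single flush
  // can be correlated across log lines and spans.
  callback: async (flushId, batch) => {
    console.log(`flush ${flushId}: persisting ${batch.length} rows`);
  },
});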