From 269fbd46db4317807d2765182e97cd73e2f93965 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Thu, 30 Jan 2025 19:11:04 -0600 Subject: [PATCH] Include river errors in server side spans --- router/client.ts | 1 + router/server.ts | 16 ++- testUtil/index.ts | 2 + tracing/index.ts | 35 ++++-- tracing/tracing.test.ts | 104 +++++++++++++++--- transport/client.ts | 34 +++--- transport/server.ts | 1 + transport/sessionStateMachine/common.ts | 6 +- .../sessionStateMachine/stateMachine.test.ts | 3 + transport/sessionStateMachine/transitions.ts | 22 +++- transport/transport.ts | 4 + 11 files changed, 184 insertions(+), 44 deletions(-) diff --git a/router/client.ts b/router/client.ts index 0c318b68..6e013109 100644 --- a/router/client.ts +++ b/router/client.ts @@ -288,6 +288,7 @@ function handleProc( const procClosesWithInit = procType === 'rpc' || procType === 'subscription'; const streamId = generateId(); const { span, ctx } = createProcTelemetryInfo( + transport.tracer, session, procType, serviceName, diff --git a/router/server.ts b/router/server.ts index 31667f26..7c6e53bf 100644 --- a/router/server.ts +++ b/router/server.ts @@ -36,8 +36,12 @@ import { Value, ValueError } from '@sinclair/typebox/value'; import { Err, Result, Ok, ErrResult } from './result'; import { EventMap } from '../transport/events'; import { coerceErrorString } from '../transport/stringifyError'; -import { Span, SpanStatusCode } from '@opentelemetry/api'; -import { createHandlerSpan, PropagationContext } from '../tracing'; +import { Span } from '@opentelemetry/api'; +import { + createHandlerSpan, + PropagationContext, + recordRiverError, +} from '../tracing'; import { ServerHandshakeOptions } from './handshake'; import { Connection } from '../transport/connection'; import { ServerTransport } from '../transport/server'; @@ -198,6 +202,7 @@ class RiverServer // if its not a cancelled stream, validate and create a new stream createHandlerSpan( + transport.tracer, newStreamProps.initialSession, newStreamProps.procedure.type, newStreamProps.serviceName, @@ -423,6 +428,8 @@ class RiverServer }; const onServerCancel = (e: Static) => { + recordRiverError(span, e); + if (reqReadable.isClosed() && resWritable.isClosed()) { // Everything already closed, no-op. return; @@ -476,6 +483,10 @@ class RiverServer Result, Static> >({ writeCb: (response) => { + if (!response.ok) { + recordRiverError(span, response.payload); + } + sessionScopedSend({ streamId, controlFlags: procClosesWithResponse @@ -519,7 +530,6 @@ class RiverServer const errorMsg = coerceErrorString(err); span.recordException(err instanceof Error ? err : new Error(errorMsg)); - span.setStatus({ code: SpanStatusCode.ERROR }); this.log?.error( `${serviceName}.${procedureName} handler threw an uncaught error`, diff --git a/testUtil/index.ts b/testUtil/index.ts index ad9c7d65..eddc5119 100644 --- a/testUtil/index.ts +++ b/testUtil/index.ts @@ -19,6 +19,7 @@ import { SessionStateGraph } from '../transport/sessionStateMachine/transitions' import { BaseErrorSchemaType } from '../router/errors'; import { ClientTransport } from '../transport/client'; import { ServerTransport } from '../transport/server'; +import { getTracer } from '../tracing'; export { createMockTransportNetwork, @@ -186,6 +187,7 @@ export function dummySession() { }, testingSessionOptions, currentProtocolVersion, + getTracer(), ); } diff --git a/tracing/index.ts b/tracing/index.ts index ca493f85..b6c6ba92 100644 --- a/tracing/index.ts +++ b/tracing/index.ts @@ -2,16 +2,18 @@ import { Context, Span, SpanKind, + SpanStatusCode, context, propagation, trace, + Tracer, } from '@opentelemetry/api'; -import { version as RIVER_VERSION } from '../package.json'; -import { ValidProcType } from '../router'; +import { BaseErrorSchemaType, RIVER_VERSION, ValidProcType } from '../router'; import { Connection } from '../transport'; import { MessageMetadata } from '../logging'; import { ClientSession } from '../transport/sessionStateMachine/transitions'; import { IdentifiedSession } from '../transport/sessionStateMachine/common'; +import { Static } from '@sinclair/typebox'; export interface PropagationContext { traceparent: string; @@ -36,6 +38,7 @@ export function getPropagationContext( } export function createSessionTelemetryInfo( + tracer: Tracer, sessionId: string, to: string, from: string, @@ -46,7 +49,7 @@ export function createSessionTelemetryInfo( : context.active(); const span = tracer.startSpan( - `session ${sessionId}`, + `river.session.${sessionId}`, { attributes: { component: 'river', @@ -64,6 +67,7 @@ export function createSessionTelemetryInfo( } export function createConnectionTelemetryInfo( + tracer: Tracer, connection: Connection, info: TelemetryInfo, ): TelemetryInfo { @@ -85,6 +89,7 @@ export function createConnectionTelemetryInfo( } export function createProcTelemetryInfo( + tracer: Tracer, session: ClientSession, kind: ValidProcType, serviceName: string, @@ -93,7 +98,7 @@ export function createProcTelemetryInfo( ): TelemetryInfo { const baseCtx = context.active(); const span = tracer.startSpan( - `procedure call ${serviceName}.${procedureName}`, + `river.client.${serviceName}.${procedureName}`, { attributes: { component: 'river', @@ -131,6 +136,7 @@ export function createProcTelemetryInfo( } export function createHandlerSpan unknown>( + tracer: Tracer, session: IdentifiedSession, kind: ValidProcType, serviceName: string, @@ -144,7 +150,7 @@ export function createHandlerSpan unknown>( : context.active(); return tracer.startActiveSpan( - `procedure handler ${serviceName}.${procedureName}`, + `river.server.${serviceName}.${procedureName}`, { attributes: { component: 'river', @@ -162,5 +168,20 @@ export function createHandlerSpan unknown>( ); } -const tracer = trace.getTracer('river', RIVER_VERSION); -export default tracer; +export function recordRiverError( + span: Span, + error: Static, +): void { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error.message, + }); + span.setAttributes({ + 'river.error_code': error.code, + 'river.error_message': error.message, + }); +} + +export function getTracer(): Tracer { + return trace.getTracer('river', RIVER_VERSION); +} diff --git a/tracing/tracing.test.ts b/tracing/tracing.test.ts index e6614f4c..af9a1037 100644 --- a/tracing/tracing.test.ts +++ b/tracing/tracing.test.ts @@ -1,4 +1,10 @@ -import { trace, context, propagation, Span } from '@opentelemetry/api'; +import { + trace, + context, + propagation, + Span, + SpanStatusCode, +} from '@opentelemetry/api'; import { describe, test, expect, vi, assert, beforeEach } from 'vitest'; import { dummySession, readNextResult } from '../testUtil'; @@ -9,10 +15,7 @@ import { } from '@opentelemetry/sdk-trace-base'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; -import tracer, { - createSessionTelemetryInfo, - getPropagationContext, -} from './index'; +import { createSessionTelemetryInfo, getPropagationContext } from './index'; import { testMatrix } from '../testUtil/fixtures/matrix'; import { cleanupTransports, @@ -27,19 +30,19 @@ import { createClient } from '../router/client'; import { UNCAUGHT_ERROR_CODE } from '../router'; import { LogFn } from '../logging'; -describe('Basic tracing tests', () => { - const provider = new BasicTracerProvider(); - provider.addSpanProcessor( - new SimpleSpanProcessor(new InMemorySpanExporter()), - ); - const contextManager = new AsyncHooksContextManager(); - contextManager.enable(); - trace.setGlobalTracerProvider(provider); - context.setGlobalContextManager(contextManager); - propagation.setGlobalPropagator(new W3CTraceContextPropagator()); +const provider = new BasicTracerProvider(); +const spanExporter = new InMemorySpanExporter(); +provider.addSpanProcessor(new SimpleSpanProcessor(spanExporter)); +const contextManager = new AsyncHooksContextManager(); +contextManager.enable(); +trace.setGlobalTracerProvider(provider); +context.setGlobalContextManager(contextManager); +propagation.setGlobalPropagator(new W3CTraceContextPropagator()); +describe('Basic tracing tests', () => { test('createSessionTelemetryInfo', () => { const parentCtx = context.active(); + const tracer = trace.getTracer('test'); const span = tracer.startSpan('empty span', {}, parentCtx); const ctx = trace.setSpan(parentCtx, span); @@ -47,6 +50,7 @@ describe('Basic tracing tests', () => { expect(propCtx?.traceparent).toBeTruthy(); const session = dummySession(); const teleInfo = createSessionTelemetryInfo( + tracer, session.id, session.to, session.from, @@ -75,6 +79,7 @@ describe.each(testMatrix())( const setup = await transport.setup({ client: opts, server: opts }); getClientTransport = setup.getClientTransport; getServerTransport = setup.getServerTransport; + spanExporter.reset(); return async () => { await postTestCleanup(); @@ -189,5 +194,74 @@ describe.each(testMatrix())( server, }); }); + + test('river errors are recorded on handler spans', async () => { + // setup + const clientTransport = getClientTransport('client'); + const clientMockLogger = vi.fn(); + clientTransport.bindLogger(clientMockLogger); + const serverTransport = getServerTransport(); + const serverMockLogger = vi.fn(); + serverTransport.bindLogger(serverMockLogger); + const services = { + fallible: FallibleServiceSchema, + }; + const server = createServer(serverTransport, services); + const client = createClient( + clientTransport, + serverTransport.clientId, + ); + addPostTestCleanup(async () => { + await cleanupTransports([clientTransport, serverTransport]); + }); + + const { reqWritable, resReadable } = client.fallible.echo.stream({}); + + reqWritable.write({ + msg: 'abc', + throwResult: false, + throwError: false, + }); + let result = await readNextResult(resReadable); + expect(result).toStrictEqual({ + ok: true, + payload: { + response: 'abc', + }, + }); + + // this isn't the first message so doesn't have telemetry info on the message itself + reqWritable.write({ + msg: 'def', + throwResult: false, + throwError: true, + }); + + result = await readNextResult(resReadable); + expect(result).toStrictEqual({ + ok: false, + payload: { + code: UNCAUGHT_ERROR_CODE, + message: 'some message', + }, + }); + + const spans = spanExporter.getFinishedSpans(); + + const errSpan = spans.find( + (span) => + span.name === 'river.server.fallible.echo' && + span.status.code === SpanStatusCode.ERROR, + ); + expect(errSpan).toBeTruthy(); + expect(errSpan?.attributes['river.error_code']).toBe(UNCAUGHT_ERROR_CODE); + expect(errSpan?.attributes['river.error_message']).toBe('some message'); + + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + server, + }); + }); }, ); diff --git a/transport/client.ts b/transport/client.ts index f80abb65..5025fd73 100644 --- a/transport/client.ts +++ b/transport/client.ts @@ -18,7 +18,7 @@ import { Transport } from './transport'; import { coerceErrorString } from './stringifyError'; import { ProtocolError } from './events'; import { Value } from '@sinclair/typebox/value'; -import tracer, { getPropagationContext } from '../tracing'; +import { getPropagationContext } from '../tracing'; import { Connection } from './connection'; import { MessageMetadata } from '../logging'; import { SessionConnecting } from './sessionStateMachine/SessionConnecting'; @@ -110,6 +110,7 @@ export abstract class ClientTransport< }, this.options, currentProtocolVersion, + this.tracer, this.log, ); @@ -377,20 +378,23 @@ export abstract class ClientTransport< } protected onBackoffFinished(session: SessionBackingOff) { - const connPromise = tracer.startActiveSpan('connect', async (span) => { - try { - return await this.createNewOutgoingConnection(session.to); - } catch (err) { - // rethrow the error so that the promise is rejected - // as it was before we wrapped it in a span - const errStr = coerceErrorString(err); - span.recordException(errStr); - span.setStatus({ code: SpanStatusCode.ERROR }); - throw err; - } finally { - span.end(); - } - }); + const connPromise = session.tracer.startActiveSpan( + 'connect', + async (span) => { + try { + return await this.createNewOutgoingConnection(session.to); + } catch (err) { + // rethrow the error so that the promise is rejected + // as it was before we wrapped it in a span + const errStr = coerceErrorString(err); + span.recordException(errStr); + span.setStatus({ code: SpanStatusCode.ERROR }); + throw err; + } finally { + span.end(); + } + }, + ); // transition to connecting const connectingSession = diff --git a/transport/server.ts b/transport/server.ts index d184e3c4..e26980cf 100644 --- a/transport/server.ts +++ b/transport/server.ts @@ -164,6 +164,7 @@ export abstract class ServerTransport< }, }, this.options, + this.tracer, this.log, ); diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index d12fc884..54019ea9 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -11,6 +11,7 @@ import { import { Value } from '@sinclair/typebox/value'; import { Codec } from '../../codec'; import { generateId } from '../id'; +import { Tracer } from '@opentelemetry/api'; export const enum SessionState { NoConnection = 'NoConnection', @@ -147,6 +148,7 @@ export interface SessionOptions { export interface CommonSessionProps { from: TransportClientId; options: SessionOptions; + tracer: Tracer; log: Logger | undefined; } @@ -154,14 +156,16 @@ export abstract class CommonSession extends StateMachineState { readonly from: TransportClientId; readonly options: SessionOptions; + tracer: Tracer; log?: Logger; abstract get loggingMetadata(): MessageMetadata; - constructor({ from, options, log }: CommonSessionProps) { + constructor({ from, options, log, tracer }: CommonSessionProps) { super(); this.from = from; this.options = options; this.log = log; + this.tracer = tracer; } parseMsg(msg: Uint8Array): OpaqueTransportMessage | null { diff --git a/transport/sessionStateMachine/stateMachine.test.ts b/transport/sessionStateMachine/stateMachine.test.ts index c6dac186..f381edf5 100644 --- a/transport/sessionStateMachine/stateMachine.test.ts +++ b/transport/sessionStateMachine/stateMachine.test.ts @@ -35,6 +35,7 @@ import { SessionBackingOff, SessionBackingOffListeners, } from './SessionBackingOff'; +import { getTracer } from '../../tracing'; function persistedSessionState(session: IdentifiedSession) { return { @@ -145,6 +146,7 @@ function createSessionNoConnection() { listeners, testingSessionOptions, currentProtocolVersion, + getTracer(), ); return { session, ...listeners }; @@ -221,6 +223,7 @@ function createSessionWaitingForHandshake() { conn, listeners, testingSessionOptions, + getTracer(), ); return { session, ...listeners }; diff --git a/transport/sessionStateMachine/transitions.ts b/transport/sessionStateMachine/transitions.ts index 595959f9..5cc03590 100644 --- a/transport/sessionStateMachine/transitions.ts +++ b/transport/sessionStateMachine/transitions.ts @@ -39,6 +39,7 @@ import { SessionBackingOffListeners, } from './SessionBackingOff'; import { ProtocolVersion } from '../message'; +import { Tracer } from '@opentelemetry/api'; function inheritSharedSession( session: IdentifiedSession, @@ -53,6 +54,7 @@ function inheritSharedSession( telemetry: session.telemetry, options: session.options, log: session.log, + tracer: session.tracer, protocolVersion: session.protocolVersion, }; } @@ -74,10 +76,11 @@ export const SessionStateGraph = { listeners: SessionNoConnectionListeners, options: SessionOptions, protocolVersion: ProtocolVersion, + tracer: Tracer, log?: Logger, ) => { const id = `session-${generateId()}`; - const telemetry = createSessionTelemetryInfo(id, to, from); + const telemetry = createSessionTelemetryInfo(tracer, id, to, from); const sendBuffer: Array = []; const session = new SessionNoConnection({ @@ -92,6 +95,7 @@ export const SessionStateGraph = { telemetry, options, protocolVersion, + tracer, log, }); @@ -107,6 +111,7 @@ export const SessionStateGraph = { conn: ConnType, listeners: SessionWaitingForHandshakeListeners, options: SessionOptions, + tracer: Tracer, log?: Logger, ): SessionWaitingForHandshake => { const session = new SessionWaitingForHandshake({ @@ -114,6 +119,7 @@ export const SessionStateGraph = { listeners, from, options, + tracer, log, }); @@ -191,7 +197,11 @@ export const SessionStateGraph = { ...carriedState, }); - conn.telemetry = createConnectionTelemetryInfo(conn, session.telemetry); + conn.telemetry = createConnectionTelemetryInfo( + session.tracer, + conn, + session.telemetry, + ); session.log?.info( `session ${session.id} transition from Connecting to Handshaking`, { @@ -249,12 +259,14 @@ export const SessionStateGraph = { ack: 0, sendBuffer: [], telemetry: createSessionTelemetryInfo( + pendingSession.tracer, sessionId, to, from, propagationCtx, ), options, + tracer: pendingSession.tracer, log: pendingSession.log, protocolVersion, }; @@ -268,7 +280,11 @@ export const SessionStateGraph = { ...carriedState, }); - conn.telemetry = createConnectionTelemetryInfo(conn, session.telemetry); + conn.telemetry = createConnectionTelemetryInfo( + session.tracer, + conn, + session.telemetry, + ); session.log?.info( `session ${session.id} transition from WaitingForHandshake to Connected`, { diff --git a/transport/transport.ts b/transport/transport.ts index 548db290..fd230a58 100644 --- a/transport/transport.ts +++ b/transport/transport.ts @@ -26,6 +26,8 @@ import { import { Connection } from './connection'; import { Session, SessionStateGraph } from './sessionStateMachine/transitions'; import { SessionId } from './sessionStateMachine/common'; +import { Tracer } from '@opentelemetry/api'; +import { getTracer } from '../tracing'; /** * Represents the possible states of a transport. @@ -84,6 +86,7 @@ export abstract class Transport { */ protected options: TransportOptions; log?: Logger; + tracer: Tracer; sessions: Map>; @@ -101,6 +104,7 @@ export abstract class Transport { this.clientId = clientId; this.status = 'open'; this.sessions = new Map(); + this.tracer = getTracer(); } bindLogger(fn: LogFn | Logger, level?: LoggingLevel) {