Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include river errors in server side spans #283

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ function handleProc(
const procClosesWithInit = procType === 'rpc' || procType === 'subscription';
const streamId = generateId();
const { span, ctx } = createProcTelemetryInfo(
transport.tracer,
session,
procType,
serviceName,
Expand Down
16 changes: 13 additions & 3 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ import { Value, ValueError } from '@sinclair/typebox/value';
import { Err, Result, Ok, ErrResult } from './result';
import { EventMap } from '../transport/events';
import { coerceErrorString } from '../transport/stringifyError';
import { Span, SpanStatusCode } from '@opentelemetry/api';
import { createHandlerSpan, PropagationContext } from '../tracing';
import { Span } from '@opentelemetry/api';
import {
createHandlerSpan,
PropagationContext,
recordRiverError,
} from '../tracing';
import { ServerHandshakeOptions } from './handshake';
import { Connection } from '../transport/connection';
import { ServerTransport } from '../transport/server';
Expand Down Expand Up @@ -198,6 +202,7 @@ class RiverServer<Services extends AnyServiceSchemaMap>

// if it's not a cancelled stream, validate and create a new stream
createHandlerSpan(
transport.tracer,
newStreamProps.initialSession,
newStreamProps.procedure.type,
newStreamProps.serviceName,
Expand Down Expand Up @@ -423,6 +428,8 @@ class RiverServer<Services extends AnyServiceSchemaMap>
};

const onServerCancel = (e: Static<typeof ReaderErrorSchema>) => {
recordRiverError(span, e);

if (reqReadable.isClosed() && resWritable.isClosed()) {
// Everything already closed, no-op.
return;
Expand Down Expand Up @@ -476,6 +483,10 @@ class RiverServer<Services extends AnyServiceSchemaMap>
Result<Static<PayloadType>, Static<BaseErrorSchemaType>>
>({
writeCb: (response) => {
if (!response.ok) {
recordRiverError(span, response.payload);
}

sessionScopedSend({
streamId,
controlFlags: procClosesWithResponse
Expand Down Expand Up @@ -519,7 +530,6 @@ class RiverServer<Services extends AnyServiceSchemaMap>
const errorMsg = coerceErrorString(err);

span.recordException(err instanceof Error ? err : new Error(errorMsg));
span.setStatus({ code: SpanStatusCode.ERROR });

this.log?.error(
`${serviceName}.${procedureName} handler threw an uncaught error`,
Expand Down
2 changes: 2 additions & 0 deletions testUtil/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { SessionStateGraph } from '../transport/sessionStateMachine/transitions'
import { BaseErrorSchemaType } from '../router/errors';
import { ClientTransport } from '../transport/client';
import { ServerTransport } from '../transport/server';
import { getTracer } from '../tracing';

export {
createMockTransportNetwork,
Expand Down Expand Up @@ -186,6 +187,7 @@ export function dummySession() {
},
testingSessionOptions,
currentProtocolVersion,
getTracer(),
);
}

Expand Down
35 changes: 28 additions & 7 deletions tracing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ import {
Context,
Span,
SpanKind,
SpanStatusCode,
context,
propagation,
trace,
Tracer,
} from '@opentelemetry/api';
import { version as RIVER_VERSION } from '../package.json';
import { ValidProcType } from '../router';
import { BaseErrorSchemaType, RIVER_VERSION, ValidProcType } from '../router';
import { Connection } from '../transport';
import { MessageMetadata } from '../logging';
import { ClientSession } from '../transport/sessionStateMachine/transitions';
import { IdentifiedSession } from '../transport/sessionStateMachine/common';
import { Static } from '@sinclair/typebox';

export interface PropagationContext {
traceparent: string;
Expand All @@ -36,6 +38,7 @@ export function getPropagationContext(
}

export function createSessionTelemetryInfo(
tracer: Tracer,
sessionId: string,
to: string,
from: string,
Expand All @@ -46,7 +49,7 @@ export function createSessionTelemetryInfo(
: context.active();

const span = tracer.startSpan(
`session ${sessionId}`,
`river.session.${sessionId}`,
{
attributes: {
component: 'river',
Expand All @@ -64,6 +67,7 @@ export function createSessionTelemetryInfo(
}

export function createConnectionTelemetryInfo(
tracer: Tracer,
connection: Connection,
info: TelemetryInfo,
): TelemetryInfo {
Expand All @@ -85,6 +89,7 @@ export function createConnectionTelemetryInfo(
}

export function createProcTelemetryInfo(
tracer: Tracer,
session: ClientSession<Connection>,
kind: ValidProcType,
serviceName: string,
Expand All @@ -93,7 +98,7 @@ export function createProcTelemetryInfo(
): TelemetryInfo {
const baseCtx = context.active();
const span = tracer.startSpan(
`procedure call ${serviceName}.${procedureName}`,
`river.client.${serviceName}.${procedureName}`,
{
attributes: {
component: 'river',
Expand Down Expand Up @@ -131,6 +136,7 @@ export function createProcTelemetryInfo(
}

export function createHandlerSpan<Fn extends (span: Span) => unknown>(
tracer: Tracer,
session: IdentifiedSession,
kind: ValidProcType,
serviceName: string,
Expand All @@ -144,7 +150,7 @@ export function createHandlerSpan<Fn extends (span: Span) => unknown>(
: context.active();

return tracer.startActiveSpan<Fn>(
`procedure handler ${serviceName}.${procedureName}`,
`river.server.${serviceName}.${procedureName}`,
{
attributes: {
component: 'river',
Expand All @@ -162,5 +168,20 @@ export function createHandlerSpan<Fn extends (span: Span) => unknown>(
);
}

const tracer = trace.getTracer('river', RIVER_VERSION);
export default tracer;
/**
 * Flags a span as failed because of a River error.
 *
 * The span status is set to `ERROR` using the error's message, and the
 * error's code and message are attached as the `river.error_code` and
 * `river.error_message` attributes. The span itself is not ended here.
 *
 * @param span - the span to annotate
 * @param error - the River error payload whose `code` and `message` are recorded
 */
export function recordRiverError(
  span: Span,
  error: Static<BaseErrorSchemaType>,
): void {
  const { code, message } = error;

  // status first, then the searchable attributes
  span.setStatus({ code: SpanStatusCode.ERROR, message });

  const errorAttributes = {
    'river.error_code': code,
    'river.error_message': message,
  };
  span.setAttributes(errorAttributes);
}

/**
 * Looks up the tracer registered under the name `river`, versioned with
 * `RIVER_VERSION`, from the OpenTelemetry `trace` API.
 *
 * @returns the tracer returned by `trace.getTracer`
 */
export function getTracer(): Tracer {
  const riverTracer = trace.getTracer('river', RIVER_VERSION);

  return riverTracer;
}
104 changes: 89 additions & 15 deletions tracing/tracing.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { trace, context, propagation, Span } from '@opentelemetry/api';
import {
trace,
context,
propagation,
Span,
SpanStatusCode,
} from '@opentelemetry/api';
import { describe, test, expect, vi, assert, beforeEach } from 'vitest';
import { dummySession, readNextResult } from '../testUtil';

Expand All @@ -9,10 +15,7 @@ import {
} from '@opentelemetry/sdk-trace-base';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
import tracer, {
createSessionTelemetryInfo,
getPropagationContext,
} from './index';
import { createSessionTelemetryInfo, getPropagationContext } from './index';
import { testMatrix } from '../testUtil/fixtures/matrix';
import {
cleanupTransports,
Expand All @@ -27,26 +30,27 @@ import { createClient } from '../router/client';
import { UNCAUGHT_ERROR_CODE } from '../router';
import { LogFn } from '../logging';

describe('Basic tracing tests', () => {
const provider = new BasicTracerProvider();
provider.addSpanProcessor(
new SimpleSpanProcessor(new InMemorySpanExporter()),
);
const contextManager = new AsyncHooksContextManager();
contextManager.enable();
trace.setGlobalTracerProvider(provider);
context.setGlobalContextManager(contextManager);
propagation.setGlobalPropagator(new W3CTraceContextPropagator());
// Module-level OpenTelemetry setup used by the tests in this file.
const provider = new BasicTracerProvider();
// Keep a handle on the in-memory exporter so tests can reset it and read back
// the spans that have finished.
const spanExporter = new InMemorySpanExporter();
provider.addSpanProcessor(new SimpleSpanProcessor(spanExporter));
// Enable the context manager before installing it globally.
const contextManager = new AsyncHooksContextManager();
contextManager.enable();
// Install the provider, context manager and W3C trace-context propagator as
// the global OpenTelemetry implementations.
trace.setGlobalTracerProvider(provider);
context.setGlobalContextManager(contextManager);
propagation.setGlobalPropagator(new W3CTraceContextPropagator());

describe('Basic tracing tests', () => {
test('createSessionTelemetryInfo', () => {
const parentCtx = context.active();
const tracer = trace.getTracer('test');
const span = tracer.startSpan('empty span', {}, parentCtx);
const ctx = trace.setSpan(parentCtx, span);

const propCtx = getPropagationContext(ctx);
expect(propCtx?.traceparent).toBeTruthy();
const session = dummySession();
const teleInfo = createSessionTelemetryInfo(
tracer,
session.id,
session.to,
session.from,
Expand Down Expand Up @@ -75,6 +79,7 @@ describe.each(testMatrix())(
const setup = await transport.setup({ client: opts, server: opts });
getClientTransport = setup.getClientTransport;
getServerTransport = setup.getServerTransport;
spanExporter.reset();

return async () => {
await postTestCleanup();
Expand Down Expand Up @@ -189,5 +194,74 @@ describe.each(testMatrix())(
server,
});
});

// Drives a stream on the `fallible.echo` procedure and checks that an error
// result leaves a finished `river.server.fallible.echo` span with ERROR status
// and the `river.error_code` / `river.error_message` attributes set.
test('river errors are recorded on handler spans', async () => {
// setup
const clientTransport = getClientTransport('client');
const clientMockLogger = vi.fn<LogFn>();
clientTransport.bindLogger(clientMockLogger);
const serverTransport = getServerTransport();
const serverMockLogger = vi.fn<LogFn>();
serverTransport.bindLogger(serverMockLogger);
const services = {
fallible: FallibleServiceSchema,
};
const server = createServer(serverTransport, services);
const client = createClient<typeof services>(
clientTransport,
serverTransport.clientId,
);
addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

const { reqWritable, resReadable } = client.fallible.echo.stream({});

// first message: expected to be echoed back as an ok result
reqWritable.write({
msg: 'abc',
throwResult: false,
throwError: false,
});
let result = await readNextResult(resReadable);
expect(result).toStrictEqual({
ok: true,
payload: {
response: 'abc',
},
});

// this isn't the first message so doesn't have telemetry info on the message itself
// throwError is set, so the response is expected to be an UNCAUGHT_ERROR_CODE error result
reqWritable.write({
msg: 'def',
throwResult: false,
throwError: true,
});

result = await readNextResult(resReadable);
expect(result).toStrictEqual({
ok: false,
payload: {
code: UNCAUGHT_ERROR_CODE,
message: 'some message',
},
});

// inspect the spans that finished and were exported to the in-memory exporter
const spans = spanExporter.getFinishedSpans();

// find the server handler span for this procedure that ended with ERROR status
const errSpan = spans.find(
(span) =>
span.name === 'river.server.fallible.echo' &&
span.status.code === SpanStatusCode.ERROR,
);
expect(errSpan).toBeTruthy();
// the code and message from the error payload should appear as span attributes
expect(errSpan?.attributes['river.error_code']).toBe(UNCAUGHT_ERROR_CODE);
expect(errSpan?.attributes['river.error_message']).toBe('some message');

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});
},
);
34 changes: 19 additions & 15 deletions transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { Transport } from './transport';
import { coerceErrorString } from './stringifyError';
import { ProtocolError } from './events';
import { Value } from '@sinclair/typebox/value';
import tracer, { getPropagationContext } from '../tracing';
import { getPropagationContext } from '../tracing';
import { Connection } from './connection';
import { MessageMetadata } from '../logging';
import { SessionConnecting } from './sessionStateMachine/SessionConnecting';
Expand Down Expand Up @@ -110,6 +110,7 @@ export abstract class ClientTransport<
},
this.options,
currentProtocolVersion,
this.tracer,
this.log,
);

Expand Down Expand Up @@ -377,20 +378,23 @@ export abstract class ClientTransport<
}

protected onBackoffFinished(session: SessionBackingOff) {
const connPromise = tracer.startActiveSpan('connect', async (span) => {
try {
return await this.createNewOutgoingConnection(session.to);
} catch (err) {
// rethrow the error so that the promise is rejected
// as it was before we wrapped it in a span
const errStr = coerceErrorString(err);
span.recordException(errStr);
span.setStatus({ code: SpanStatusCode.ERROR });
throw err;
} finally {
span.end();
}
});
const connPromise = session.tracer.startActiveSpan(
'connect',
async (span) => {
try {
return await this.createNewOutgoingConnection(session.to);
} catch (err) {
// rethrow the error so that the promise is rejected
// as it was before we wrapped it in a span
const errStr = coerceErrorString(err);
span.recordException(errStr);
span.setStatus({ code: SpanStatusCode.ERROR });
throw err;
} finally {
span.end();
}
},
);

// transition to connecting
const connectingSession =
Expand Down
1 change: 1 addition & 0 deletions transport/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export abstract class ServerTransport<
},
},
this.options,
this.tracer,
this.log,
);

Expand Down
Loading