Skip to content

Commit

Permalink
Include river errors in server side spans (#283)
Browse files Browse the repository at this point in the history
## Why

This includes errors on the spans generated by the server so we have
more error info.

I also changed the names of the spans to match the format that
river-python is using.

In order to write a test for this, I had to make it so the tracer is
created per-transport rather than having a global one.

## What changed

- De-global the tracer. The transport will construct and own a tracer
instance (obtained from the global tracer provider).
- Add river errors to the server side spans
- Update the server/client spans to match the format we are using for
river-python
- This may make it harder to look at historical span data, let me know
if you think this is a bad idea and I can revert it

## Versioning

- [ ] Breaking protocol change
- [ ] Breaking ts/js API change

<!-- Kind reminder to add tests and updated documentation if needed -->
  • Loading branch information
cbrewster authored Feb 3, 2025
1 parent 1007a55 commit 31c0800
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 44 deletions.
1 change: 1 addition & 0 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ function handleProc(
const procClosesWithInit = procType === 'rpc' || procType === 'subscription';
const streamId = generateId();
const { span, ctx } = createProcTelemetryInfo(
transport.tracer,
session,
procType,
serviceName,
Expand Down
16 changes: 13 additions & 3 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ import { Value, ValueError } from '@sinclair/typebox/value';
import { Err, Result, Ok, ErrResult } from './result';
import { EventMap } from '../transport/events';
import { coerceErrorString } from '../transport/stringifyError';
import { Span, SpanStatusCode } from '@opentelemetry/api';
import { createHandlerSpan, PropagationContext } from '../tracing';
import { Span } from '@opentelemetry/api';
import {
createHandlerSpan,
PropagationContext,
recordRiverError,
} from '../tracing';
import { ServerHandshakeOptions } from './handshake';
import { Connection } from '../transport/connection';
import { ServerTransport } from '../transport/server';
Expand Down Expand Up @@ -198,6 +202,7 @@ class RiverServer<Services extends AnyServiceSchemaMap>

// if its not a cancelled stream, validate and create a new stream
createHandlerSpan(
transport.tracer,
newStreamProps.initialSession,
newStreamProps.procedure.type,
newStreamProps.serviceName,
Expand Down Expand Up @@ -423,6 +428,8 @@ class RiverServer<Services extends AnyServiceSchemaMap>
};

const onServerCancel = (e: Static<typeof ReaderErrorSchema>) => {
recordRiverError(span, e);

if (reqReadable.isClosed() && resWritable.isClosed()) {
// Everything already closed, no-op.
return;
Expand Down Expand Up @@ -476,6 +483,10 @@ class RiverServer<Services extends AnyServiceSchemaMap>
Result<Static<PayloadType>, Static<BaseErrorSchemaType>>
>({
writeCb: (response) => {
if (!response.ok) {
recordRiverError(span, response.payload);
}

sessionScopedSend({
streamId,
controlFlags: procClosesWithResponse
Expand Down Expand Up @@ -519,7 +530,6 @@ class RiverServer<Services extends AnyServiceSchemaMap>
const errorMsg = coerceErrorString(err);

span.recordException(err instanceof Error ? err : new Error(errorMsg));
span.setStatus({ code: SpanStatusCode.ERROR });

this.log?.error(
`${serviceName}.${procedureName} handler threw an uncaught error`,
Expand Down
2 changes: 2 additions & 0 deletions testUtil/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { SessionStateGraph } from '../transport/sessionStateMachine/transitions'
import { BaseErrorSchemaType } from '../router/errors';
import { ClientTransport } from '../transport/client';
import { ServerTransport } from '../transport/server';
import { getTracer } from '../tracing';

export {
createMockTransportNetwork,
Expand Down Expand Up @@ -186,6 +187,7 @@ export function dummySession() {
},
testingSessionOptions,
currentProtocolVersion,
getTracer(),
);
}

Expand Down
35 changes: 28 additions & 7 deletions tracing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ import {
Context,
Span,
SpanKind,
SpanStatusCode,
context,
propagation,
trace,
Tracer,
} from '@opentelemetry/api';
import { version as RIVER_VERSION } from '../package.json';
import { ValidProcType } from '../router';
import { BaseErrorSchemaType, RIVER_VERSION, ValidProcType } from '../router';
import { Connection } from '../transport';
import { MessageMetadata } from '../logging';
import { ClientSession } from '../transport/sessionStateMachine/transitions';
import { IdentifiedSession } from '../transport/sessionStateMachine/common';
import { Static } from '@sinclair/typebox';

export interface PropagationContext {
traceparent: string;
Expand All @@ -36,6 +38,7 @@ export function getPropagationContext(
}

export function createSessionTelemetryInfo(
tracer: Tracer,
sessionId: string,
to: string,
from: string,
Expand All @@ -46,7 +49,7 @@ export function createSessionTelemetryInfo(
: context.active();

const span = tracer.startSpan(
`session ${sessionId}`,
`river.session.${sessionId}`,
{
attributes: {
component: 'river',
Expand All @@ -64,6 +67,7 @@ export function createSessionTelemetryInfo(
}

export function createConnectionTelemetryInfo(
tracer: Tracer,
connection: Connection,
info: TelemetryInfo,
): TelemetryInfo {
Expand All @@ -85,6 +89,7 @@ export function createConnectionTelemetryInfo(
}

export function createProcTelemetryInfo(
tracer: Tracer,
session: ClientSession<Connection>,
kind: ValidProcType,
serviceName: string,
Expand All @@ -93,7 +98,7 @@ export function createProcTelemetryInfo(
): TelemetryInfo {
const baseCtx = context.active();
const span = tracer.startSpan(
`procedure call ${serviceName}.${procedureName}`,
`river.client.${serviceName}.${procedureName}`,
{
attributes: {
component: 'river',
Expand Down Expand Up @@ -131,6 +136,7 @@ export function createProcTelemetryInfo(
}

export function createHandlerSpan<Fn extends (span: Span) => unknown>(
tracer: Tracer,
session: IdentifiedSession,
kind: ValidProcType,
serviceName: string,
Expand All @@ -144,7 +150,7 @@ export function createHandlerSpan<Fn extends (span: Span) => unknown>(
: context.active();

return tracer.startActiveSpan<Fn>(
`procedure handler ${serviceName}.${procedureName}`,
`river.server.${serviceName}.${procedureName}`,
{
attributes: {
component: 'river',
Expand All @@ -162,5 +168,20 @@ export function createHandlerSpan<Fn extends (span: Span) => unknown>(
);
}

const tracer = trace.getTracer('river', RIVER_VERSION);
export default tracer;
export function recordRiverError(
span: Span,
error: Static<BaseErrorSchemaType>,
): void {
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message,
});
span.setAttributes({
'river.error_code': error.code,
'river.error_message': error.message,
});
}

export function getTracer(): Tracer {
return trace.getTracer('river', RIVER_VERSION);
}
104 changes: 89 additions & 15 deletions tracing/tracing.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { trace, context, propagation, Span } from '@opentelemetry/api';
import {
trace,
context,
propagation,
Span,
SpanStatusCode,
} from '@opentelemetry/api';
import { describe, test, expect, vi, assert, beforeEach } from 'vitest';
import { dummySession, readNextResult } from '../testUtil';

Expand All @@ -9,10 +15,7 @@ import {
} from '@opentelemetry/sdk-trace-base';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
import tracer, {
createSessionTelemetryInfo,
getPropagationContext,
} from './index';
import { createSessionTelemetryInfo, getPropagationContext } from './index';
import { testMatrix } from '../testUtil/fixtures/matrix';
import {
cleanupTransports,
Expand All @@ -27,26 +30,27 @@ import { createClient } from '../router/client';
import { UNCAUGHT_ERROR_CODE } from '../router';
import { LogFn } from '../logging';

describe('Basic tracing tests', () => {
const provider = new BasicTracerProvider();
provider.addSpanProcessor(
new SimpleSpanProcessor(new InMemorySpanExporter()),
);
const contextManager = new AsyncHooksContextManager();
contextManager.enable();
trace.setGlobalTracerProvider(provider);
context.setGlobalContextManager(contextManager);
propagation.setGlobalPropagator(new W3CTraceContextPropagator());
const provider = new BasicTracerProvider();
const spanExporter = new InMemorySpanExporter();
provider.addSpanProcessor(new SimpleSpanProcessor(spanExporter));
const contextManager = new AsyncHooksContextManager();
contextManager.enable();
trace.setGlobalTracerProvider(provider);
context.setGlobalContextManager(contextManager);
propagation.setGlobalPropagator(new W3CTraceContextPropagator());

describe('Basic tracing tests', () => {
test('createSessionTelemetryInfo', () => {
const parentCtx = context.active();
const tracer = trace.getTracer('test');
const span = tracer.startSpan('empty span', {}, parentCtx);
const ctx = trace.setSpan(parentCtx, span);

const propCtx = getPropagationContext(ctx);
expect(propCtx?.traceparent).toBeTruthy();
const session = dummySession();
const teleInfo = createSessionTelemetryInfo(
tracer,
session.id,
session.to,
session.from,
Expand Down Expand Up @@ -75,6 +79,7 @@ describe.each(testMatrix())(
const setup = await transport.setup({ client: opts, server: opts });
getClientTransport = setup.getClientTransport;
getServerTransport = setup.getServerTransport;
spanExporter.reset();

return async () => {
await postTestCleanup();
Expand Down Expand Up @@ -189,5 +194,74 @@ describe.each(testMatrix())(
server,
});
});

test('river errors are recorded on handler spans', async () => {
// setup
const clientTransport = getClientTransport('client');
const clientMockLogger = vi.fn<LogFn>();
clientTransport.bindLogger(clientMockLogger);
const serverTransport = getServerTransport();
const serverMockLogger = vi.fn<LogFn>();
serverTransport.bindLogger(serverMockLogger);
const services = {
fallible: FallibleServiceSchema,
};
const server = createServer(serverTransport, services);
const client = createClient<typeof services>(
clientTransport,
serverTransport.clientId,
);
addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

const { reqWritable, resReadable } = client.fallible.echo.stream({});

reqWritable.write({
msg: 'abc',
throwResult: false,
throwError: false,
});
let result = await readNextResult(resReadable);
expect(result).toStrictEqual({
ok: true,
payload: {
response: 'abc',
},
});

// this isn't the first message so doesn't have telemetry info on the message itself
reqWritable.write({
msg: 'def',
throwResult: false,
throwError: true,
});

result = await readNextResult(resReadable);
expect(result).toStrictEqual({
ok: false,
payload: {
code: UNCAUGHT_ERROR_CODE,
message: 'some message',
},
});

const spans = spanExporter.getFinishedSpans();

const errSpan = spans.find(
(span) =>
span.name === 'river.server.fallible.echo' &&
span.status.code === SpanStatusCode.ERROR,
);
expect(errSpan).toBeTruthy();
expect(errSpan?.attributes['river.error_code']).toBe(UNCAUGHT_ERROR_CODE);
expect(errSpan?.attributes['river.error_message']).toBe('some message');

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});
},
);
34 changes: 19 additions & 15 deletions transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { Transport } from './transport';
import { coerceErrorString } from './stringifyError';
import { ProtocolError } from './events';
import { Value } from '@sinclair/typebox/value';
import tracer, { getPropagationContext } from '../tracing';
import { getPropagationContext } from '../tracing';
import { Connection } from './connection';
import { MessageMetadata } from '../logging';
import { SessionConnecting } from './sessionStateMachine/SessionConnecting';
Expand Down Expand Up @@ -110,6 +110,7 @@ export abstract class ClientTransport<
},
this.options,
currentProtocolVersion,
this.tracer,
this.log,
);

Expand Down Expand Up @@ -377,20 +378,23 @@ export abstract class ClientTransport<
}

protected onBackoffFinished(session: SessionBackingOff) {
const connPromise = tracer.startActiveSpan('connect', async (span) => {
try {
return await this.createNewOutgoingConnection(session.to);
} catch (err) {
// rethrow the error so that the promise is rejected
// as it was before we wrapped it in a span
const errStr = coerceErrorString(err);
span.recordException(errStr);
span.setStatus({ code: SpanStatusCode.ERROR });
throw err;
} finally {
span.end();
}
});
const connPromise = session.tracer.startActiveSpan(
'connect',
async (span) => {
try {
return await this.createNewOutgoingConnection(session.to);
} catch (err) {
// rethrow the error so that the promise is rejected
// as it was before we wrapped it in a span
const errStr = coerceErrorString(err);
span.recordException(errStr);
span.setStatus({ code: SpanStatusCode.ERROR });
throw err;
} finally {
span.end();
}
},
);

// transition to connecting
const connectingSession =
Expand Down
1 change: 1 addition & 0 deletions transport/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export abstract class ServerTransport<
},
},
this.options,
this.tracer,
this.log,
);

Expand Down
Loading

0 comments on commit 31c0800

Please sign in to comment.