Skip to content

Commit

Permalink
fix(pg): avoid disjoint spans from pg instrumentation (#1122)
Browse files Browse the repository at this point in the history
* fix: avoid rogue net and dns spans with pg

* revert node engine version change

* test: fix hang

* refactor: remove code duplication, rename callbacks

* refactor: reuse params type

* refactor: use any instead of never, don't create extra promises

Co-authored-by: Daniel Dyla <dyladan@users.noreply.github.com>
  • Loading branch information
seemk and dyladan authored Sep 13, 2022
1 parent 457be50 commit 82b8a84
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 52 deletions.
129 changes: 93 additions & 36 deletions plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import {
import * as pgTypes from 'pg';
import * as pgPoolTypes from 'pg-pool';
import {
PgClientConnect,
PgClientExtended,
PgErrorCallback,
NormalizedQueryConfig,
PostgresCallback,
PgPoolExtended,
Expand All @@ -46,6 +48,7 @@ import {
import { VERSION } from './version';

const PG_POOL_COMPONENT = 'pg-pool';

export class PgInstrumentation extends InstrumentationBase {
static readonly COMPONENT = 'pg';

Expand All @@ -67,11 +70,23 @@ export class PgInstrumentation extends InstrumentationBase {
if (isWrapped(moduleExports.Client.prototype.query)) {
this._unwrap(moduleExports.Client.prototype, 'query');
}

if (isWrapped(moduleExports.Client.prototype.connect)) {
this._unwrap(moduleExports.Client.prototype, 'connect');
}

this._wrap(
moduleExports.Client.prototype,
'query',
this._getClientQueryPatch() as never
this._getClientQueryPatch() as any
);

this._wrap(
moduleExports.Client.prototype,
'connect',
this._getClientConnectPatch() as any
);

return moduleExports;
},
moduleExports => {
Expand All @@ -93,7 +108,7 @@ export class PgInstrumentation extends InstrumentationBase {
this._wrap(
moduleExports.prototype,
'connect',
this._getPoolConnectPatch() as never
this._getPoolConnectPatch() as any
);
return moduleExports;
},
Expand All @@ -107,6 +122,49 @@ export class PgInstrumentation extends InstrumentationBase {
return [modulePG, modulePGPool];
}

private _getClientConnectPatch() {
const plugin = this;
return (original: PgClientConnect) => {
return function connect(
this: pgTypes.Client,
callback?: PgErrorCallback
) {
const span = plugin.tracer.startSpan(
`${PgInstrumentation.COMPONENT}.connect`,
{
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.database,
[SemanticAttributes.NET_PEER_NAME]: this.host,
[SemanticAttributes.DB_CONNECTION_STRING]:
utils.getConnectionString(this),
[SemanticAttributes.NET_PEER_PORT]: this.port,
[SemanticAttributes.DB_USER]: this.user,
},
}
);

if (callback) {
const parentSpan = trace.getSpan(context.active());
callback = utils.patchClientConnectCallback(span, callback);
if (parentSpan) {
callback = context.bind(context.active(), callback);
}
}

const connectResult: unknown = context.with(
trace.setSpan(context.active(), span),
() => {
return original.call(this, callback);
}
);

return handleConnectResult(span, connectResult);
};
};
}

private _getClientQueryPatch() {
const plugin = this;
return (original: typeof pgTypes.Client.prototype.query) => {
Expand Down Expand Up @@ -186,7 +244,7 @@ export class PgInstrumentation extends InstrumentationBase {
}

// Perform the original query
const result: unknown = original.apply(this, args as never);
const result: unknown = original.apply(this, args as any);

// Bind promise to parent span and end the span
if (result instanceof Promise) {
Expand Down Expand Up @@ -225,15 +283,15 @@ export class PgInstrumentation extends InstrumentationBase {
const plugin = this;
return (originalConnect: typeof pgPoolTypes.prototype.connect) => {
return function connect(this: PgPoolExtended, callback?: PgPoolCallback) {
const jdbcString = utils.getJDBCString(this.options);
const connString = utils.getConnectionString(this.options);
// setup span
const span = plugin.tracer.startSpan(`${PG_POOL_COMPONENT}.connect`, {
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.options.database, // required
[SemanticAttributes.NET_PEER_NAME]: this.options.host, // required
[SemanticAttributes.DB_CONNECTION_STRING]: jdbcString, // required
[SemanticAttributes.DB_CONNECTION_STRING]: connString, // required
[SemanticAttributes.NET_PEER_PORT]: this.options.port,
[SemanticAttributes.DB_USER]: this.options.user,
[AttributeNames.IDLE_TIMEOUT_MILLIS]:
Expand All @@ -254,40 +312,39 @@ export class PgInstrumentation extends InstrumentationBase {
}
}

const connectResult: unknown = originalConnect.call(
this,
callback as never
const connectResult: unknown = context.with(
trace.setSpan(context.active(), span),
() => {
return originalConnect.call(this, callback as any);
}
);

// No callback was provided, return a promise instead
if (connectResult instanceof Promise) {
const connectResultPromise = connectResult as Promise<unknown>;
return context.bind(
context.active(),
connectResultPromise
.then(result => {
// Return a pass-along promise which ends the span and then goes to user's orig resolvers
return new Promise(resolve => {
span.end();
resolve(result);
});
})
.catch((error: Error) => {
return new Promise((_, reject) => {
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message,
});
span.end();
reject(error);
});
})
);
}

// Else a callback was provided, so just return the result
return connectResult;
return handleConnectResult(span, connectResult);
};
};
}
}

function handleConnectResult(span: Span, connectResult: unknown) {
if (!(connectResult instanceof Promise)) {
return connectResult;
}

const connectResultPromise = connectResult as Promise<unknown>;
return context.bind(
context.active(),
connectResultPromise
.then(result => {
span.end();
return result;
})
.catch((error: Error) => {
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message,
});
span.end();
return Promise.reject(error);
})
);
}
14 changes: 10 additions & 4 deletions plugins/node/opentelemetry-instrumentation-pg/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ export type PostgresCallback = (err: Error, res: object) => unknown;
// These are not included in @types/pg, so manually define them.
// https://github.com/brianc/node-postgres/blob/fde5ec586e49258dfc4a2fcd861fcdecb4794fc3/lib/client.js#L25
export interface PgClientConnectionParams {
database: string;
host: string;
port: number;
user: string;
database?: string;
host?: string;
port?: number;
user?: string;
}

export interface PgClientExtended extends pgTypes.Client {
Expand All @@ -69,6 +69,8 @@ export type PgPoolCallback = (
done: (release?: any) => void
) => void;

export type PgErrorCallback = (err: Error) => void;

export interface PgPoolOptionsParams {
database: string;
host: string;
Expand All @@ -81,3 +83,7 @@ export interface PgPoolOptionsParams {
export interface PgPoolExtended extends pgPoolTypes<pgTypes.Client> {
options: PgPoolOptionsParams;
}

export type PgClientConnect = (
callback?: (err: Error) => void
) => Promise<void> | void;
30 changes: 25 additions & 5 deletions plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
NormalizedQueryConfig,
PostgresCallback,
PgClientConnectionParams,
PgErrorCallback,
PgPoolCallback,
PgPoolExtended,
PgInstrumentationConfig,
Expand All @@ -50,16 +51,16 @@ function getCommandFromText(text?: string): string {
return words[0].length > 0 ? words[0] : 'unknown';
}

export function getJDBCString(params: PgClientConnectionParams) {
const host = params.host || 'localhost'; // postgres defaults to localhost
const port = params.port || 5432; // postgres defaults to port 5432
export function getConnectionString(params: PgClientConnectionParams) {
const host = params.host || 'localhost';
const port = params.port || 5432;
const database = params.database || '';
return `jdbc:postgresql://${host}:${port}/${database}`;
return `postgresql://${host}:${port}/${database}`;
}

// Private helper function to start a span
function pgStartSpan(tracer: Tracer, client: PgClientExtended, name: string) {
const jdbcString = getJDBCString(client.connectionParameters);
const jdbcString = getConnectionString(client.connectionParameters);
return tracer.startSpan(name, {
kind: SpanKind.CLIENT,
attributes: {
Expand Down Expand Up @@ -236,3 +237,22 @@ export function patchCallbackPGPool(
cb.call(this, err, res, done);
};
}

export function patchClientConnectCallback(
span: Span,
cb: PgErrorCallback
): PgErrorCallback {
return function patchedClientConnectCallback(
this: pgTypes.Client,
err: Error
) {
if (err) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: err.message,
});
}
span.end();
cb.call(this, err);
};
}
16 changes: 12 additions & 4 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const DEFAULT_PGPOOL_ATTRIBUTES = {
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: CONFIG.database,
[SemanticAttributes.NET_PEER_NAME]: CONFIG.host,
[SemanticAttributes.DB_CONNECTION_STRING]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`,
[SemanticAttributes.DB_CONNECTION_STRING]: `postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`,
[SemanticAttributes.NET_PEER_PORT]: CONFIG.port,
[SemanticAttributes.DB_USER]: CONFIG.user,
[AttributeNames.MAX_CLIENT]: CONFIG.maxClient,
Expand All @@ -78,7 +78,7 @@ const DEFAULT_PG_ATTRIBUTES = {
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: CONFIG.database,
[SemanticAttributes.NET_PEER_NAME]: CONFIG.host,
[SemanticAttributes.DB_CONNECTION_STRING]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`,
[SemanticAttributes.DB_CONNECTION_STRING]: `postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`,
[SemanticAttributes.NET_PEER_PORT]: CONFIG.port,
[SemanticAttributes.DB_USER]: CONFIG.user,
};
Expand Down Expand Up @@ -197,11 +197,19 @@ describe('pg-pool', () => {
const span = provider.getTracer('test-pg-pool').startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const client = await pool.connect();
runCallbackTest(span, pgPoolattributes, events, unsetStatus, 1, 0);
runCallbackTest(span, pgPoolattributes, events, unsetStatus, 2, 1);

const [connectSpan, poolConnectSpan] =
memoryExporter.getFinishedSpans();
assert.strictEqual(
connectSpan.parentSpanId,
poolConnectSpan.spanContext().spanId
);

assert.ok(client, 'pool.connect() returns a promise');
try {
await client.query('SELECT NOW()');
runCallbackTest(span, pgAttributes, events, unsetStatus, 2, 1);
runCallbackTest(span, pgAttributes, events, unsetStatus, 3, 2);
} finally {
client.release();
}
Expand Down
Loading

0 comments on commit 82b8a84

Please sign in to comment.