Skip to content

Commit

Permalink
feat(instrumentation-pg): initial implementation of DB metrics (#2349)
Browse files Browse the repository at this point in the history
  • Loading branch information
maryliag authored Sep 11, 2024
1 parent 7448d37 commit 12adb43
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
Span,
SpanStatusCode,
SpanKind,
UpDownCounter,
} from '@opentelemetry/api';
import type * as pgTypes from 'pg';
import type * as pgPoolTypes from 'pg-pool';
Expand All @@ -41,12 +42,53 @@ import * as utils from './utils';
import { addSqlCommenterComment } from '@opentelemetry/sql-common';
import { PACKAGE_NAME, PACKAGE_VERSION } from './version';
import { SpanNames } from './enums/SpanNames';
import {
METRIC_DB_CLIENT_CONNECTION_COUNT,
METRIC_DB_CLIENT_CONNECTION_PENDING_REQUESTS,
} from '@opentelemetry/semantic-conventions/incubating';

export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConfig> {
private _connectionsCount!: UpDownCounter;
private _connectionPendingRequests!: UpDownCounter;
// Pool events connect, acquire, release and remove can be called
// multiple times without changing the values of total, idle and waiting
// connections. The _connectionsCounter is used to keep track of latest
// values and only update the metrics _connectionsCount and _connectionPendingRequests
// when the value change.
private _connectionsCounter: utils.poolConnectionsCounter = {
used: 0,
idle: 0,
pending: 0,
};

constructor(config: PgInstrumentationConfig = {}) {
super(PACKAGE_NAME, PACKAGE_VERSION, config);
}

override _updateMetricInstruments() {
this._connectionsCounter = {
idle: 0,
pending: 0,
used: 0,
};
this._connectionsCount = this.meter.createUpDownCounter(
METRIC_DB_CLIENT_CONNECTION_COUNT,
{
description:
'The number of connections that are currently in state described by the state attribute.',
unit: '{connection}',
}
);
this._connectionPendingRequests = this.meter.createUpDownCounter(
METRIC_DB_CLIENT_CONNECTION_PENDING_REQUESTS,
{
description:
'The number of current pending requests for an open connection.',
unit: '{connection}',
}
);
}

protected init() {
const modulePG = new InstrumentationNodeModuleDefinition(
'pg',
Expand Down Expand Up @@ -334,6 +376,42 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
attributes: utils.getSemanticAttributesFromPool(this.options),
});

this.on('connect', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('acquire', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('remove', () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

this.on('release' as any, () => {
plugin._connectionsCounter = utils.updateCounter(
this,
plugin._connectionsCount,
plugin._connectionPendingRequests,
plugin._connectionsCounter
);
});

if (callback) {
const parentSpan = trace.getSpan(context.active());
callback = utils.patchCallbackPGPool(
Expand Down
51 changes: 51 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
Tracer,
SpanKind,
diag,
UpDownCounter,
} from '@opentelemetry/api';
import { AttributeNames } from './enums/AttributeNames';
import {
Expand All @@ -34,6 +35,12 @@ import {
SEMATTRS_DB_STATEMENT,
DBSYSTEMVALUES_POSTGRESQL,
} from '@opentelemetry/semantic-conventions';
import {
ATTR_DB_CLIENT_CONNECTION_POOL_NAME,
ATTR_DB_CLIENT_CONNECTION_STATE,
DB_CLIENT_CONNECTION_STATE_VALUE_USED,
DB_CLIENT_CONNECTION_STATE_VALUE_IDLE,
} from '@opentelemetry/semantic-conventions/incubating';
import {
PgClientExtended,
PostgresCallback,
Expand Down Expand Up @@ -258,6 +265,50 @@ export function patchCallback(
};
}

export function getPoolName(pool: PgPoolOptionsParams): string {
let poolName = '';
poolName += (pool?.host ? `${pool.host}` : 'unknown_host') + ':';
poolName += (pool?.port ? `${pool.port}` : 'unknown_port') + '/';
poolName += pool?.database ? `${pool.database}` : 'unknown_database';

return poolName.trim();
}

export interface poolConnectionsCounter {
used: number;
idle: number;
pending: number;
}

export function updateCounter(
pool: PgPoolExtended,
connectionCount: UpDownCounter,
connectionPendingRequests: UpDownCounter,
latestCounter: poolConnectionsCounter
): poolConnectionsCounter {
const poolName = getPoolName(pool.options);
const all = pool.totalCount;
const pending = pool.waitingCount;
const idle = pool.idleCount;
const used = all - idle;

connectionCount.add(used - latestCounter.used, {
[ATTR_DB_CLIENT_CONNECTION_STATE]: DB_CLIENT_CONNECTION_STATE_VALUE_USED,
[ATTR_DB_CLIENT_CONNECTION_POOL_NAME]: poolName,
});

connectionCount.add(idle - latestCounter.idle, {
[ATTR_DB_CLIENT_CONNECTION_STATE]: DB_CLIENT_CONNECTION_STATE_VALUE_IDLE,
[ATTR_DB_CLIENT_CONNECTION_POOL_NAME]: poolName,
});

connectionPendingRequests.add(pending - latestCounter.pending, {
[ATTR_DB_CLIENT_CONNECTION_POOL_NAME]: poolName,
});

return { used: used, idle: idle, pending: pending };
}

export function patchCallbackPGPool(
span: Span,
cb: PgPoolCallback
Expand Down
111 changes: 98 additions & 13 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
SEMATTRS_DB_USER,
SEMATTRS_DB_STATEMENT,
} from '@opentelemetry/semantic-conventions';
import { ATTR_DB_CLIENT_CONNECTION_STATE } from '@opentelemetry/semantic-conventions/incubating';

const memoryExporter = new InMemorySpanExporter();

Expand Down Expand Up @@ -180,7 +181,7 @@ describe('pg-pool', () => {
describe('#pool.connect()', () => {
// promise - checkout a client
it('should intercept pool.connect()', async () => {
const pgPoolattributes = {
const pgPoolAttributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
Expand All @@ -191,7 +192,7 @@ describe('pg-pool', () => {
const span = provider.getTracer('test-pg-pool').startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const client = await pool.connect();
runCallbackTest(span, pgPoolattributes, events, unsetStatus, 2, 1);
runCallbackTest(span, pgPoolAttributes, events, unsetStatus, 2, 1);

const [connectSpan, poolConnectSpan] =
memoryExporter.getFinishedSpans();
Expand All @@ -212,7 +213,7 @@ describe('pg-pool', () => {

// callback - checkout a client
it('should not return a promise if callback is provided', done => {
const pgPoolattributes = {
const pgPoolAttributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
Expand All @@ -237,7 +238,7 @@ describe('pg-pool', () => {
assert.ok(client);
runCallbackTest(
parentSpan,
pgPoolattributes,
pgPoolAttributes,
events,
unsetStatus,
1,
Expand Down Expand Up @@ -285,7 +286,7 @@ describe('pg-pool', () => {
describe('#pool.query()', () => {
// promise
it('should call patched client.query()', async () => {
const pgPoolattributes = {
const pgPoolAttributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
Expand All @@ -296,15 +297,15 @@ describe('pg-pool', () => {
const span = provider.getTracer('test-pg-pool').startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const result = await pool.query('SELECT NOW()');
runCallbackTest(span, pgPoolattributes, events, unsetStatus, 2, 0);
runCallbackTest(span, pgPoolAttributes, events, unsetStatus, 2, 0);
runCallbackTest(span, pgAttributes, events, unsetStatus, 2, 1);
assert.ok(result, 'pool.query() returns a promise');
});
});

// callback
it('should not return a promise if callback is provided', done => {
const pgPoolattributes = {
const pgPoolAttributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
Expand All @@ -322,7 +323,7 @@ describe('pg-pool', () => {
}
runCallbackTest(
parentSpan,
pgPoolattributes,
pgPoolAttributes,
events,
unsetStatus,
2,
Expand All @@ -341,7 +342,7 @@ describe('pg-pool', () => {
const events: TimedEvent[] = [];

describe('AND valid responseHook', () => {
const pgPoolattributes = {
const pgPoolAttributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
Expand Down Expand Up @@ -375,7 +376,7 @@ describe('pg-pool', () => {
}
runCallbackTest(
parentSpan,
pgPoolattributes,
pgPoolAttributes,
events,
unsetStatus,
2,
Expand Down Expand Up @@ -409,7 +410,7 @@ describe('pg-pool', () => {
const result = await pool.query(query);
runCallbackTest(
span,
pgPoolattributes,
pgPoolAttributes,
events,
unsetStatus,
2,
Expand All @@ -423,7 +424,7 @@ describe('pg-pool', () => {
});

describe('AND invalid responseHook', () => {
const pgPoolattributes = {
const pgPoolAttributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
Expand Down Expand Up @@ -456,7 +457,7 @@ describe('pg-pool', () => {

runCallbackTest(
parentSpan,
pgPoolattributes,
pgPoolAttributes,
events,
unsetStatus,
2,
Expand All @@ -482,4 +483,88 @@ describe('pg-pool', () => {
});
});
});

describe('pg metrics', () => {
let metricReader: testUtils.TestMetricReader;

beforeEach(() => {
metricReader = testUtils.initMeterProvider(instrumentation);
});

it('should generate `db.client.connection.count` and `db.client.connection.pending_requests` metrics', async () => {
pool.connect((err, client, release) => {
if (err) {
throw new Error(err.message);
}
if (!release) {
throw new Error('Did not receive release function');
}
if (!client) {
throw new Error('No client received');
}
assert.ok(client);

client.query('SELECT NOW()', async (err, ret) => {
release();
if (err) {
throw new Error(err.message);
}
assert.ok(ret);

const { resourceMetrics, errors } = await metricReader.collect();
assert.deepEqual(
errors,
[],
'expected no errors from the callback during metric collection'
);

const metrics = resourceMetrics.scopeMetrics[0].metrics;
assert.strictEqual(
metrics[0].descriptor.name,
'db.client.connection.count'
);
assert.strictEqual(
metrics[0].descriptor.description,
'The number of connections that are currently in state described by the state attribute.'
);
assert.strictEqual(
metrics[0].dataPoints[0].attributes[
ATTR_DB_CLIENT_CONNECTION_STATE
],
'used'
);
assert.strictEqual(
metrics[0].dataPoints[0].value,
1,
'expected to have 1 used connection'
);
assert.strictEqual(
metrics[0].dataPoints[1].attributes[
ATTR_DB_CLIENT_CONNECTION_STATE
],
'idle'
);
assert.strictEqual(
metrics[0].dataPoints[1].value,
0,
'expected to have 0 idle connections'
);

assert.strictEqual(
metrics[1].descriptor.name,
'db.client.connection.pending_requests'
);
assert.strictEqual(
metrics[1].descriptor.description,
'The number of current pending requests for an open connection.'
);
assert.strictEqual(
metrics[1].dataPoints[0].value,
0,
'expected to have 0 pending requests'
);
});
});
});
});
});
Loading

0 comments on commit 12adb43

Please sign in to comment.