diff --git a/.gitignore b/.gitignore index d4f03a0df..661747bf1 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ /.nyc_output /docs/ /out/ -/build/ +**/build/ system-test/secrets.js system-test/*key.json *.lock diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 019d584ac..040ebc67e 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -111,6 +111,7 @@ export abstract class MessageQueue extends EventEmitter { } spanMessages.forEach(m => { tracing.PubsubSpans.updatePublisherTopicName(m.parentSpan!, topic.name); + tracing.PubsubEvents.publishStart(m); }); } @@ -143,6 +144,7 @@ export abstract class MessageQueue extends EventEmitter { // We're finished with both the RPC and the whole publish operation, // so close out all of the related spans. rpcSpan?.end(); + tracing.PubsubEvents.publishEnd(m); m.parentSpan?.end(); }); } diff --git a/src/publisher/pubsub-message.ts b/src/publisher/pubsub-message.ts index 3b05c1f20..87b5705eb 100644 --- a/src/publisher/pubsub-message.ts +++ b/src/publisher/pubsub-message.ts @@ -55,6 +55,24 @@ export interface PubsubMessage * @private */ publishSchedulerSpan?: tracing.Span; + + /** + * If this is a message being received from a subscription, expose the ackId + * internally. Primarily for tracing. + * + * @private + * @internal + */ + ackId?: string; + + /** + * If this is a message being received from a subscription, expose the exactly + * once delivery flag internally. Primarily for tracing. + * + * @private + * @internal + */ + isExactlyOnceDelivery?: boolean; } /** diff --git a/src/subscriber.ts b/src/subscriber.ts index 4b846c87d..fc83c8c07 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -135,6 +135,9 @@ export class SubscriberSpans { } // Emit an event for calling modAck. + // Note that we don't currently support users calling modAck directly, but + // this may be used in the future for things like fully managed pull + // subscriptions. modAckCall(deadline: Duration) { if (this.processing) { tracing.PubsubEvents.modAckCalled(this.processing, deadline); @@ -243,6 +246,16 @@ export class Message implements tracing.MessageWithAttributes { */ parentSpan?: tracing.Span; + /** + * We'll save the state of the subscription's exactly once delivery flag at the + * time the message was received. This is pretty much only for tracing, as we will + * generally use the live state of the subscription to figure out how to respond. + * + * @private + * @internal + */ + isExactlyOnceDelivery: boolean; + /** * @private * @@ -344,6 +357,14 @@ export class Message implements tracing.MessageWithAttributes { */ this.subSpans = new SubscriberSpans(this); + /** + * Save the state of the subscription into the message for later tracing. + * + * @private + * @internal + */ + this.isExactlyOnceDelivery = sub.isExactlyOnceDelivery; + this._handled = false; this._length = this.data.length; this._subscriber = sub; @@ -423,6 +444,7 @@ export class Message implements tracing.MessageWithAttributes { /** * Modifies the ack deadline. + * At present time, this should generally not be called by users. * * @param {number} deadline The number of seconds to extend the deadline. * @private @@ -437,6 +459,7 @@ export class Message implements tracing.MessageWithAttributes { /** * Modifies the ack deadline, expecting a response (for exactly-once delivery subscriptions). * If exactly-once delivery is not enabled, this will immediately resolve successfully. + * At present time, this should generally not be called by users. * * @param {number} deadline The number of seconds to extend the deadline. * @private diff --git a/src/subscription.ts b/src/subscription.ts index 26d7dcfdf..c0a5b129e 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -50,7 +50,6 @@ import {DebugMessage} from './debug'; export {AckError, AckResponse, AckResponses} from './subscriber'; import {EmitterCallback, WrappingEmitter} from './wrapping-emitter'; -import * as tracing from './telemetry-tracing'; export type PushConfig = google.pubsub.v1.IPushConfig; export type OidcToken = google.pubsub.v1.PushConfig.IOidcToken; @@ -1238,6 +1237,7 @@ export class Subscription extends WrappingEmitter { return formatted as google.pubsub.v1.ISubscription; } + /*! * Format the name of a subscription. A subscription's full name is in the * format of projects/{projectId}/subscriptions/{subName}. diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index 9c60bc154..a8a684e7c 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -29,7 +29,6 @@ import { } from '@opentelemetry/api'; import {Attributes, PubsubMessage} from './publisher/pubsub-message'; import {PublishOptions} from './publisher/index'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; import {Duration} from './temporal'; export {Span}; @@ -190,7 +189,7 @@ export const pubsubSetter = new PubsubMessageSet(); * @private */ export interface SpanAttributes { - [x: string]: string | number; + [x: string]: string | number | boolean; } /** @@ -224,8 +223,54 @@ export const modernAttributeName = 'googclient_traceparent'; export const legacyAttributeName = 'googclient_OpenTelemetrySpanContext'; export interface AttributeParams { + // Fully qualified. topicName?: string; subName?: string; + + // These are generally split from the fully qualified names. + projectId?: string; + topicId?: string; + subId?: string; +} + +/** + * Break down the subscription's full name into its project and ID. + * + * @private + */ +export function getSubscriptionInfo(fullName: string): AttributeParams { + const results = fullName.match(/projects\/([^/]+)\/subscriptions\/(.+)/); + if (!results?.[0]) { + return { + subName: fullName, + }; + } + + return { + subName: fullName, + projectId: results[1], + subId: results[2], + }; +} + +/** + * Break down the subscription's full name into its project and ID. + * + * @private + */ +export function getTopicInfo(fullName: string): AttributeParams { + const results = fullName.match(/projects\/([^/]+)\/topics\/(.+)/); + if (!results?.[0]) { + return { + topicName: fullName, + }; + } + + return { + topicName: fullName, + projectId: results[1], + topicId: results[2], + }; } export class PubsubSpans { @@ -234,76 +279,115 @@ export class PubsubSpans { message?: PubsubMessage ): SpanAttributes { const destinationName = params.topicName ?? params.subName; - if (!destinationName || (params.topicName && params.subName)) { - throw new Error('One of topicName or subName must be specified'); - } - const destinationKind = params.topicName ? 'topic' : 'subscription'; + const destinationId = params.topicId ?? params.subId; + const projectId = params.projectId; + + // Purposefully leaving this debug check here as a comment - this should + // always be true, but we don't want to fail in prod if it's not. + /*if ( + (params.topicName && params.subName) || + (!destinationName && !projectId && !destinationId) + ) { + throw new Error( + 'One of topicName or subName must be specified, and must be fully qualified' + ); + }*/ const spanAttributes = { // Add Opentelemetry semantic convention attributes to the span, based on: // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/trace/semantic_conventions/messaging.md - [SemanticAttributes.MESSAGING_TEMP_DESTINATION]: false, - [SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub', - [SemanticAttributes.MESSAGING_OPERATION]: 'send', - [SemanticAttributes.MESSAGING_DESTINATION]: destinationName, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: destinationKind, - [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', + ['messaging.system']: 'gcp_pubsub', + ['messaging.destination.name']: destinationId ?? destinationName, + ['gcp.project_id']: projectId, } as SpanAttributes; if (message) { - if (message.data?.length) { - spanAttributes[ - SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES - ] = message.data?.length; + if (message.calculatedSize) { + spanAttributes['messaging.message.envelope.size'] = + message.calculatedSize; + } else { + if (message.data?.length) { + spanAttributes['messaging.message.envelope.size'] = + message.data?.length; + } } if (message.orderingKey) { spanAttributes['messaging.gcp_pubsub.message.ordering_key'] = message.orderingKey; } + if (message.isExactlyOnceDelivery) { + spanAttributes['messaging.gcp_pubsub.message.exactly_once_delivery'] = + message.isExactlyOnceDelivery; + } + if (message.ackId) { + spanAttributes['messaging.gcp_pubsub.message.ack_id'] = message.ackId; + } } return spanAttributes; } static createPublisherSpan(message: PubsubMessage, topicName: string): Span { - const span: Span = getTracer().startSpan(`${topicName} send`, { + const topicInfo = getTopicInfo(topicName); + const span: Span = getTracer().startSpan(`${topicName} create`, { kind: SpanKind.PRODUCER, - attributes: PubsubSpans.createAttributes({topicName}, message), + attributes: PubsubSpans.createAttributes(topicInfo, message), }); + if (topicInfo.topicId) { + span.updateName(`${topicInfo.topicId} create`); + span.setAttribute('messaging.destination.name', topicInfo.topicId); + } return span; } static updatePublisherTopicName(span: Span, topicName: string) { - span.updateName(`${topicName} send`); - span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, topicName); + const topicInfo = getTopicInfo(topicName); + if (topicInfo.topicId) { + span.updateName(`${topicInfo.topicId} create`); + span.setAttribute('messaging.destination.name', topicInfo.topicId); + } else { + span.updateName(`${topicName} create`); + } + if (topicInfo.projectId) { + span.setAttribute('gcp.project_id', topicInfo.projectId); + } } - static createReceiveSpan(subName: string, parent: Context | undefined): Span { - const name = `${subName} receive`; + static createReceiveSpan( + message: PubsubMessage, + subName: string, + parent: Context | undefined + ): Span { + const subInfo = getSubscriptionInfo(subName); + const name = `${subInfo.subId ?? subName} subscribe`; + const attributes = this.createAttributes(subInfo, message); + if (subInfo.subId) { + attributes['messaging.destination.name'] = subInfo.subId; + } - // Mostly we want to keep the context IDs; the attributes and such - // are only something we do on the publish side. if (context) { return getTracer().startSpan( name, { kind: SpanKind.CONSUMER, - attributes: {}, + attributes, }, parent ); } else { return getTracer().startSpan(name, { kind: SpanKind.CONSUMER, + attributes, }); } } static createChildSpan( name: string, - message?: MessageWithAttributes, - parentSpan?: Span + message?: PubsubMessage, + parentSpan?: Span, + attributes?: SpanAttributes ): Span | undefined { const parent = message?.parentSpan ?? parentSpan; if (parent) { @@ -311,7 +395,7 @@ export class PubsubSpans { name, { kind: SpanKind.INTERNAL, - attributes: {}, + attributes: attributes ?? {}, }, spanContextToContext(parent.spanContext()) ); @@ -325,14 +409,16 @@ export class PubsubSpans { } static createPublishSchedulerSpan(message: PubsubMessage): Span | undefined { - return PubsubSpans.createChildSpan('publish scheduler', message); + return PubsubSpans.createChildSpan('publisher batching', message); } static createPublishRpcSpan( messages: MessageWithAttributes[], topicName: string ): Span { - const spanAttributes = PubsubSpans.createAttributes({topicName}); + const spanAttributes = PubsubSpans.createAttributes( + getTopicInfo(topicName) + ); const links: Link[] = messages .map(m => ({context: m.parentSpan?.spanContext()}) as Link) .filter(l => l.context); @@ -367,7 +453,9 @@ export class PubsubSpans { messageSpans: (Span | undefined)[], subName: string ): Span { - const spanAttributes = PubsubSpans.createAttributes({subName}); + const spanAttributes = PubsubSpans.createAttributes( + getSubscriptionInfo(subName) + ); const links: Link[] = messageSpans .map(m => ({context: m?.spanContext()}) as Link) .filter(l => l.context); @@ -401,20 +489,27 @@ export class PubsubSpans { static createReceiveFlowSpan( message: MessageWithAttributes ): Span | undefined { - return PubsubSpans.createChildSpan('subscriber flow control', message); + return PubsubSpans.createChildSpan( + 'subscriber concurrency control', + message + ); } static createReceiveSchedulerSpan( message: MessageWithAttributes ): Span | undefined { - return PubsubSpans.createChildSpan('subscribe scheduler', message); + return PubsubSpans.createChildSpan('subscriber scheduler', message); } static createReceiveProcessSpan( message: MessageWithAttributes, subName: string ): Span | undefined { - return PubsubSpans.createChildSpan(`${subName} process`, message); + const subInfo = getSubscriptionInfo(subName); + return PubsubSpans.createChildSpan( + `${subInfo.subId ?? subName} process`, + message + ); } static setReceiveProcessResult(span: Span, isAck: boolean) { @@ -423,15 +518,27 @@ export class PubsubSpans { static createReceiveLeaseSpan( message: MessageWithAttributes, - deadline: Duration, - isInitial: boolean + subName: string, + type: 'modack' | 'ack' | 'nack', + deadline?: Duration, + isInitial?: boolean ): Span | undefined { - const span = PubsubSpans.createChildSpan('modify ack deadline', message); - span?.setAttribute( - 'messaging.gcp_pubsub.modack_deadline_seconds', - deadline.totalOf('second') + const subInfo = getSubscriptionInfo(subName); + const span = PubsubSpans.createReceiveSpan( + message, + `${subInfo.subId ?? subInfo.subName} ${type}`, + undefined ); - span?.setAttribute('messaging.gcp_pubsub.is_receipt_modack', isInitial); + + if (deadline) { + span?.setAttribute( + 'messaging.gcp_pubsub.message.ack_deadline_seconds', + deadline.totalOf('second') + ); + } + if (isInitial !== undefined) { + span?.setAttribute('messaging.gcp_pubsub.is_receipt_modack', isInitial); + } return span; } } @@ -469,6 +576,14 @@ export class PubsubEvents { PubsubEvents.addEvent('ack end', message); } + static modackStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('modack start', message); + } + + static modackEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('modack end', message); + } + static nackStart(message: MessageWithAttributes) { PubsubEvents.addEvent('nack start', message); } @@ -500,7 +615,7 @@ export class PubsubEvents { deadline: Duration, isInitial: boolean ) { - PubsubEvents.addEvent('modify ack deadline start', message, { + PubsubEvents.addEvent('modack start', message, { 'messaging.gcp_pubsub.modack_deadline_seconds': `${deadline.totalOf( 'second' )}`, @@ -509,7 +624,7 @@ export class PubsubEvents { } static modAckEnd(message: MessageWithAttributes) { - PubsubEvents.addEvent('modify ack deadline end', message); + PubsubEvents.addEvent('modack end', message); } // Add this event any time the process is shut down before processing @@ -629,7 +744,7 @@ export function extractSpan( } } - const span = PubsubSpans.createReceiveSpan(subName, context); + const span = PubsubSpans.createReceiveSpan(message, subName, context); message.parentSpan = span; return span; } diff --git a/test/message-queues.ts b/test/message-queues.ts index 095da46f6..98e514f98 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -54,7 +54,7 @@ class FakeSubscriber extends EventEmitter { constructor() { super(); - this.name = uuid.v4(); + this.name = `projects/test/subscriptions/${uuid.v4()}`; this.client = new FakeClient(); this.iEOS = false; } diff --git a/test/publisher/flow-publisher.ts b/test/publisher/flow-publisher.ts index 55e600917..6493c384c 100644 --- a/test/publisher/flow-publisher.ts +++ b/test/publisher/flow-publisher.ts @@ -54,7 +54,10 @@ describe('Flow control publisher', () => { const fcp = new fp.FlowControlledPublisher(publisher); const message = { data: Buffer.from('foo'), - parentSpan: tracing.PubsubSpans.createPublisherSpan({}, 'topic'), + parentSpan: tracing.PubsubSpans.createPublisherSpan( + {}, + 'projects/foo/topics/topic' + ), }; fcp.publish(message as unknown as PubsubMessage); assert.strictEqual(!!message.parentSpan, true); diff --git a/test/publisher/index.ts b/test/publisher/index.ts index 7dc97e3d1..2ebc78ce1 100644 --- a/test/publisher/index.ts +++ b/test/publisher/index.ts @@ -31,7 +31,6 @@ import {defaultOptions} from '../../src/default-options'; import * as tracing from '../../src/telemetry-tracing'; import {exporter} from '../tracing'; import {SpanKind} from '@opentelemetry/api'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; let promisified = false; const fakeUtil = Object.assign({}, util, { @@ -100,9 +99,11 @@ class FakeOrderedQueue extends FakeQueue { describe('Publisher', () => { let sandbox: sinon.SinonSandbox; let spy: sinon.SinonSpy; + const topicId = 'topic-name'; + const projectId = 'PROJECT_ID'; const topic = { - name: 'topic-name', - pubsub: {projectId: 'PROJECT_ID'}, + name: `projects/${projectId}/topics/${topicId}`, + pubsub: {projectId}, } as Topic; // tslint:disable-next-line variable-name @@ -209,22 +210,14 @@ describe('Publisher', () => { opentelemetry.SpanStatusCode.UNSET ); assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_OPERATION], - 'send' + createdSpan.attributes['messaging.system'], + 'gcp_pubsub' ); assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM], - 'pubsub' + createdSpan.attributes['messaging.destination.name'], + topicId ); - assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION], - topic.name - ); - assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND], - 'topic' - ); - assert.strictEqual(createdSpan.name, 'topic-name send'); + assert.strictEqual(createdSpan.name, `${topicId} create`); assert.strictEqual( createdSpan.kind, SpanKind.PRODUCER, diff --git a/test/publisher/message-queues.ts b/test/publisher/message-queues.ts index f9b92f072..b9cefea53 100644 --- a/test/publisher/message-queues.ts +++ b/test/publisher/message-queues.ts @@ -28,7 +28,7 @@ import * as q from '../../src/publisher/message-queues'; import {PublishError} from '../../src/publisher/publish-error'; class FakeTopic { - name = 'fake-topic'; + name = 'projects/foo/topics/fake-topic'; // eslint-disable-next-line @typescript-eslint/no-unused-vars request(config: RequestConfig, callback: RequestCallback): void {} } diff --git a/test/subscriber.ts b/test/subscriber.ts index f5e1f33c9..34a6b5e4e 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -58,9 +58,11 @@ class FakePubSub { } } +const projectId = uuid.v4(); +const subId = uuid.v4(); + class FakeSubscription { - name = uuid.v4(); - projectId = uuid.v4(); + name = `projects/${projectId}/subscriptions/${subId}`; pubsub = new FakePubSub(); } @@ -195,9 +197,15 @@ describe('Subscriber', () => { beforeEach(() => { sandbox = sinon.createSandbox(); fakeProjectify = { - replaceProjectIdToken: sandbox.stub().callsFake((name, projectId) => { - return `projects/${projectId}/name/${name}`; - }), + replaceProjectIdToken: sandbox + .stub() + .callsFake((name: string, projectId: string) => { + if (name.indexOf('/') >= 0) { + return name; + } else { + return `projects/${projectId}/name/${name}`; + } + }), }; const s = proxyquire('../src/subscriber.js', { @@ -941,7 +949,7 @@ describe('Subscriber', () => { assert.strictEqual(firstSpan.parentSpanId, parentSpanContext.spanId); assert.strictEqual( firstSpan.name, - `${subscriber.name} receive`, + `${subId} subscribe`, 'name of span should match' ); assert.strictEqual( diff --git a/test/telemetry-tracing.ts b/test/telemetry-tracing.ts index 69153957e..25aadebe9 100644 --- a/test/telemetry-tracing.ts +++ b/test/telemetry-tracing.ts @@ -28,117 +28,313 @@ describe('OpenTelemetryTracer', () => { beforeEach(() => { exporter.reset(); }); + afterEach(() => { + exporter.reset(); + }); - it('creates a span', () => { - const message: PubsubMessage = {}; - const span = otel.PubsubSpans.createPublisherSpan( - message, - 'test topic' - ) as trace.Span; - span.end(); + describe('project parser', () => { + it('parses subscription info', () => { + const name = 'projects/project-name/subscriptions/sub-name'; + const info = otel.getSubscriptionInfo(name); + assert.strictEqual(info.subName, name); + assert.strictEqual(info.projectId, 'project-name'); + assert.strictEqual(info.subId, 'sub-name'); + assert.strictEqual(info.topicId, undefined); + assert.strictEqual(info.topicName, undefined); + }); - const spans = exporter.getFinishedSpans(); - assert.notStrictEqual(spans.length, 0); - const exportedSpan = spans.concat().pop()!; + it('parses topic info', () => { + const name = 'projects/project-name/topics/topic-name'; + const info = otel.getTopicInfo(name); + assert.strictEqual(info.topicName, name); + assert.strictEqual(info.projectId, 'project-name'); + assert.strictEqual(info.topicId, 'topic-name'); + assert.strictEqual(info.subId, undefined); + assert.strictEqual(info.subName, undefined); + }); - assert.strictEqual(exportedSpan.name, 'test topic send'); - assert.strictEqual(exportedSpan.kind, SpanKind.PRODUCER); - }); + it('parses broken subscription info', () => { + const name = 'projec/foo_foo/subs/sub_sub'; + const info = otel.getSubscriptionInfo(name); + assert.strictEqual(info.subName, name); + assert.strictEqual(info.projectId, undefined); + assert.strictEqual(info.subId, undefined); + assert.strictEqual(info.topicId, undefined); + assert.strictEqual(info.topicName, undefined); + }); - it('injects a trace context', () => { - const message: PubsubMessage = { - attributes: {}, - }; - const span = otel.PubsubSpans.createPublisherSpan( - message, - 'test topic' - ) as trace.Span; - - otel.injectSpan(span, message, otel.OpenTelemetryLevel.Modern); - - assert.strictEqual( - Object.getOwnPropertyNames(message.attributes).includes( - otel.modernAttributeName - ), - true - ); + it('parses broken topic info', () => { + const name = 'projec/foo_foo/tops/top_top'; + const info = otel.getTopicInfo(name); + assert.strictEqual(info.subName, undefined); + assert.strictEqual(info.projectId, undefined); + assert.strictEqual(info.subId, undefined); + assert.strictEqual(info.topicId, undefined); + assert.strictEqual(info.topicName, name); + }); }); - it('injects a trace context and legacy baggage', () => { - const message: PubsubMessage = { - attributes: {}, - }; - const span = otel.PubsubSpans.createPublisherSpan(message, 'test topic'); - - otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); - - assert.strictEqual( - Object.getOwnPropertyNames(message.attributes).includes( - otel.modernAttributeName - ), - true - ); - assert.strictEqual( - Object.getOwnPropertyNames(message.attributes).includes( - otel.legacyAttributeName - ), - true - ); + describe('basic span creation', () => { + it('creates a span', () => { + const message: PubsubMessage = {}; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo' + ) as trace.Span; + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.notStrictEqual(spans.length, 0); + const exportedSpan = spans.concat().pop()!; + + assert.strictEqual(exportedSpan.name, 'topicfoo create'); + assert.strictEqual(exportedSpan.kind, SpanKind.PRODUCER); + }); + + it('injects a trace context', () => { + const message: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo' + ) as trace.Span; + + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Modern); + + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.modernAttributeName + ), + true + ); + }); }); - it('should issue a warning if OpenTelemetry span context key is set', () => { - const message: PubsubMessage = { - attributes: { - [otel.legacyAttributeName]: 'foobar', - [otel.modernAttributeName]: 'bazbar', - }, - }; - const span = otel.PubsubSpans.createPublisherSpan(message, 'test topic'); + describe('context propagation', () => { + it('injects a trace context and legacy baggage', () => { + const message: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo' + ); - const warnSpy = sinon.spy(console, 'warn'); - try { otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); - assert.strictEqual(warnSpy.callCount, 2); - } finally { - warnSpy.restore(); - } + + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.modernAttributeName + ), + true + ); + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.legacyAttributeName + ), + true + ); + }); + + it('should issue a warning if OpenTelemetry span context key is set', () => { + const message: PubsubMessage = { + attributes: { + [otel.legacyAttributeName]: 'foobar', + [otel.modernAttributeName]: 'bazbar', + }, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo' + ); + + const warnSpy = sinon.spy(console, 'warn'); + try { + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); + assert.strictEqual(warnSpy.callCount, 2); + } finally { + warnSpy.restore(); + } + }); + + it('should be able to determine if attributes are present', () => { + let message: otel.MessageWithAttributes = { + attributes: { + [otel.legacyAttributeName]: 'foobar', + }, + }; + assert.strictEqual(otel.containsSpanContext(message), true); + + message = { + attributes: { + [otel.modernAttributeName]: 'foobar', + }, + }; + assert.strictEqual(otel.containsSpanContext(message), true); + + message = {}; + assert.strictEqual(otel.containsSpanContext(message), false); + }); + + it('extracts a trace context', () => { + const message = { + attributes: { + [otel.modernAttributeName]: + '00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01', + }, + }; + + const childSpan = otel.extractSpan( + message, + 'projects/test/subscriptions/subfoo', + otel.OpenTelemetryLevel.Modern + ); + assert.strictEqual( + childSpan!.spanContext().traceId, + 'd4cda95b652f4a1592b449d5929fda1b' + ); + }); }); - it('should be able to determine if attributes are present', () => { - let message: otel.MessageWithAttributes = { - attributes: { - [otel.legacyAttributeName]: 'foobar', - }, - }; - assert.strictEqual(otel.containsSpanContext(message), true); + describe('attribute creation', () => { + it('creates attributes for publish', () => { + const topicInfo: otel.AttributeParams = { + topicName: 'projects/foo/topics/top', + topicId: 'top', + projectId: 'foo', + }; + const message: PubsubMessage = { + data: Buffer.from('test'), + attributes: {}, + calculatedSize: 1234, + orderingKey: 'key', + isExactlyOnceDelivery: true, + ackId: 'ackack', + }; - message = { - attributes: { - [otel.modernAttributeName]: 'foobar', - }, - }; - assert.strictEqual(otel.containsSpanContext(message), true); + const topicAttrs = otel.PubsubSpans.createAttributes(topicInfo, message); + assert.deepStrictEqual(topicAttrs, { + 'messaging.system': 'gcp_pubsub', + 'messaging.destination.name': topicInfo.topicId, + 'gcp.project_id': topicInfo.projectId, + 'messaging.message.envelope.size': message.calculatedSize, + 'messaging.gcp_pubsub.message.ordering_key': message.orderingKey, + 'messaging.gcp_pubsub.message.exactly_once_delivery': + message.isExactlyOnceDelivery, + 'messaging.gcp_pubsub.message.ack_id': message.ackId, + }); + + // Check again with no calculated size and other parameters missing. + delete message.calculatedSize; + delete message.orderingKey; + delete message.isExactlyOnceDelivery; + delete message.ackId; - message = {}; - assert.strictEqual(otel.containsSpanContext(message), false); + const topicAttrs2 = otel.PubsubSpans.createAttributes(topicInfo, message); + assert.deepStrictEqual(topicAttrs2, { + 'messaging.system': 'gcp_pubsub', + 'messaging.destination.name': topicInfo.topicId, + 'gcp.project_id': topicInfo.projectId, + 'messaging.message.envelope.size': message.data?.length, + }); + }); }); - it('extracts a trace context', () => { - const message = { - attributes: { - [otel.modernAttributeName]: - '00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01', - }, + describe('specialized span creation', () => { + const tests = { + topicInfo: { + topicName: 'projects/foo/topics/top', + topicId: 'top', + projectId: 'foo', + } as otel.AttributeParams, + subInfo: { + subName: 'projects/foo/subscriptions/sub', + subId: 'sub', + projectId: 'foo', + } as otel.AttributeParams, + message: { + data: Buffer.from('test'), + attributes: {}, + calculatedSize: 1234, + orderingKey: 'key', + isExactlyOnceDelivery: true, + ackId: 'ackack', + } as PubsubMessage, }; - const childSpan = otel.extractSpan( - message, - 'test sub', - otel.OpenTelemetryLevel.Modern - ); - assert.strictEqual( - childSpan!.spanContext().traceId, - 'd4cda95b652f4a1592b449d5929fda1b' - ); + it('creates publisher spans', () => { + const span = otel.PubsubSpans.createPublisherSpan( + tests.message, + tests.topicInfo.topicName! + ); + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1); + + const firstSpan = spans.pop(); + assert.ok(firstSpan); + assert.strictEqual(firstSpan.name, `${tests.topicInfo.topicId} create`); + assert.strictEqual( + firstSpan.attributes['messaging.destination.name'], + tests.topicInfo.topicId + ); + assert.strictEqual( + firstSpan.attributes['messaging.system'], + 'gcp_pubsub' + ); + }); + + it('updates publisher topic names', () => { + const span = otel.PubsubSpans.createPublisherSpan( + tests.message, + tests.topicInfo.topicName! + ); + otel.PubsubSpans.updatePublisherTopicName( + span, + 'projects/foo/topics/other' + ); + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1); + + const firstSpan = spans.pop(); + assert.ok(firstSpan); + assert.strictEqual(firstSpan.name, 'other create'); + + assert.strictEqual( + firstSpan.attributes['messaging.destination.name'], + 'other' + ); + }); + + it('creates receive spans', () => { + const parentSpan = otel.PubsubSpans.createPublisherSpan( + tests.message, + tests.topicInfo.topicName! + ); + const span = otel.PubsubSpans.createReceiveSpan( + tests.message, + tests.subInfo.subName!, + otel.spanContextToContext(parentSpan.spanContext()) + ); + span.end(); + parentSpan.end(); + + const spans = exporter.getFinishedSpans(); + const parentReadSpan = spans.pop(); + const childReadSpan = spans.pop(); + assert.ok(parentReadSpan && childReadSpan); + + assert.strictEqual(childReadSpan.name, 'sub subscribe'); + assert.strictEqual( + childReadSpan.attributes['messaging.destination.name'], + 'sub' + ); + assert.strictEqual(childReadSpan.kind, SpanKind.CONSUMER); + assert.ok(childReadSpan.parentSpanId); + }); }); });