Skip to content

Commit

Permalink
feat(propagation-utils): end pub-sub process span on promise settled (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#1055)

Co-authored-by: Rauno Viskus <Rauno56@users.noreply.github.com>
Co-authored-by: Rauno Viskus <rauno56@gmail.com>
Co-authored-by: Valentin Marchaud <contact@vmarchaud.fr>
  • Loading branch information
4 people authored Sep 24, 2022
1 parent 227ae20 commit cb83d30
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 2 deletions.
10 changes: 10 additions & 0 deletions packages/opentelemetry-propagation-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"precompile": "tsc --version && lerna run version:update --scope @opentelemetry/propagation-utils --include-dependencies",
"prepare": "npm run compile",
"prewatch": "npm run precompile",
"tdd": "npm run test -- --watch-extensions ts --watch",
"test": "nyc ts-mocha -p tsconfig.json 'test/**/*.test.ts'",
"watch": "tsc --build --watch tsconfig.json tsconfig.esm.json"
},
"repository": "open-telemetry/opentelemetry-js-contrib",
Expand Down Expand Up @@ -43,8 +45,16 @@
},
"devDependencies": {
"@opentelemetry/api": "^1.0.0",
"@opentelemetry/contrib-test-utils": "^0.31.0",
"@types/mocha": "^9.1.1",
"@types/node": "16.11.21",
"@types/sinon": "^10.0.11",
"expect": "27.4.2",
"gts": "3.1.0",
"mocha": "7.2.0",
"nyc": "15.1.0",
"sinon": "13.0.1",
"ts-mocha": "10.0.0",
"typescript": "4.3.5"
}
}
17 changes: 15 additions & 2 deletions packages/opentelemetry-propagation-utils/src/pubsub-propagation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ const patchArrayFilter = (
});
};

function isPromise(value: unknown): value is Promise<unknown> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return typeof (value as any)?.then === 'function';
}

const patchArrayFunction = (
messages: OtelProcessedMessage[],
functionName: 'forEach' | 'map',
Expand All @@ -77,10 +82,18 @@ const patchArrayFunction = (
if (!messageSpan) return callback.apply(this, callbackArgs);

const res = context.with(trace.setSpan(loopContext, messageSpan), () => {
let result: Promise<unknown> | unknown;
try {
return callback.apply(this, callbackArgs);
result = callback.apply(this, callbackArgs);
if (isPromise(result)) {
const endSpan = () => message[END_SPAN_FUNCTION]?.();
result.then(endSpan, endSpan);
}
return result;
} finally {
message[END_SPAN_FUNCTION]?.();
if (!isPromise(result)) {
message[END_SPAN_FUNCTION]?.();
}
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import utils from '../src/pubsub-propagation';
import {
getTestSpans,
registerInstrumentationTestingProvider,
resetMemoryExporter,
} from '@opentelemetry/contrib-test-utils';
import { ROOT_CONTEXT, trace } from '@opentelemetry/api';
import * as expect from 'expect';

registerInstrumentationTestingProvider();

const tracer = trace.getTracer('test');

afterEach(() => {
resetMemoryExporter();
});

describe('Pubsub propagation', () => {
it('Span ends immediately when the function returns a non-promise', () => {
const messages = [{}];
utils.patchMessagesArrayToStartProcessSpans({
messages,
tracer,
parentContext: ROOT_CONTEXT,
messageToSpanDetails: () => ({
name: 'test',
parentContext: ROOT_CONTEXT,
attributes: {},
}),
});
utils.patchArrayForProcessSpans(messages, tracer, ROOT_CONTEXT);

expect(getTestSpans().length).toBe(0);

messages.map(x => x);

expect(getTestSpans().length).toBe(1);
expect(getTestSpans()[0]).toMatchObject({ name: 'test process' });
});

it('Span ends on promise-resolution', () => {
const messages = [{}];
utils.patchMessagesArrayToStartProcessSpans({
messages,
tracer,
parentContext: ROOT_CONTEXT,
messageToSpanDetails: () => ({
name: 'test',
parentContext: ROOT_CONTEXT,
attributes: {},
}),
});
utils.patchArrayForProcessSpans(messages, tracer, ROOT_CONTEXT);

expect(getTestSpans().length).toBe(0);

let resolve: (value: unknown) => void;

messages.map(
() =>
new Promise(res => {
resolve = res;
})
);

expect(getTestSpans().length).toBe(0);

// @ts-expect-error Typescript thinks this value is used before assignment
resolve(undefined);

// We use setTimeout here to make sure our assertations run
// after the promise resolves
return new Promise(res => setTimeout(res, 0)).then(() => {
expect(getTestSpans().length).toBe(1);
expect(getTestSpans()[0]).toMatchObject({ name: 'test process' });
});
});

it('Span ends on promise-rejection', () => {
const messages = [{}];
utils.patchMessagesArrayToStartProcessSpans({
messages,
tracer,
parentContext: ROOT_CONTEXT,
messageToSpanDetails: () => ({
name: 'test',
parentContext: ROOT_CONTEXT,
attributes: {},
}),
});
utils.patchArrayForProcessSpans(messages, tracer, ROOT_CONTEXT);

expect(getTestSpans().length).toBe(0);

let reject: (value: unknown) => void;

messages.map(
() =>
new Promise((_, rej) => {
reject = rej;
})
);

expect(getTestSpans().length).toBe(0);

// @ts-expect-error Typescript thinks this value is used before assignment
reject(new Error('Failed'));

// We use setTimeout here to make sure our assertations run
// after the promise resolves
return new Promise(res => setTimeout(res, 0)).then(() => {
expect(getTestSpans().length).toBe(1);
expect(getTestSpans()[0]).toMatchObject({ name: 'test process' });
});
});
});

0 comments on commit cb83d30

Please sign in to comment.