Skip to content

Commit

Permalink
fix(instrumentation-http): close server span when response finishes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas authored Dec 20, 2022
1 parent 2fb80eb commit faa0a33
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 67 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ All notable changes to experimental packages in this project will be documented
* fix(instrumentation): add back support for absolute paths via `require-in-the-middle` [#3457](https://github.com/open-telemetry/opentelemetry-js/pull/3457) @mhassan1
* fix(prometheus-sanitization): replace repeated `_` with a single `_` [3470](https://github.com/open-telemetry/opentelemetry-js/pull/3470) @samimusallam
* fix(prometheus-serializer): correct string used for NaN [#3477](https://github.com/open-telemetry/opentelemetry-js/pull/3477) @JacksonWeber
* fix(instrumentation-http): close server span when response finishes [#3407](https://github.com/open-telemetry/opentelemetry-js/pull/3407) @legendecas

### :books: (Refine Doc)

Expand Down
131 changes: 68 additions & 63 deletions experimental/packages/opentelemetry-instrumentation-http/src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import {
HttpInstrumentationConfig,
HttpRequestArgs,
Https,
ResponseEndArgs,
} from './types';
import * as utils from './utils';
import { VERSION } from './version';
Expand Down Expand Up @@ -488,7 +487,7 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
};

const startTime = hrTime();
let metricAttributes: MetricAttributes =
const metricAttributes =
utils.getIncomingRequestMetricAttributes(spanAttributes);

const ctx = propagation.extract(ROOT_CONTEXT, headers);
Expand Down Expand Up @@ -520,73 +519,29 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
header => request.headers[header]
);

// Wraps end (inspired by:
// https://github.com/GoogleCloudPlatform/cloud-trace-nodejs/blob/master/src/instrumentations/instrumentation-connect.ts#L75)
const originalEnd = response.end;
response.end = function (
this: http.ServerResponse,
..._args: ResponseEndArgs
) {
response.end = originalEnd;
// Cannot pass args of type ResponseEndArgs,
const returned = safeExecuteInTheMiddle(
() => response.end.apply(this, arguments as never),
error => {
if (error) {
utils.setSpanWithError(span, error);
instrumentation._closeHttpSpan(
span,
SpanKind.SERVER,
startTime,
metricAttributes
);
throw error;
}
}
);

const attributes = utils.getIncomingRequestAttributesOnResponse(
// After 'error', no further events other than 'close' should be emitted.
let hasError = false;
response.on('close', () => {
if (hasError) {
return;
}
instrumentation._onServerResponseFinish(
request,
response
);
metricAttributes = Object.assign(
metricAttributes,
utils.getIncomingRequestMetricAttributesOnResponse(attributes)
);

instrumentation._headerCapture.server.captureResponseHeaders(
response,
span,
header => response.getHeader(header)
metricAttributes,
startTime
);

span.setAttributes(attributes).setStatus({
code: utils.parseResponseStatus(
SpanKind.SERVER,
response.statusCode
),
});

if (instrumentation._getConfig().applyCustomAttributesOnSpan) {
safeExecuteInTheMiddle(
() =>
instrumentation._getConfig().applyCustomAttributesOnSpan!(
span,
request,
response
),
() => {},
true
);
}

instrumentation._closeHttpSpan(
});
response.on(errorMonitor, (err: Err) => {
hasError = true;
instrumentation._onServerResponseError(
span,
SpanKind.SERVER,
metricAttributes,
startTime,
metricAttributes
err
);
return returned;
};
});

return safeExecuteInTheMiddle(
() => original.apply(this, [event, ...args]),
Expand Down Expand Up @@ -741,6 +696,56 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
};
}

private _onServerResponseFinish(
request: http.IncomingMessage,
response: http.ServerResponse,
span: Span,
metricAttributes: MetricAttributes,
startTime: HrTime
) {
const attributes = utils.getIncomingRequestAttributesOnResponse(
request,
response
);
metricAttributes = Object.assign(
metricAttributes,
utils.getIncomingRequestMetricAttributesOnResponse(attributes)
);

this._headerCapture.server.captureResponseHeaders(span, header =>
response.getHeader(header)
);

span.setAttributes(attributes).setStatus({
code: utils.parseResponseStatus(SpanKind.SERVER, response.statusCode),
});

if (this._getConfig().applyCustomAttributesOnSpan) {
safeExecuteInTheMiddle(
() =>
this._getConfig().applyCustomAttributesOnSpan!(
span,
request,
response
),
() => {},
true
);
}

this._closeHttpSpan(span, SpanKind.SERVER, startTime, metricAttributes);
}

private _onServerResponseError(
span: Span,
metricAttributes: MetricAttributes,
startTime: HrTime,
error: Err
) {
utils.setSpanWithError(span, error);
this._closeHttpSpan(span, SpanKind.SERVER, startTime, metricAttributes);
}

private _startHttpSpan(
name: string,
options: SpanOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ export type Http = typeof http;
export type Https = typeof https;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type Func<T> = (...args: any[]) => T;
export type ResponseEndArgs =
| [((() => void) | undefined)?]
| [unknown, ((() => void) | undefined)?]
| [unknown, string, ((() => void) | undefined)?];

export interface HttpCustomAttributeFunction {
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ export const responseHookFunction = (
response: IncomingMessage | ServerResponse
): void => {
span.setAttribute('custom response hook attribute', 'response');
// IncomingMessage (Readable) 'end'.
response.on('end', () => {
span.setAttribute('custom incoming message attribute', 'end');
});
// ServerResponse (writable) 'finish'.
response.on('finish', () => {
span.setAttribute('custom server response attribute', 'finish');
});
};

export const startIncomingSpanHookFunction = (
Expand Down Expand Up @@ -251,6 +259,16 @@ describe('HttpInstrumentation', () => {
});
instrumentation.enable();
server = http.createServer((request, response) => {
if (request.url?.includes('/premature-close')) {
response.destroy();
return;
}
if (request.url?.includes('/hang')) {
// write response headers.
response.write('');
// hang the request.
return;
}
if (request.url?.includes('/ignored')) {
provider.getTracer('test').startSpan('some-span').end();
}
Expand Down Expand Up @@ -771,6 +789,7 @@ describe('HttpInstrumentation', () => {
const spans = memoryExporter.getFinishedSpans();
const [incomingSpan, outgoingSpan] = spans;

// server request
assert.strictEqual(
incomingSpan.attributes['custom request hook attribute'],
'request'
Expand All @@ -779,12 +798,17 @@ describe('HttpInstrumentation', () => {
incomingSpan.attributes['custom response hook attribute'],
'response'
);
assert.strictEqual(
incomingSpan.attributes['custom server response attribute'],
'finish'
);
assert.strictEqual(incomingSpan.attributes['guid'], 'user_guid');
assert.strictEqual(
incomingSpan.attributes['span kind'],
SpanKind.CLIENT
);

// client request
assert.strictEqual(
outgoingSpan.attributes['custom request hook attribute'],
'request'
Expand All @@ -793,6 +817,10 @@ describe('HttpInstrumentation', () => {
outgoingSpan.attributes['custom response hook attribute'],
'response'
);
assert.strictEqual(
outgoingSpan.attributes['custom incoming message attribute'],
'end'
);
assert.strictEqual(outgoingSpan.attributes['guid'], 'user_guid');
assert.strictEqual(
outgoingSpan.attributes['span kind'],
Expand All @@ -815,6 +843,66 @@ describe('HttpInstrumentation', () => {
});
});
});

it('should have 2 ended span when client prematurely close', async () => {
const promise = new Promise<void>((resolve, reject) => {
const req = http.get(
`${protocol}://${hostname}:${serverPort}/hang`,
res => {
res.on('close', () => {});
}
);
// close the socket.
setTimeout(() => {
req.destroy();
}, 10);

req.on('error', reject);

req.on('close', () => {
// yield to server to end the span.
setTimeout(resolve, 10);
});
});

await promise;

const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, 2);
const [serverSpan, clientSpan] = spans.sort(
(lhs, rhs) => lhs.kind - rhs.kind
);
assert.strictEqual(serverSpan.kind, SpanKind.SERVER);
assert.ok(Object.keys(serverSpan.attributes).length >= 6);

assert.strictEqual(clientSpan.kind, SpanKind.CLIENT);
assert.ok(Object.keys(clientSpan.attributes).length >= 6);
});

it('should have 2 ended span when server prematurely close', async () => {
const promise = new Promise<void>(resolve => {
const req = http.get(
`${protocol}://${hostname}:${serverPort}/premature-close`
);
req.on('error', err => {
resolve();
});
});

await promise;

const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, 2);
const [serverSpan, clientSpan] = spans.sort(
(lhs, rhs) => lhs.kind - rhs.kind
);
assert.strictEqual(serverSpan.kind, SpanKind.SERVER);
assert.ok(Object.keys(serverSpan.attributes).length >= 6);

assert.strictEqual(clientSpan.kind, SpanKind.CLIENT);
assert.strictEqual(clientSpan.status.code, SpanStatusCode.ERROR);
assert.ok(Object.keys(clientSpan.attributes).length >= 6);
});
});

describe('with require parent span', () => {
Expand Down

0 comments on commit faa0a33

Please sign in to comment.