diff --git a/CHANGELOG.md b/CHANGELOG.md index 77e1b651014..53a56a9946f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file. ### :rocket: (Enhancement) +* feat: allow Resource to be created with some attributes resolving asynchronously [#2933](https://github.com/open-telemetry/opentelemetry-js/pull/2933) @aabmass + ### :bug: (Bug Fix) * fix(resources): fix browser compatibility for host and os detectors [#3004](https://github.com/open-telemetry/opentelemetry-js/pull/3004) @legendecas diff --git a/experimental/packages/opentelemetry-sdk-node/src/sdk.ts b/experimental/packages/opentelemetry-sdk-node/src/sdk.ts index 6f9ddb1d23c..46bd2d8b962 100644 --- a/experimental/packages/opentelemetry-sdk-node/src/sdk.ts +++ b/experimental/packages/opentelemetry-sdk-node/src/sdk.ts @@ -24,7 +24,7 @@ import { } from '@opentelemetry/instrumentation'; import { NodeTracerConfig, NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; import { - detectResources, + detectResourcesSync, envDetector, processDetector, Resource, @@ -113,13 +113,13 @@ export class NodeSDK { } /** Detect resource attributes */ - public async detectResources(config?: ResourceDetectionConfig): Promise { + public detectResources(config?: ResourceDetectionConfig): void { const internalConfig: ResourceDetectionConfig = { detectors: [ envDetector, processDetector], ...config, }; - this.addResource(await detectResources(internalConfig)); + this.addResource(detectResourcesSync(internalConfig)); } /** Manually add a resource */ @@ -130,9 +130,9 @@ export class NodeSDK { /** * Once the SDK has been configured, call this method to construct SDK components and register them with the OpenTelemetry API. */ - public async start(): Promise { + public start(): void { if (this._autoDetectResources) { - await this.detectResources(); + this.detectResources(); } if (this._tracerProviderConfig) { diff --git a/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts b/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts index 14b362209c0..f0c5393e5da 100644 --- a/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts +++ b/experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts @@ -70,7 +70,7 @@ describe('Node SDK', () => { autoDetectResources: false, }); - await sdk.start(); + sdk.start(); assert.strictEqual(context['_getContextManager'](), ctxManager, 'context manager should not change'); assert.strictEqual(propagation['_getGlobalPropagator'](), propagator, 'propagator should not change'); @@ -85,7 +85,7 @@ describe('Node SDK', () => { autoDetectResources: false, }); - await sdk.start(); + sdk.start(); assert.ok(metrics.getMeterProvider() instanceof NoopMeterProvider); @@ -108,7 +108,7 @@ describe('Node SDK', () => { autoDetectResources: false, }); - await sdk.start(); + sdk.start(); assert.ok(metrics.getMeterProvider() instanceof NoopMeterProvider); @@ -135,7 +135,7 @@ describe('Node SDK', () => { autoDetectResources: false, }); - await sdk.start(); + sdk.start(); assert.strictEqual(context['_getContextManager'](), ctxManager, 'context manager should not change'); assert.strictEqual(propagation['_getGlobalPropagator'](), propagator, 'propagator should not change'); @@ -162,7 +162,7 @@ describe('Node SDK', () => { const sdk = new NodeSDK({ autoDetectResources: true, }); - await sdk.detectResources({ + sdk.detectResources({ detectors: [ processDetector, { detect() { throw new Error('Buggy detector'); @@ -171,6 +171,7 @@ describe('Node SDK', () => { envDetector ] }); const resource = sdk['_resource']; + await resource.waitForAsyncAttributes(); assertServiceResource(resource, { instanceId: '627cc493', @@ -217,7 +218,8 @@ describe('Node SDK', () => { DiagLogLevel.VERBOSE ); - await sdk.detectResources(); + sdk.detectResources(); + await sdk['_resource'].waitForAsyncAttributes().catch(() => {}); // Test that the Env Detector successfully found its resource and populated it with the right values. assert.ok( @@ -249,7 +251,7 @@ describe('Node SDK', () => { DiagLogLevel.DEBUG ); - await sdk.detectResources(); + sdk.detectResources(); assert.ok( callArgsContains( diff --git a/packages/opentelemetry-resources/src/Resource.ts b/packages/opentelemetry-resources/src/Resource.ts index ac368d812ee..6d44d538cea 100644 --- a/packages/opentelemetry-resources/src/Resource.ts +++ b/packages/opentelemetry-resources/src/Resource.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import { diag } from '@opentelemetry/api'; import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; import { SDK_INFO } from '@opentelemetry/core'; import { ResourceAttributes } from './types'; @@ -25,6 +26,10 @@ import { defaultServiceName } from './platform'; */ export class Resource { static readonly EMPTY = new Resource({}); + private _attributes: ResourceAttributes; + private asyncAttributesPromise: Promise | undefined; + + private _asyncAttributesHaveResolved: boolean; /** * Returns an empty Resource @@ -34,7 +39,7 @@ export class Resource { } /** - * Returns a Resource that indentifies the SDK in use. + * Returns a Resource that indentifies the SDK in use. */ static default(): Resource { return new Resource({ @@ -54,8 +59,50 @@ export class Resource { * information about the entity as numbers, strings or booleans * TODO: Consider to add check/validation on attributes. */ - readonly attributes: ResourceAttributes - ) {} + attributes: ResourceAttributes, + asyncAttributesPromise?: Promise, + ) { + this._attributes = attributes; + this._asyncAttributesHaveResolved = asyncAttributesPromise === undefined; + this.asyncAttributesPromise = asyncAttributesPromise?.then( + asyncAttributes => { + this._attributes = Object.assign({}, this._attributes, asyncAttributes); + this._asyncAttributesHaveResolved = true; + return asyncAttributes; + } + ); + this.asyncAttributesPromise?.catch(err => { + diag.debug("The resource's async promise rejected: %s", err); + this._asyncAttributesHaveResolved = true; + return {}; + }); + } + + get attributes(): ResourceAttributes { + return this._attributes; + } + + /** + * Check if async attributes have resolved. This is useful to avoid awaiting + * waitForAsyncAttributes (which will introduce asynchronous behavior) when not necessary. + * + * @returns true if no async attributes promise was provided or if the promise has resolved + * and been merged together with the sync attributes. + */ + asyncAttributesHaveResolved(): boolean { + return this._asyncAttributesHaveResolved; + } + + /** + * Returns a promise that resolves when all async attributes have finished being added to + * this Resource's attributes. This is useful in exporters to block until resource detection + * has finished. + */ + async waitForAsyncAttributes(): Promise { + if (!this._asyncAttributesHaveResolved) { + await this.asyncAttributesPromise; + } + } /** * Returns a new, merged {@link Resource} by merging the current Resource @@ -66,7 +113,7 @@ export class Resource { * @returns the newly merged Resource. */ merge(other: Resource | null): Resource { - if (!other || !Object.keys(other.attributes).length) return this; + if (!other) return this; // SpanAttributes from resource overwrite attributes from other resource. const mergedAttributes = Object.assign( @@ -74,6 +121,22 @@ export class Resource { this.attributes, other.attributes ); - return new Resource(mergedAttributes); + + let mergedAsyncAttributesPromise: Promise | undefined; + if (this.asyncAttributesPromise && other.asyncAttributesPromise) { + mergedAsyncAttributesPromise = Promise.all([ + this.asyncAttributesPromise.catch(() => ({})), + other.asyncAttributesPromise.catch(() => ({})), + ]).then( + ([thisAttributes, otherAttributes]) => { + return Object.assign({}, thisAttributes, otherAttributes); + } + ); + } else { + mergedAsyncAttributesPromise = this.asyncAttributesPromise ?? other.asyncAttributesPromise; + } + + + return new Resource(mergedAttributes, mergedAsyncAttributesPromise); } } diff --git a/packages/opentelemetry-resources/src/platform/browser/detect-resources.ts b/packages/opentelemetry-resources/src/platform/browser/detect-resources.ts index 457965fadfa..919c3c6d5d9 100644 --- a/packages/opentelemetry-resources/src/platform/browser/detect-resources.ts +++ b/packages/opentelemetry-resources/src/platform/browser/detect-resources.ts @@ -19,9 +19,11 @@ import { ResourceDetectionConfig } from '../../config'; import { diag } from '@opentelemetry/api'; /** - * Runs all resource detectors and returns the results merged into a single - * Resource. + * Runs all resource detectors and returns the results merged into a single Resource. Promise + * does not resolve until all of the underlying detectors have resolved, unlike + * detectResourcesSync. * + * @deprecated use detectResourceSync() instead. * @param config Configuration for resource detection */ export const detectResources = async ( @@ -47,3 +49,51 @@ export const detectResources = async ( Resource.empty() ); }; + +/** + * Runs all resource detectors synchronously, merging their results. Any asynchronous + * attributes will be merged together in-order after they resolve. + * + * @param config Configuration for resource detection + */ +export const detectResourcesSync = ( + config: ResourceDetectionConfig = {} +): Resource => { + const internalConfig: ResourceDetectionConfig = Object.assign(config); + + const resources: Resource[] = (internalConfig.detectors ?? []).map(d => { + try { + const resourceOrPromise = d.detect(internalConfig); + let resource: Resource; + if (resourceOrPromise instanceof Promise) { + diag.info('Resource detector %s should return a Resource directly instead of a promise.', d.constructor.name); + const createPromise = async () => { + const resolved = await resourceOrPromise; + await resolved.waitForAsyncAttributes(); + return resolved.attributes; + }; + resource = new Resource({}, createPromise()); + } else { + resource = resourceOrPromise; + } + + resource.waitForAsyncAttributes().then(() => { + diag.debug(`${d.constructor.name} found resource.`, resource); + }).catch(e => { + diag.debug(`${d.constructor.name} failed: ${e.message}`); + }); + + return resource; + } catch (e) { + diag.debug(`${d.constructor.name} failed: ${e.message}`); + return Resource.empty(); + } + }); + + + const mergedResources = resources.reduce( + (acc, resource) => acc.merge(resource), + Resource.empty() + ); + return mergedResources; +}; diff --git a/packages/opentelemetry-resources/src/platform/node/detect-resources.ts b/packages/opentelemetry-resources/src/platform/node/detect-resources.ts index 35bf32b4cdd..a91418d957d 100644 --- a/packages/opentelemetry-resources/src/platform/node/detect-resources.ts +++ b/packages/opentelemetry-resources/src/platform/node/detect-resources.ts @@ -20,9 +20,11 @@ import { diag } from '@opentelemetry/api'; import * as util from 'util'; /** - * Runs all resource detectors and returns the results merged into a single - * Resource. + * Runs all resource detectors and returns the results merged into a single Resource. Promise + * does not resolve until all of the underlying detectors have resolved, unlike + * detectResourcesSync. * + * @deprecated use detectResourceSync() instead. * @param config Configuration for resource detection */ export const detectResources = async ( @@ -52,6 +54,57 @@ export const detectResources = async ( ); }; +/** + * Runs all resource detectors synchronously, merging their results. Any asynchronous + * attributes will be merged together in-order after they resolve. + * + * @param config Configuration for resource detection + */ +export const detectResourcesSync = ( + config: ResourceDetectionConfig = {} +): Resource => { + const internalConfig: ResourceDetectionConfig = Object.assign(config); + + const resources: Resource[] = (internalConfig.detectors ?? []).map(d => { + try { + const resourceOrPromise = d.detect(internalConfig); + let resource: Resource; + if (resourceOrPromise instanceof Promise) { + diag.info('Resource detector %s should return a Resource directly instead of a promise.', d.constructor.name); + const createPromise = async () => { + const resolved = await resourceOrPromise; + await resolved.waitForAsyncAttributes(); + return resolved.attributes; + }; + resource = new Resource({}, createPromise()); + } else { + resource = resourceOrPromise; + } + + resource.waitForAsyncAttributes().then(() => { + diag.debug(`${d.constructor.name} found resource.`, resource); + }).catch(e => { + diag.debug(`${d.constructor.name} failed: ${e.message}`); + }); + + return resource; + } catch (e) { + diag.debug(`${d.constructor.name} failed: ${e.message}`); + return Resource.empty(); + } + }); + + + const mergedResources = resources.reduce( + (acc, resource) => acc.merge(resource), + Resource.empty() + ); + void mergedResources.waitForAsyncAttributes().then(() => { + // Future check if verbose logging is enabled issue #1903 + logResources(resources); + }); + return mergedResources; +}; /** * Writes debug information about the detected resources to the logger defined in the resource detection config, if one is provided. diff --git a/packages/opentelemetry-resources/src/types.ts b/packages/opentelemetry-resources/src/types.ts index 717f71381d1..bbe195ceaec 100644 --- a/packages/opentelemetry-resources/src/types.ts +++ b/packages/opentelemetry-resources/src/types.ts @@ -26,9 +26,10 @@ import { SpanAttributes } from '@opentelemetry/api'; export type ResourceAttributes = SpanAttributes; /** - * Interface for a Resource Detector. In order to detect resources in parallel - * a detector returns a Promise containing a Resource. + * Interface for a Resource Detector. In order to detect resources asynchronously, a detector + * can pass a Promise as the second parameter to the Resource constructor. Returning a + * Promise is deprecated in favor of this approach. */ export interface Detector { - detect(config?: ResourceDetectionConfig): Promise; + detect(config?: ResourceDetectionConfig): Promise | Resource; } diff --git a/packages/opentelemetry-resources/test/Resource.test.ts b/packages/opentelemetry-resources/test/Resource.test.ts index c3780591d68..e0ed310b9de 100644 --- a/packages/opentelemetry-resources/test/Resource.test.ts +++ b/packages/opentelemetry-resources/test/Resource.test.ts @@ -14,11 +14,13 @@ * limitations under the License. */ +import * as sinon from 'sinon'; import * as assert from 'assert'; import { SDK_INFO } from '@opentelemetry/core'; import { Resource } from '../src'; import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; import { describeBrowser, describeNode } from './util'; +import {diag} from '@opentelemetry/api'; describe('Resource', () => { const resource1 = new Resource({ @@ -99,6 +101,99 @@ describe('Resource', () => { it('should return the same empty resource', () => { assert.strictEqual(Resource.empty(), Resource.empty()); }); + + it('should return true for asyncAttributesHaveResolved() immediately', () => { + assert.ok(Resource.empty().asyncAttributesHaveResolved()); + }); + }); + + describe('asynchronous attributes', () => { + afterEach(() => { + sinon.restore(); + }); + + it('should return true for asyncAttributesHaveResolved() if no promise provided', () => { + assert.ok(new Resource({'foo': 'bar'}).asyncAttributesHaveResolved()); + assert.ok(Resource.empty().asyncAttributesHaveResolved()); + assert.ok(Resource.default().asyncAttributesHaveResolved()); + }); + + it('should return true for asyncAttributesHaveResolved() once promise finishes', async () => { + const clock = sinon.useFakeTimers(); + const resourceResolve = new Resource({}, new Promise(resolve => { + setTimeout(resolve, 100); + })); + const resourceReject = new Resource({}, new Promise((_, reject) => { + setTimeout(reject, 200); + })); + + for (const resource of [resourceResolve, resourceReject]) { + assert.ok(!resource.asyncAttributesHaveResolved()); + await clock.nextAsync(); + await resource.waitForAsyncAttributes(); + assert.ok(resource.asyncAttributesHaveResolved()); + } + }); + + it('should merge async attributes into sync attributes once resolved', async () => { + const resource = new Resource( + {'sync': 'fromsync', 'shared': 'fromsync'}, + Promise.resolve({'async': 'fromasync', 'shared': 'fromasync'}), + ); + + await resource.waitForAsyncAttributes(); + assert.deepStrictEqual( + resource.attributes, + { + 'sync': 'fromsync', + // async takes precedence + 'shared': 'fromasync', + 'async': 'fromasync', + }, + ); + }); + + it('should merge async attributes when both resources have promises', async () => { + const resource1 = new Resource( + {}, Promise.resolve({'promise1': 'promise1val', 'shared': 'promise1val'}), + ); + const resource2 = new Resource( + {}, Promise.resolve({'promise2': 'promise2val', 'shared': 'promise2val'}), + ); + // this one rejects + const resource3 = new Resource( + {}, Promise.reject(new Error('reject')), + ); + const resource4 = new Resource( + {}, Promise.resolve({'promise4': 'promise4val', 'shared': 'promise4val'}), + ); + + const merged = resource1.merge(resource2).merge(resource3).merge(resource4); + await merged.waitForAsyncAttributes(); + assert.deepStrictEqual( + merged.attributes, + { + 'promise1': 'promise1val', + 'promise2': 'promise2val', + 'promise4': 'promise4val', + // same behavior as for synchronous attributes + 'shared': 'promise4val', + } + ); + }); + + it('should log when promise rejects', async () => { + const debugStub = sinon.spy(diag, 'debug'); + // should be possible to catch failure with waitForAsyncAttributes() + try { + await assert.rejects( + new Resource({}, Promise.reject(new Error('rejected'))).waitForAsyncAttributes(), + ); + } catch(err) {} + // will log after yielding to event loop + await new Promise(resolve => setTimeout(resolve)); + assert.ok(debugStub.calledOnce); + }); }); describeNode('.default()', () => { diff --git a/packages/opentelemetry-resources/test/detect-resources.test.ts b/packages/opentelemetry-resources/test/detect-resources.test.ts new file mode 100644 index 00000000000..c074ad4419f --- /dev/null +++ b/packages/opentelemetry-resources/test/detect-resources.test.ts @@ -0,0 +1,106 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { diag } from '@opentelemetry/api'; +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import { Resource, Detector, detectResourcesSync } from '../src'; +import { describeNode } from './util'; + +describe('detectResourcesSync', () => { + afterEach(() => { + sinon.restore(); + }); + + it('handles resource detectors which return Promise', async () => { + const detector: Detector = { + async detect() { + return new Resource( + { 'sync': 'fromsync', 'async': 'fromasync' }, + ); + }, + }; + const resource = detectResourcesSync({ + detectors: [detector], + }); + + await resource.waitForAsyncAttributes(); + assert.deepStrictEqual(resource.attributes, {'sync': 'fromsync', 'async': 'fromasync'}); + }); + + it('handles resource detectors which return Resource with a promise inside', async () => { + const detector: Detector = { + detect() { + return new Resource( + { 'sync': 'fromsync' }, + Promise.resolve({ 'async': 'fromasync'}) + ); + }, + }; + const resource = detectResourcesSync({ + detectors: [detector], + }); + + // before waiting, it should already have the sync resources + assert.deepStrictEqual(resource.attributes, {'sync': 'fromsync'}); + await resource.waitForAsyncAttributes(); + assert.deepStrictEqual(resource.attributes, {'sync': 'fromsync', 'async': 'fromasync'}); + }); + + describeNode('logging', () => { + it("logs when a detector's async attributes promise rejects or resolves", async () => { + const debugStub = sinon.spy(diag, 'debug'); + + // use a class so it has a name + class DetectorRejects implements Detector { + detect() { + return new Resource( + { 'sync': 'fromsync' }, + Promise.reject(new Error('reject')), + ); + } + } + class DetectorOk implements Detector { + detect() { + return new Resource( + { 'sync': 'fromsync' }, + Promise.resolve({'async': 'fromasync'}), + ); + } + } + class DetectorAsync implements Detector { + async detect() { + return new Resource( + { 'sync': 'fromsync', 'async': 'fromasync' }, + ); + } + } + + const resource = detectResourcesSync({ + detectors: [ + new DetectorRejects(), + new DetectorOk(), + new DetectorAsync(), + ], + }); + + await assert.doesNotReject(resource.waitForAsyncAttributes()); + assert.ok(debugStub.calledWithMatch('DetectorRejects failed: reject')); + assert.ok(debugStub.calledWithMatch('DetectorOk found resource.')); + assert.ok(debugStub.calledWithMatch('DetectorAsync found resource.')); + }); + }); +}); diff --git a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts index e6417d84afc..dd2e139e831 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/BatchSpanProcessorBase.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { context, Context, TraceFlags } from '@opentelemetry/api'; +import { context, Context, diag, TraceFlags } from '@opentelemetry/api'; import { BindOnceFuture, ExportResultCode, @@ -152,8 +152,10 @@ export abstract class BatchSpanProcessorBase implements // Reset the finished spans buffer here because the next invocations of the _flush method // could pass the same finished spans to the exporter if the buffer is cleared // outside of the execution of this callback. - this._exporter.export( - this._finishedSpans.splice(0, this._maxExportBatchSize), + const spans = this._finishedSpans.splice(0, this._maxExportBatchSize); + + const doExport = () => this._exporter.export( + spans, result => { clearTimeout(timer); if (result.code === ExportResultCode.SUCCESS) { @@ -166,6 +168,17 @@ export abstract class BatchSpanProcessorBase implements } } ); + + const pendingResources = spans.map(span => span.resource) + .filter(resource => !resource.asyncAttributesHaveResolved()); + + // Avoid scheduling a promise to make the behavior more predictable and easier to test + if (pendingResources.length === 0) { + doExport(); + } else { + Promise.all(pendingResources.map(resource => resource.waitForAsyncAttributes())) + .then(doExport, err => diag.debug('Error while resolving async portion of resource: ', err)); + } }); }); } diff --git a/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts b/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts index c775bdf6d4c..f70616e38ec 100644 --- a/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts +++ b/packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { context, Context, TraceFlags } from '@opentelemetry/api'; +import { context, Context, diag, TraceFlags } from '@opentelemetry/api'; import { ExportResultCode, globalErrorHandler, @@ -58,7 +58,7 @@ export class SimpleSpanProcessor implements SpanProcessor { // prevent downstream exporter calls from generating spans context.with(suppressTracing(context.active()), () => { - this._exporter.export([span], result => { + const doExport = () => this._exporter.export([span], result => { if (result.code !== ExportResultCode.SUCCESS) { globalErrorHandler( result.error ?? @@ -68,7 +68,16 @@ export class SimpleSpanProcessor implements SpanProcessor { ); } }); + + // Avoid scheduling a promise to make the behavior more predictable and easier to test + if (span.resource.asyncAttributesHaveResolved()) { + doExport(); + } else { + span.resource.waitForAsyncAttributes() + .then(doExport, err => diag.debug('Error while resolving async portion of resource: ', err)); + } }); + } shutdown(): Promise {