Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: batch observer to be independent from metric types #1709

Merged
merged 6 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/metrics/metrics/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const cpuUsageMetric = meter.createValueObserver('cpu_usage_per_app', {
description: 'Example of sync value observer used with async batch observer',
});

meter.createBatchObserver('metric_batch_observer', (observerBatchResult) => {
meter.createBatchObserver((observerBatchResult) => {
Promise.all([
someAsyncMetrics(),
// simulate waiting
Expand Down
13 changes: 5 additions & 8 deletions packages/opentelemetry-api/src/metrics/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import {
Counter,
ValueRecorder,
ValueObserver,
BatchObserver,
BatchMetricOptions,
BatchObserverOptions,
UpDownCounter,
SumObserver,
UpDownSumObserver,
Expand Down Expand Up @@ -108,15 +107,13 @@ export interface Meter {
): UpDownSumObserver;

/**
* Creates a new `BatchObserver` metric, can be used to update many metrics
* Creates a new `BatchObserver`, can be used to update many metrics
* at the same time and when operations needs to be async
* @param name the name of the metric.
* @param callback the batch observer callback
* @param [options] the metric batch options.
* @param [options] the batch observer options.
*/
createBatchObserver(
name: string,
callback: (batchObserverResult: BatchObserverResult) => void,
options?: BatchMetricOptions
): BatchObserver;
options?: BatchObserverOptions
): void;
}
10 changes: 6 additions & 4 deletions packages/opentelemetry-api/src/metrics/Metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@ export interface MetricOptions {
boundaries?: number[];
}

export interface BatchMetricOptions extends MetricOptions {
export interface BatchObserverOptions {
/**
* Indicates how long the batch metric should wait to update before cancel
*/
maxTimeoutUpdateMS?: number;

/**
* User provided logger.
*/
logger?: Logger;
}

/** The Type of value. It describes how the data is reported. */
Expand Down Expand Up @@ -166,9 +171,6 @@ export type UpDownSumObserver = BaseObserver;
/** Base interface for the SumObserver metrics. */
export type SumObserver = BaseObserver;

/** Base interface for the Batch Observer metrics. */
export type BatchObserver = Metric;

/**
* key-value pairs passed by the user.
*/
Expand Down
12 changes: 4 additions & 8 deletions packages/opentelemetry-api/src/metrics/NoopMeter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
Counter,
ValueRecorder,
ValueObserver,
BatchObserver,
UpDownCounter,
BaseObserver,
UpDownSumObserver,
Expand Down Expand Up @@ -119,10 +118,9 @@ export class NoopMeter implements Meter {
* @param callback the batch observer callback
*/
createBatchObserver(
_name: string,
_callback: (batchObserverResult: BatchObserverResult) => void
): BatchObserver {
return NOOP_BATCH_OBSERVER_METRIC;
): NoopBatchObserver {
return NOOP_BATCH_OBSERVER;
}
}

Expand Down Expand Up @@ -187,9 +185,7 @@ export class NoopBaseObserverMetric
}
}

export class NoopBatchObserverMetric
extends NoopMetric<void>
implements BatchObserver {}
export class NoopBatchObserver {}

export class NoopBoundCounter implements BoundCounter {
add(_value: number): void {
Expand Down Expand Up @@ -229,4 +225,4 @@ export const NOOP_SUM_OBSERVER_METRIC = new NoopBaseObserverMetric(
NOOP_BOUND_BASE_OBSERVER
);

export const NOOP_BATCH_OBSERVER_METRIC = new NoopBatchObserverMetric();
export const NOOP_BATCH_OBSERVER = new NoopBatchObserver();
2 changes: 1 addition & 1 deletion packages/opentelemetry-metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ const MemUsageMetric = meter.createValueObserver('mem_usage_per_app', {
description: 'Memory',
});

meter.createBatchObserver('metric_batch_observer', (observerBatchResult) => {
meter.createBatchObserver((observerBatchResult) => {
getSomeAsyncMetrics().then(metrics => {
observerBatchResult.observe({ app: 'myApp' }, [
cpuUsageMetric.observation(metrics.value1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,32 @@
*/

import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { Logger, NoopLogger } from '@opentelemetry/api';
import { BatchObserverResult } from './BatchObserverResult';
import { BoundObserver } from './BoundInstrument';
import { Processor } from './export/Processor';
import { MetricKind, MetricRecord } from './export/types';
import { Metric } from './Metric';
import { MetricRecord } from './export/types';

const NOOP_CALLBACK = () => {};
const MAX_TIMEOUT_UPDATE_MS = 500;

/** This is a SDK implementation of Batch Observer Metric. */
export class BatchObserverMetric
extends Metric<BoundObserver>
implements api.BatchObserver {
/** This is a SDK implementation of Batch Observer. */
export class BatchObserver {
private _callback: (observerResult: api.BatchObserverResult) => void;
private _maxTimeoutUpdateMS: number;
private _logger: Logger;

constructor(
name: string,
options: api.BatchMetricOptions,
private readonly _processor: Processor,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
options: api.BatchObserverOptions,
callback?: (observerResult: api.BatchObserverResult) => void
) {
super(
name,
options,
MetricKind.BATCH_OBSERVER,
resource,
instrumentationLibrary
);
this._logger = options.logger ?? new NoopLogger();
this._maxTimeoutUpdateMS =
options.maxTimeoutUpdateMS ?? MAX_TIMEOUT_UPDATE_MS;
this._callback = callback || NOOP_CALLBACK;
}

protected _makeInstrument(labels: api.Labels): BoundObserver {
return new BoundObserver(
labels,
this._disabled,
this._valueType,
this._logger,
this._processor.aggregatorFor(this._descriptor)
);
}

getMetricRecord(): Promise<MetricRecord[]> {
collect(): Promise<MetricRecord[]> {
this._logger.debug('getMetricRecord - start');
return new Promise((resolve, reject) => {
return new Promise(resolve => {
const observerResult = new BatchObserverResult();

// cancels after MAX_TIMEOUT_MS - no more waiting for results
Expand All @@ -74,14 +49,14 @@ export class BatchObserverMetric
// remove callback to prevent user from updating the values later if
// for any reason the observerBatchResult will be referenced
observerResult.onObserveCalled();
super.getMetricRecord().then(resolve, reject);
resolve();
this._logger.debug('getMetricRecord - timeout');
}, this._maxTimeoutUpdateMS);

// sets callback for each "observe" method
observerResult.onObserveCalled(() => {
clearTimeout(timer);
super.getMetricRecord().then(resolve, reject);
resolve();
this._logger.debug('getMetricRecord - end');
});

Expand Down
2 changes: 1 addition & 1 deletion packages/opentelemetry-metrics/src/BatchObserverResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class BatchObserverResult implements api.BatchObserverResult {
* Cancels the further updates.
* This is used to prevent updating the value of result that took too
* long to update. For example to avoid update after timeout.
* See {@link BatchObserverMetric.getMetricRecord}
* See {@link BatchObserver.collect}
*/
cancelled = false;

Expand Down
58 changes: 15 additions & 43 deletions packages/opentelemetry-metrics/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
import * as api from '@opentelemetry/api';
import { ConsoleLogger, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BatchObserverMetric } from './BatchObserverMetric';
import { BatchObserver } from './BatchObserver';
import { BaseBoundInstrument } from './BoundInstrument';
import { Processor } from './export/Processor';
import { MetricKind } from './export/types';
import { UpDownCounterMetric } from './UpDownCounterMetric';
import { CounterMetric } from './CounterMetric';
import { UpDownSumObserverMetric } from './UpDownSumObserverMetric';
Expand All @@ -38,6 +37,7 @@ import { NoopExporter } from './export/NoopExporter';
*/
export class Meter implements api.Meter {
private readonly _logger: api.Logger;
private readonly _batchObservers: BatchObserver[] = [];
private readonly _metrics = new Map<string, Metric<BaseBoundInstrument>>();
private readonly _processor: Processor;
private readonly _resource: Resource;
Expand Down Expand Up @@ -258,36 +258,20 @@ export class Meter implements api.Meter {
}

/**
* Creates a new batch observer metric.
* @param name the name of the metric.
* Creates a new batch observer.
* @param callback the batch observer callback
* @param [options] the metric batch options.
* @param [options] the batch options.
*/
createBatchObserver(
name: string,
callback: (observerResult: api.BatchObserverResult) => void,
options: api.BatchMetricOptions = {}
): api.BatchObserver {
if (!this._isValidName(name)) {
this._logger.warn(
`Invalid metric name ${name}. Defaulting to noop metric implementation.`
);
return api.NOOP_BATCH_OBSERVER_METRIC;
}
const opt: api.BatchMetricOptions = {
options: api.BatchObserverOptions = {}
): BatchObserver {
const opt: api.BatchObserverOptions = {
logger: this._logger,
...DEFAULT_METRIC_OPTIONS,
...options,
};
const batchObserver = new BatchObserverMetric(
name,
opt,
this._processor,
this._resource,
this._instrumentationLibrary,
callback
);
this._registerMetric(name, batchObserver);
const batchObserver = new BatchObserver(opt, callback);
this._batchObservers.push(batchObserver);
return batchObserver;
}

Expand All @@ -300,27 +284,15 @@ export class Meter implements api.Meter {
*/
async collect(): Promise<void> {
// call batch observers first
const batchObservers = Array.from(this._metrics.values())
.filter(metric => {
return metric.getKind() === MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
});
await Promise.all(batchObservers).then(records => {
records.forEach(metrics => {
metrics.forEach(metric => this._processor.process(metric));
});
const observations = this._batchObservers.map(observer => {
return observer.collect();
});
await Promise.all(observations);

// after this all remaining metrics can be run
const metrics = Array.from(this._metrics.values())
.filter(metric => {
return metric.getKind() !== MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
});
const metrics = Array.from(this._metrics.values()).map(metric => {
return metric.getMetricRecord();
});

await Promise.all(metrics).then(records => {
records.forEach(metrics => {
Expand Down
Loading