From 70e79c8768b325779c0bcd31dcd470a0a5c55a3a Mon Sep 17 00:00:00 2001 From: vmarchaud Date: Mon, 6 Apr 2020 16:12:20 +0200 Subject: [PATCH] feat(aggregators): implement histogram aggregator #927 --- .../src/export/ConsoleMetricExporter.ts | 9 +- .../src/export/aggregators/histogram.ts | 96 +++++++++++ .../src/export/aggregators/index.ts | 3 +- .../src/export/aggregators/measureexact.ts | 11 +- .../src/export/aggregators/observer.ts | 2 +- .../opentelemetry-metrics/src/export/types.ts | 10 +- .../test/export/aggregators/histogram.test.ts | 152 ++++++++++++++++++ 7 files changed, 264 insertions(+), 19 deletions(-) create mode 100644 packages/opentelemetry-metrics/src/export/aggregators/histogram.ts create mode 100644 packages/opentelemetry-metrics/test/export/aggregators/histogram.test.ts diff --git a/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts b/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts index 78e5efc069f..1b0ff2a717f 100644 --- a/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts +++ b/packages/opentelemetry-metrics/src/export/ConsoleMetricExporter.ts @@ -14,13 +14,8 @@ * limitations under the License. */ -import { - MetricExporter, - MetricRecord, - MetricKind, - Sum, - Distribution, -} from './types'; +import { MetricExporter, MetricRecord, MetricKind, Sum } from './types'; +import { Distribution } from './aggregators'; import { ExportResult } from '@opentelemetry/base'; /** diff --git a/packages/opentelemetry-metrics/src/export/aggregators/histogram.ts b/packages/opentelemetry-metrics/src/export/aggregators/histogram.ts new file mode 100644 index 00000000000..6cd46efbc98 --- /dev/null +++ b/packages/opentelemetry-metrics/src/export/aggregators/histogram.ts @@ -0,0 +1,96 @@ +/*! + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Aggregator, Point } from '../types'; +import { HrTime } from '@opentelemetry/api'; +import { hrTime } from '@opentelemetry/core'; + +export type Checkpoint = { + buckets: { + boundaries: number[]; + counts: number[]; + }; + sum: number; + count: number; +}; + +/** Basic aggregator which calculates a Sum from individual measurements. */ +export class HistogramAggregator implements Aggregator { + private _lastCheckpoint: Checkpoint; + private _currentCheckpoint: Checkpoint; + private _lastCheckpointTime: HrTime = [0, 0]; + private _boundaries: number[]; + + constructor(boundaries: number[]) { + if (boundaries === undefined || boundaries.length === 0) { + throw new Error(`HistogramAggregator should be created with boundaries.`); + } + this._boundaries = boundaries.sort(); + this._lastCheckpoint = this._newEmptyCheckpoint(); + this._currentCheckpoint = this._newEmptyCheckpoint(); + } + + get sum() { + return this._lastCheckpoint.sum; + } + + get count() { + return this._lastCheckpoint.count; + } + + get checkpoint() { + return this._lastCheckpoint; + } + + update(value: number): void { + this._currentCheckpoint.count += 1; + this._currentCheckpoint.sum += value; + + for (let i = 0; i < this._boundaries.length; i++) { + if (value < this._boundaries[i]) { + this._currentCheckpoint.buckets.counts[i] += 1; + return; + } + } + + // value is above all observed boundaries + this._currentCheckpoint.buckets.counts[this._boundaries.length] += 1; + } + + resetCheckpoint(): void { + this._lastCheckpointTime = hrTime(); + this._lastCheckpoint = this._currentCheckpoint; + this._currentCheckpoint = this._newEmptyCheckpoint(); + } + + toPoint(): Point { + return { + value: this._lastCheckpoint, + timestamp: this._lastCheckpointTime, + }; + } + + private _newEmptyCheckpoint(): Checkpoint { + return { + buckets: { + boundaries: this._boundaries, + counts: this._boundaries.map(() => 0).concat([0]), + }, + sum: 0, + count: 0, + }; + } +} diff --git a/packages/opentelemetry-metrics/src/export/aggregators/index.ts b/packages/opentelemetry-metrics/src/export/aggregators/index.ts index 58f95e6008e..938e1ad6e38 100644 --- a/packages/opentelemetry-metrics/src/export/aggregators/index.ts +++ b/packages/opentelemetry-metrics/src/export/aggregators/index.ts @@ -16,4 +16,5 @@ export * from './countersum'; export * from './observer'; -export * from './measureexact'; \ No newline at end of file +export * from './measureexact'; +export * from './histogram'; diff --git a/packages/opentelemetry-metrics/src/export/aggregators/measureexact.ts b/packages/opentelemetry-metrics/src/export/aggregators/measureexact.ts index 30048da6a92..d9fe8137b15 100644 --- a/packages/opentelemetry-metrics/src/export/aggregators/measureexact.ts +++ b/packages/opentelemetry-metrics/src/export/aggregators/measureexact.ts @@ -14,10 +14,17 @@ * limitations under the License. */ -import { Aggregator, Point, Distribution } from '../types'; +import { Aggregator, Point } from '../types'; import { HrTime } from '@opentelemetry/api'; import { hrTime } from '@opentelemetry/core'; +export interface Distribution { + min: number; + max: number; + count: number; + sum: number; +} + /** Basic aggregator keeping all raw values (events, sum, max and min). */ export class MeasureExactAggregator implements Aggregator { private _distribution: Distribution; @@ -46,4 +53,4 @@ export class MeasureExactAggregator implements Aggregator { timestamp: this._lastUpdateTime, }; } -} \ No newline at end of file +} diff --git a/packages/opentelemetry-metrics/src/export/aggregators/observer.ts b/packages/opentelemetry-metrics/src/export/aggregators/observer.ts index 60e1d3ce5f7..d1ba176c30f 100644 --- a/packages/opentelemetry-metrics/src/export/aggregators/observer.ts +++ b/packages/opentelemetry-metrics/src/export/aggregators/observer.ts @@ -34,4 +34,4 @@ export class ObserverAggregator implements Aggregator { timestamp: this._lastUpdateTime, }; } -} \ No newline at end of file +} diff --git a/packages/opentelemetry-metrics/src/export/types.ts b/packages/opentelemetry-metrics/src/export/types.ts index ff4f1173eb0..cd7f7209d70 100644 --- a/packages/opentelemetry-metrics/src/export/types.ts +++ b/packages/opentelemetry-metrics/src/export/types.ts @@ -16,6 +16,7 @@ import { ValueType, HrTime, Labels } from '@opentelemetry/api'; import { ExportResult } from '@opentelemetry/base'; +import { Distribution, Checkpoint } from './aggregators'; /** The kind of metric. */ export enum MetricKind { @@ -30,13 +31,6 @@ export type Sum = number; /** LastValue returns last value. */ export type LastValue = number; -export interface Distribution { - min: number; - max: number; - count: number; - sum: number; -} - export interface MetricRecord { readonly descriptor: MetricDescriptor; readonly labels: Labels; @@ -80,6 +74,6 @@ export interface Aggregator { } export interface Point { - value: Sum | LastValue | Distribution; + value: Sum | LastValue | Distribution | Checkpoint; timestamp: HrTime; } diff --git a/packages/opentelemetry-metrics/test/export/aggregators/histogram.test.ts b/packages/opentelemetry-metrics/test/export/aggregators/histogram.test.ts new file mode 100644 index 00000000000..80c9dc30b35 --- /dev/null +++ b/packages/opentelemetry-metrics/test/export/aggregators/histogram.test.ts @@ -0,0 +1,152 @@ +/*! + * Copyright 2019, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import { HistogramAggregator } from '../../../src/export/aggregators'; + +describe('HistogramAggregator', () => { + describe('constructor()', () => { + it('should construct a histogramAggregator', () => { + assert.doesNotThrow(() => { + new HistogramAggregator([1, 2]); + }); + }); + + it('should sort boundaries', () => { + const aggregator = new HistogramAggregator([500, 300, 700]); + assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [ + 300, + 500, + 700, + ]); + }); + + it('should throw if no boundaries are defined', () => { + // @ts-ignore + assert.throws(() => new HistogramAggregator()); + assert.throws(() => new HistogramAggregator([])); + }); + }); + + describe('.update()', () => { + it('should not update checkpoint', () => { + const aggregator = new HistogramAggregator([100, 200]); + aggregator.update(150); + assert.equal(aggregator.checkpoint.count, 0); + assert.equal(aggregator.checkpoint.sum, 0); + }); + + it('should update the second bucket', () => { + const aggregator = new HistogramAggregator([100, 200]); + aggregator.update(150); + aggregator.resetCheckpoint(); + assert.equal(aggregator.checkpoint.count, 1); + assert.equal(aggregator.checkpoint.sum, 150); + assert.equal(aggregator.checkpoint.buckets.counts[0], 0); + assert.equal(aggregator.checkpoint.buckets.counts[1], 1); + assert.equal(aggregator.checkpoint.buckets.counts[2], 0); + }); + + it('should update the second bucket', () => { + const aggregator = new HistogramAggregator([100, 200]); + aggregator.update(50); + aggregator.resetCheckpoint(); + assert.equal(aggregator.checkpoint.count, 1); + assert.equal(aggregator.checkpoint.sum, 50); + assert.equal(aggregator.checkpoint.buckets.counts[0], 1); + assert.equal(aggregator.checkpoint.buckets.counts[1], 0); + assert.equal(aggregator.checkpoint.buckets.counts[2], 0); + }); + + it('should update the third bucket since value is above all boundaries', () => { + const aggregator = new HistogramAggregator([100, 200]); + aggregator.update(250); + aggregator.resetCheckpoint(); + assert.equal(aggregator.checkpoint.count, 1); + assert.equal(aggregator.checkpoint.sum, 250); + assert.equal(aggregator.checkpoint.buckets.counts[0], 0); + assert.equal(aggregator.checkpoint.buckets.counts[1], 0); + assert.equal(aggregator.checkpoint.buckets.counts[2], 1); + }); + }); + + describe('.count', () => { + it('should return last checkpoint count', () => { + const aggregator = new HistogramAggregator([100]); + assert.equal(aggregator.count, aggregator.checkpoint.count); + aggregator.update(10); + aggregator.resetCheckpoint(); + assert.equal(aggregator.checkpoint.count, 1); + assert.equal(aggregator.count, aggregator.checkpoint.count); + }); + }); + + describe('.sum', () => { + it('should return last checkpoint sum', () => { + const aggregator = new HistogramAggregator([100]); + assert.equal(aggregator.sum, aggregator.checkpoint.sum); + aggregator.update(10); + aggregator.resetCheckpoint(); + assert.equal(aggregator.checkpoint.sum, 10); + assert.deepEqual(aggregator.sum, aggregator.checkpoint.sum); + }); + }); + + describe('.resetCheckpoint()', () => { + it('should create a empty checkoint by default', () => { + const aggregator = new HistogramAggregator([100]); + assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [100]); + assert(aggregator.checkpoint.buckets.counts.every(count => count === 0)); + // should contains one bucket for each boundary + one for values outside of the largest boundary + assert.equal(aggregator.checkpoint.buckets.counts.length, 2); + assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [100]); + assert.equal(aggregator.checkpoint.count, 0); + assert.equal(aggregator.checkpoint.sum, 0); + }); + + it('should update checkpoint', () => { + const aggregator = new HistogramAggregator([100]); + aggregator.update(10); + aggregator.resetCheckpoint(); + assert.equal(aggregator.checkpoint.count, 1); + assert.equal(aggregator.checkpoint.sum, 10); + assert.deepEqual(aggregator.checkpoint.buckets.boundaries, [100]); + assert.equal(aggregator.checkpoint.buckets.counts.length, 2); + assert.deepEqual(aggregator.checkpoint.buckets.counts, [1, 0]); + }); + }); + + describe('.toPoint()', () => { + it('should return default checkpoint', () => { + const aggregator = new HistogramAggregator([100]); + assert.deepEqual(aggregator.toPoint().value, aggregator.checkpoint); + assert.deepEqual(aggregator.toPoint().timestamp, [0, 0]); + }); + + it('should return last checkpoint if updated', () => { + const aggregator = new HistogramAggregator([100]); + aggregator.update(100); + aggregator.resetCheckpoint(); + assert.deepEqual(aggregator.toPoint().value, aggregator.checkpoint); + console.log(aggregator.toPoint().timestamp); + assert( + aggregator + .toPoint() + .timestamp.every(nbr => typeof nbr === 'number' && nbr !== 0) + ); + }); + }); +});