forked from open-telemetry/opentelemetry-go-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add array aggregation for raw measure metrics, improve testing (#282)
* Array aggregator part 1 * Improve median testing * More testing * More testing * Update other dist tests * Add to the benchmark * Move errors into aggregator package, use from ddsketch; update Max/Min/Quantile to return errors for array/ddsketch/maxsumcount * Lint * Test non-absolute ddsketch * Lint * Comment * Add note
- Loading branch information
Showing
12 changed files
with
767 additions
and
136 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
// Copyright 2019, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package array // import "go.opentelemetry.io/otel/sdk/metric/aggregator/array" | ||
|
||
import ( | ||
"context" | ||
"math" | ||
"sort" | ||
"sync" | ||
"unsafe" | ||
|
||
"go.opentelemetry.io/otel/api/core" | ||
"go.opentelemetry.io/otel/sdk/export" | ||
"go.opentelemetry.io/otel/sdk/metric/aggregator" | ||
) | ||
|
||
type ( | ||
Aggregator struct { | ||
lock sync.Mutex | ||
current Points | ||
checkpoint Points | ||
ckptSum core.Number | ||
} | ||
|
||
Points []core.Number | ||
) | ||
|
||
var _ export.MetricAggregator = &Aggregator{} | ||
|
||
func New() *Aggregator { | ||
return &Aggregator{} | ||
} | ||
|
||
// Sum returns the sum of the checkpoint. | ||
func (c *Aggregator) Sum() core.Number { | ||
return c.ckptSum | ||
} | ||
|
||
// Count returns the count of the checkpoint. | ||
func (c *Aggregator) Count() int64 { | ||
return int64(len(c.checkpoint)) | ||
} | ||
|
||
// Max returns the max of the checkpoint. | ||
func (c *Aggregator) Max() (core.Number, error) { | ||
return c.checkpoint.Quantile(1) | ||
} | ||
|
||
// Min returns the min of the checkpoint. | ||
func (c *Aggregator) Min() (core.Number, error) { | ||
return c.checkpoint.Quantile(0) | ||
} | ||
|
||
// Quantile returns the estimated quantile of the checkpoint. | ||
func (c *Aggregator) Quantile(q float64) (core.Number, error) { | ||
return c.checkpoint.Quantile(q) | ||
} | ||
|
||
func (c *Aggregator) Collect(ctx context.Context, rec export.MetricRecord, exp export.MetricBatcher) { | ||
c.lock.Lock() | ||
c.checkpoint, c.current = c.current, nil | ||
c.lock.Unlock() | ||
|
||
desc := rec.Descriptor() | ||
kind := desc.NumberKind() | ||
|
||
c.sort(kind) | ||
|
||
c.ckptSum = core.Number(0) | ||
|
||
for _, v := range c.checkpoint { | ||
c.ckptSum.AddNumber(kind, v) | ||
} | ||
|
||
exp.Export(ctx, rec, c) | ||
} | ||
|
||
func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.MetricRecord) { | ||
desc := rec.Descriptor() | ||
kind := desc.NumberKind() | ||
|
||
if kind == core.Float64NumberKind && math.IsNaN(number.AsFloat64()) { | ||
// TODO warn | ||
// NOTE: add this to the specification. | ||
return | ||
} | ||
|
||
if !desc.Alternate() && number.IsNegative(kind) { | ||
// TODO warn | ||
return | ||
} | ||
|
||
c.lock.Lock() | ||
c.current = append(c.current, number) | ||
c.lock.Unlock() | ||
} | ||
|
||
func (c *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) { | ||
o, _ := oa.(*Aggregator) | ||
if o == nil { | ||
// TODO warn | ||
return | ||
} | ||
|
||
c.ckptSum.AddNumber(desc.NumberKind(), o.ckptSum) | ||
c.checkpoint = combine(c.checkpoint, o.checkpoint, desc.NumberKind()) | ||
} | ||
|
||
func (c *Aggregator) sort(kind core.NumberKind) { | ||
switch kind { | ||
case core.Float64NumberKind: | ||
sort.Float64s(*(*[]float64)(unsafe.Pointer(&c.checkpoint))) | ||
|
||
case core.Int64NumberKind: | ||
sort.Sort(&c.checkpoint) | ||
|
||
default: | ||
// NOTE: This can't happen because the SDK doesn't | ||
// support uint64-kind metric instruments. | ||
panic("Impossible case") | ||
} | ||
} | ||
|
||
func combine(a, b Points, kind core.NumberKind) Points { | ||
result := make(Points, 0, len(a)+len(b)) | ||
|
||
for len(a) != 0 && len(b) != 0 { | ||
if a[0].CompareNumber(kind, b[0]) < 0 { | ||
result = append(result, a[0]) | ||
a = a[1:] | ||
} else { | ||
result = append(result, b[0]) | ||
b = b[1:] | ||
} | ||
} | ||
result = append(result, a...) | ||
result = append(result, b...) | ||
return result | ||
} | ||
|
||
func (p *Points) Len() int { | ||
return len(*p) | ||
} | ||
|
||
func (p *Points) Less(i, j int) bool { | ||
// Note this is specialized for int64, because float64 is | ||
// handled by `sort.Float64s` and uint64 numbers never appear | ||
// in this data. | ||
return int64((*p)[i]) < int64((*p)[j]) | ||
} | ||
|
||
func (p *Points) Swap(i, j int) { | ||
(*p)[i], (*p)[j] = (*p)[j], (*p)[i] | ||
} | ||
|
||
// Quantile returns the least X such that Pr(x<X)>=q, where X is an | ||
// element of the data set. | ||
func (p *Points) Quantile(q float64) (core.Number, error) { | ||
if len(*p) == 0 { | ||
return core.Number(0), aggregator.ErrEmptyDataSet | ||
} | ||
|
||
if q < 0 || q > 1 { | ||
return core.Number(0), aggregator.ErrInvalidQuantile | ||
} | ||
|
||
if q == 0 || len(*p) == 1 { | ||
return (*p)[0], nil | ||
} else if q == 1 { | ||
return (*p)[len(*p)-1], nil | ||
} | ||
|
||
// Note: There's no interpolation being done here. There are | ||
// many definitions for "quantile", some interpolate, some do | ||
// not. What is expected? | ||
position := float64(len(*p)-1) * q | ||
ceil := int(math.Ceil(position)) | ||
return (*p)[ceil], nil | ||
} |
Oops, something went wrong.