Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric Accumulator fix for SumObservers #1381

Merged
merged 13 commits into from
Dec 11, 2020
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Move the OpenCensus example into `example` directory. (#1359)
- `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357)

### Fixed

- Metric SDK `SumObserver` and `UpDownSumObserver` instruments correctness fixes. (#1381)

## [0.14.0] - 2020-11-19

### Added
Expand Down
3 changes: 3 additions & 0 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type Aggregator interface {
//
// This call has no Context argument because it is expected to
// perform only computation.
//
// When called with a nil `destination`, this Aggregator is reset
// and the current value is discarded.
SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error

// Merge combines the checkpointed state from the argument
Expand Down
45 changes: 45 additions & 0 deletions sdk/metric/aggregator/aggregatortest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ package aggregatortest // import "go.opentelemetry.io/otel/sdk/metric/aggregator

import (
"context"
"errors"
"math/rand"
"os"
"sort"
"testing"
"unsafe"

"github.com/stretchr/testify/require"

ottest "go.opentelemetry.io/otel/internal/testing"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)

Expand Down Expand Up @@ -172,3 +176,44 @@ func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *
t.Error("Unexpected Merge failure", err)
}
}

func SynchronizedMoveResetTest(t *testing.T, mkind metric.InstrumentKind, nf func(*metric.Descriptor) export.Aggregator) {
// Ensures that SynchronizedMove(nil, descriptor) discards and
// resets the aggregator.
RunProfiles(t, func(t *testing.T, profile Profile) {
descriptor := NewAggregatorTest(
mkind,
profile.NumberKind,
)
agg := nf(descriptor)

all := NewNumbers(profile.NumberKind)

for i := 0; i < 10; i++ {
x1 := profile.Random(+1)
all.Append(x1)
CheckedUpdate(t, agg, x1, descriptor)
}

require.NoError(t, agg.SynchronizedMove(nil, descriptor))

if count, ok := agg.(aggregation.Count); ok {
c, err := count.Count()
require.Equal(t, int64(0), c)
require.NoError(t, err)
}

if sum, ok := agg.(aggregation.Sum); ok {
s, err := sum.Sum()
require.Equal(t, number.Number(0), s)
require.NoError(t, err)
}

if lv, ok := agg.(aggregation.LastValue); ok {
v, _, err := lv.LastValue()
require.Equal(t, number.Number(0), v)
require.Error(t, err)
require.True(t, errors.Is(err, aggregation.ErrNoData))
}
})
}
19 changes: 13 additions & 6 deletions sdk/metric/aggregator/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,27 @@ func (c *Aggregator) Points() ([]number.Number, error) {
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

c.lock.Lock()
o.points, c.points = c.points, nil
o.sum, c.sum = c.sum, 0
if o != nil {
o.points = c.points
o.sum = c.sum
}
c.points = nil
c.sum = 0
c.lock.Unlock()

if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
jmacd marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: This sort should be done lazily, only when quantiles
// are requested. The SDK specification says you can use this
// aggregator to simply list values in the order they were
// received as an alternative to requesting quantile information.
o.sort(desc.NumberKind())
if o != nil {
o.sort(desc.NumberKind())
}
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
)
Expand Down Expand Up @@ -329,3 +330,13 @@ func TestArrayFloat64(t *testing.T) {
require.Equal(t, all.Points()[i], po[i], "Wrong point at position %d", i)
}
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1)[0]
},
)
}
12 changes: 8 additions & 4 deletions sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,19 @@ func (c *Aggregator) toNumber(f float64) number.Number {
// a new sketch, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
replace := sdk.NewDDSketch(c.cfg)

c.lock.Lock()
o.sketch, c.sketch = c.sketch, replace
if o != nil {
o.sketch = c.sketch
}
c.sketch = replace
c.lock.Unlock()

if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/ddsketch/ddsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
)
Expand Down Expand Up @@ -208,3 +209,13 @@ func TestDDSketchMerge(t *testing.T) {
})
}
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1, desc, NewDefaultConfig())[0]
},
)
}
13 changes: 9 additions & 4 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,18 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
// other.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

c.lock.Lock()
o.state, c.state = c.state, emptyState(c.boundaries)
if o != nil {
o.state = c.state
}
c.state = emptyState(c.boundaries)
c.lock.Unlock()

if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
)
Expand Down Expand Up @@ -249,3 +250,13 @@ func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint6

return counts
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &histogram.New(1, desc, boundaries)[0]
},
)
}
4 changes: 4 additions & 0 deletions sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) {

// SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
if oa == nil {
atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil
}
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
Expand Down
10 changes: 10 additions & 0 deletions sdk/metric/aggregator/lastvalue/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,13 @@ func TestLastValueNotSet(t *testing.T) {

checkZero(t, g)
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueObserverInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1)[0]
},
)
}
11 changes: 7 additions & 4 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,19 @@ func (c *Aggregator) Max() (number.Number, error) {
// the empty set.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

// TODO: It is incorrect to use an Aggregator of different
// kind. Should we test that o.kind == c.kind? (The same question
// occurs for several of the other aggregators in ../*.)
c.lock.Lock()
o.state, c.state = c.state, emptyState(c.kind)
if o != nil {
o.state = c.state
}
c.state = emptyState(c.kind)
c.lock.Unlock()
if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}

return nil
}
Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/minmaxsumcount/mmsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
)
Expand Down Expand Up @@ -235,3 +236,13 @@ func TestMaxSumCountNotSet(t *testing.T) {
require.Equal(t, number.Number(0), max)
})
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1, desc)[0]
},
)
}
4 changes: 4 additions & 0 deletions sdk/metric/aggregator/sum/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (c *Aggregator) Sum() (number.Number, error) {
// SynchronizedMove atomically saves the current value into oa and resets the
// current sum to zero.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
if oa == nil {
c.value.SetRawAtomic(0)
return nil
}
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
11 changes: 11 additions & 0 deletions sdk/metric/aggregator/sum/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
ottest "go.opentelemetry.io/otel/internal/testing"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
)

Expand Down Expand Up @@ -141,3 +142,13 @@ func TestCounterMerge(t *testing.T) {
require.Nil(t, err)
})
}

func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
metric.SumObserverInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &New(1)[0]
},
)
}
Loading