Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect metrics getting generated from multiple readers #5900

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Global MeterProvider registration unwraps global instrument Observers, the undocumented Unwrap() methods are now private. (#5881)
- Fix `go.opentelemetry.io/otel/exporters/prometheus` trying to add exemplars to Gauge metrics, which is unsupported. (#5912)
- Fix incorrect metrics generated from callbacks when multiple readers are used in `go.opentelemetry.io/otel/sdk/metric`. (#5900)

### Changed

Expand Down
59 changes: 45 additions & 14 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addInt64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addFloat64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -441,8 +451,8 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
return noopRegister{}, nil
}

reg := newObserver()
var err error
validInstruments := make([]metric.Observable, 0, len(insts))
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
Expand All @@ -452,49 +462,64 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
continue
}
reg.registerInt64(o.observableID)

validInstruments = append(validInstruments, inst)
case float64Observable:
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
reg.registerFloat64(o.observableID)

validInstruments = append(validInstruments, inst)
default:
// Instrument external to the SDK.
return nil, fmt.Errorf("invalid observable: from different implementation")
}
}

if reg.len() == 0 {
if len(validInstruments) == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}

// Some or all instruments were valid.
cback := func(ctx context.Context) error { return f(ctx, reg) }
return m.pipes.registerMultiCallback(cback), err
unregs := make([]func(), len(m.pipes))
for ix, pipe := range m.pipes {
reg := newObserver(pipe)
for _, inst := range validInstruments {
switch o := inst.(type) {
case int64Observable:
reg.registerInt64(o.observableID)
case float64Observable:
reg.registerFloat64(o.observableID)
}
}

// Some or all instruments were valid.
cBack := func(ctx context.Context) error { return f(ctx, reg) }
unregs[ix] = pipe.addMultiCallback(cBack)
}

return unregisterFuncs{f: unregs}, err
}

type observer struct {
embedded.Observer

pipe *pipeline
float64 map[observableID[float64]]struct{}
dashpole marked this conversation as resolved.
Show resolved Hide resolved
int64 map[observableID[int64]]struct{}
}

func newObserver() observer {
func newObserver(p *pipeline) observer {
return observer{
pipe: p,
float64: make(map[observableID[float64]]struct{}),
int64: make(map[observableID[int64]]struct{}),
}
}

func (r observer) len() int {
return len(r.float64) + len(r.int64)
}

func (r observer) registerFloat64(id observableID[float64]) {
r.float64[id] = struct{}{}
}
Expand Down Expand Up @@ -530,7 +555,10 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
measures := r.pipe.float64Measures[oImpl.observableID]
pellared marked this conversation as resolved.
Show resolved Hide resolved
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) {
Expand All @@ -555,7 +583,10 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
measures := r.pipe.int64Measures[oImpl.observableID]
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

type noopRegister struct{ embedded.Registration }
Expand Down
43 changes: 26 additions & 17 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync/atomic"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
Expand Down Expand Up @@ -43,10 +42,12 @@ func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFi
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
exemplarFilter: exemplarFilter,
resource: res,
reader: reader,
views: views,
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
exemplarFilter: exemplarFilter,
// aggregations is lazy allocated when needed.
}
}
Expand All @@ -64,10 +65,26 @@ type pipeline struct {
views []View

sync.Mutex
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
}

// addInt64Measure adds a new int64 measure to the pipeline for each observer.
func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
p.Lock()
defer p.Unlock()
p.int64Measures[id] = m
}

// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
p.Lock()
defer p.Unlock()
p.float64Measures[id] = m
}

// addSync adds the instrumentSync to pipeline p with scope. This method is not
Expand Down Expand Up @@ -574,14 +591,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View, exempl
return pipes
}

func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
unregs[i] = pipe.addMultiCallback(c)
}
return unregisterFuncs{f: unregs}
}

type unregisterFuncs struct {
embedded.Registration
f []func()
Expand Down
106 changes: 106 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
Expand All @@ -24,6 +26,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -101,6 +104,21 @@ func TestPipelineConcurrentSafe(t *testing.T) {
defer wg.Done()
pipe.addMultiCallback(func(context.Context) error { return nil })
}()

wg.Add(1)
go func() {
defer wg.Done()
b := aggregate.Builder[int64]{
Temporality: metricdata.CumulativeTemporality,
ReservoirFunc: nil,
AggregationLimit: 0,
}
var oID observableID[int64]
m, _ := b.PrecomputedSum(false)
measures := []aggregate.Measure[int64]{}
measures = append(measures, m)
pipe.addInt64Measure(oID, measures)
}()
}
wg.Wait()
}
Expand Down Expand Up @@ -518,3 +536,91 @@ func TestExemplars(t *testing.T) {
check(t, r, 2, 2, 2)
})
}

func TestAddingAndObservingMeasureConcurrentSafe(t *testing.T) {
exp := &fnExporter{}
r1 := NewPeriodicReader(exp, WithInterval(10*time.Millisecond))
r2 := NewPeriodicReader(exp, WithInterval(10*time.Millisecond))

pellared marked this conversation as resolved.
Show resolved Hide resolved
mp := NewMeterProvider(WithReader(r1), WithReader(r2))
m := mp.Meter("test")

oc1, err := m.Int64ObservableCounter("int64-observable-counter")
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
_, err := m.Int64ObservableCounter("int64-observable-counter-2")
require.NoError(t, err)
}()

wg.Add(1)
go func() {
defer wg.Done()
_, err := m.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(oc1, 2)
return nil
}, oc1)
require.NoError(t, err)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = mp.pipes[0].produce(context.Background(), &metricdata.ResourceMetrics{})
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = mp.pipes[1].produce(context.Background(), &metricdata.ResourceMetrics{})
}()

wg.Wait()
}

func TestPipelineWithMultipleReaders(t *testing.T) {
exp := &fnExporter{}
r1 := NewPeriodicReader(exp, WithInterval(10*time.Millisecond))
r2 := NewPeriodicReader(exp, WithInterval(10*time.Millisecond))

mp := NewMeterProvider(WithReader(r1), WithReader(r2))
m := mp.Meter("test")

var val atomic.Int64
val.Add(1)
measure := func(_ context.Context, m metric.Meter) {
oc, err := m.Int64ObservableCounter("int64-observable-counter")
require.NoError(t, err)
_, err = m.RegisterCallback(
// SDK periodically calls this function to collect data.
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(oc, val.Load())
return nil
}, oc)
require.NoError(t, err)
}
ctx := context.Background()
measure(ctx, m)
rm := new(metricdata.ResourceMetrics)
val.Add(1)
err := r1.Collect(ctx, rm)
require.NoError(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(t, rm.ScopeMetrics, 1)
assert.Len(t, rm.ScopeMetrics[0].Metrics, 1)
pellared marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(c, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)
}, 3*time.Second, 20*time.Millisecond, "observed counter value mismatch for first reader")

val.Add(1)
err = r2.Collect(ctx, rm)
require.NoError(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(t, rm.ScopeMetrics, 1)
assert.Len(t, rm.ScopeMetrics[0].Metrics, 1)
assert.Equal(c, int64(3), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)
}, 3*time.Second, 20*time.Millisecond, "observed counter value mismatch for second reader")
}
pellared marked this conversation as resolved.
Show resolved Hide resolved
Loading