Skip to content

Commit

Permalink
cache instruments to avoid leaking memory
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jan 19, 2024
1 parent 33f5cf4 commit e87d2e5
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)

### Fixed

- Fix `go.opentelemetry.io/otel/sdk/metric` to cache instruments to avoid leaking memory when the same instrument is created multiple times. (#4820)

## [1.23.0-rc.1] 2024-01-18

This is a release candidate for the v1.23.0 release.
Expand Down
26 changes: 20 additions & 6 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,16 @@ var (
)

func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string) float64Observable {
return float64Observable{
observable: newObservable[float64](m, kind, name, desc, u),
}
return m.float64ObservableInsts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() float64Observable {
return float64Observable{
observable: newObservable[float64](m, kind, name, desc, u),
}
})
}

type int64Observable struct {
Expand All @@ -286,9 +293,16 @@ var (
)

func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int64Observable {
return int64Observable{
observable: newObservable[int64](m, kind, name, desc, u),
}
return m.int64ObservableInsts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() int64Observable {
return int64Observable{
observable: newObservable[int64](m, kind, name, desc, u),
}
})
}

type observable[N int64 | float64] struct {
Expand Down
96 changes: 82 additions & 14 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines

int64Insts *cache[instID, int64InstVal]
float64Insts *cache[instID, float64InstVal]
int64ObservableInsts *cache[instID, int64Observable]
float64ObservableInsts *cache[instID, float64Observable]

int64Resolver resolver[int64]
float64Resolver resolver[float64]
}
Expand All @@ -50,11 +55,20 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, instID]

var int64Insts cache[instID, int64InstVal]
var float64Insts cache[instID, float64InstVal]
var int64ObservableInsts cache[instID, int64Observable]
var float64ObservableInsts cache[instID, float64Observable]

return &meter{
scope: s,
pipes: p,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
scope: s,
pipes: p,
int64Insts: &int64Insts,
float64Insts: &float64Insts,
int64ObservableInsts: &int64ObservableInsts,
float64ObservableInsts: &float64ObservableInsts,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
}
}

Expand Down Expand Up @@ -109,6 +123,9 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
// It registers callbacks for each reader's pipeline.
func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
// If we are re-using the instrument, measures have already been appended
// and we don't need to append them again.
shouldAppendMeasures := len(inst.measures) == 0
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
Expand All @@ -121,7 +138,9 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
if shouldAppendMeasures {
inst.appendMeasures(in)
}
for _, cback := range callbacks {
inst := int64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
Expand Down Expand Up @@ -226,6 +245,9 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
// It registers callbacks for each reader's pipeline.
func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
// If we are re-using the instrument, measures have already been appended
// and we don't need to append them again.
shouldAppendMeasures := len(inst.measures) == 0
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
Expand All @@ -238,7 +260,9 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
if shouldAppendMeasures {
inst.appendMeasures(in)
}
for _, cback := range callbacks {
inst := float64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
Expand Down Expand Up @@ -498,6 +522,12 @@ func (noopRegister) Unregister() error {
// int64InstProvider provides int64 OpenTelemetry instruments.
type int64InstProvider struct{ *meter }

// int64InstVal is the cached value in an int64 instrument cache.
type int64InstVal struct {
instrument *int64Inst
err error
}

func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
inst := Instrument{
Name: name,
Expand Down Expand Up @@ -529,19 +559,41 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
val := p.meter.int64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() int64InstVal {
aggs, err := p.aggs(kind, name, desc, u)
return int64InstVal{instrument: &int64Inst{measures: aggs}, err: err}
})
return val.instrument, val.err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
val := p.meter.int64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() int64InstVal {
aggs, err := p.histogramAggs(name, cfg)
return int64InstVal{instrument: &int64Inst{measures: aggs}, err: err}
})
return val.instrument, val.err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct{ *meter }

// float64InstVal is the cached value in an instrument cache.
type float64InstVal struct {
instrument *float64Inst
err error
}

func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
inst := Instrument{
Name: name,
Expand Down Expand Up @@ -573,14 +625,30 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
val := p.meter.float64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() float64InstVal {
aggs, err := p.aggs(kind, name, desc, u)
return float64InstVal{instrument: &float64Inst{measures: aggs}, err: err}
})
return val.instrument, val.err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
val := p.meter.float64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() float64InstVal {
aggs, err := p.histogramAggs(name, cfg)
return float64InstVal{instrument: &float64Inst{measures: aggs}, err: err}
})
return val.instrument, val.err
}

type int64Observer struct {
Expand Down
109 changes: 109 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2272,3 +2272,112 @@ func TestObservableDropAggregation(t *testing.T) {
})
}
}

func TestDuplicateInstrumentCreation(t *testing.T) {
for _, tt := range []struct {
desc string
createInstrument func(metric.Meter) error
}{
{
desc: "Int64ObservableCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64ObservableCounter("observable.int64.counter")
return err
},
},
{
desc: "Int64ObservableUpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter")
return err
},
},
{
desc: "Int64ObservableGauge",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64ObservableGauge("observable.int64.gauge")
return err
},
},
{
desc: "Float64ObservableCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64ObservableCounter("observable.float64.counter")
return err
},
},
{
desc: "Float64ObservableUpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter")
return err
},
},
{
desc: "Float64ObservableGauge",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64ObservableGauge("observable.float64.gauge")
return err
},
},
{
desc: "Int64Counter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64Counter("sync.int64.counter")
return err
},
},
{
desc: "Int64UpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64UpDownCounter("sync.int64.up.down.counter")
return err
},
},
{
desc: "Int64Histogram",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64Histogram("sync.int64.histogram")
return err
},
},
{
desc: "Float64Counter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64Counter("sync.float64.counter")
return err
},
},
{
desc: "Float64UpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64UpDownCounter("sync.float64.up.down.counter")
return err
},
},
{
desc: "Float64Histogram",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64Histogram("sync.float64.histogram")
return err
},
},
} {
t.Run(tt.desc, func(t *testing.T) {
reader := NewManualReader()
defer func() {
require.NoError(t, reader.Shutdown(context.Background()))
}()

m := NewMeterProvider(WithReader(reader)).Meter("TestDuplicateInstrumentCreation")
for i := 0; i < 3; i++ {
require.NoError(t, tt.createInstrument(m))
}
internalMeter, ok := m.(*meter)
require.True(t, ok)
// check that multiple calls to create the same instrument only create 1 instrument
numInstruments := len(internalMeter.int64Insts.data) + len(internalMeter.float64Insts.data) + len(internalMeter.int64ObservableInsts.data) + len(internalMeter.float64ObservableInsts.data)
require.Equal(t, 1, numInstruments)
})
}
}

0 comments on commit e87d2e5

Please sign in to comment.