Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache instruments so repeatedly creating identical instruments doesn't leak memory #4820

Merged
merged 6 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)

### Fixed

- Fix `go.opentelemetry.io/otel/sdk/metric` to cache instruments to avoid leaking memory when the same instrument is created multiple times. (#4820)

## [1.23.0-rc.1] 2024-01-18

This is a release candidate for the v1.23.0 release.
Expand Down
40 changes: 40 additions & 0 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,43 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}

// HasKey returns true if Lookup has previously been called with that key
//
// HasKey is safe to call concurrently.
func (c *cache[K, V]) HasKey(key K) bool {
c.Lock()
defer c.Unlock()
_, ok := c.data[key]
return ok
}

// cacheWithErr is a locking storage used to quickly return already computed values and an error.
//
// The zero value of a cacheWithErr is empty and ready to use.
//
// A cacheWithErr must not be copied after first use.
//
// All methods of a cacheWithErr are safe to call concurrently.
type cacheWithErr[K comparable, V any] struct {
cache[K, valAndErr[V]]
}

type valAndErr[V any] struct {
val V
err error
}

// Lookup returns the value stored in the cacheWithErr with the associated key
// if it exists. Otherwise, f is called and its returned value is set in the
// cacheWithErr for key and returned.
//
// Lookup is safe to call concurrently. It will hold the cacheWithErr lock, so f
// should not block excessively.
func (c *cacheWithErr[K, V]) Lookup(key K, f func() (V, error)) (V, error) {
combined := c.cache.Lookup(key, func() valAndErr[V] {
val, err := f()
return valAndErr[V]{val: val, err: err}
})
return combined.val, combined.err
}
184 changes: 134 additions & 50 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
scope instrumentation.Scope
pipes pipelines

int64Insts *cacheWithErr[instID, *int64Inst]
float64Insts *cacheWithErr[instID, *float64Inst]
int64ObservableInsts *cacheWithErr[instID, int64Observable]
float64ObservableInsts *cacheWithErr[instID, float64Observable]

int64Resolver resolver[int64]
float64Resolver resolver[float64]
}
Expand All @@ -50,11 +55,20 @@
// meter is asked to create are logged to the user.
var viewCache cache[string, instID]

var int64Insts cacheWithErr[instID, *int64Inst]
var float64Insts cacheWithErr[instID, *float64Inst]
var int64ObservableInsts cacheWithErr[instID, int64Observable]
var float64ObservableInsts cacheWithErr[instID, float64Observable]

return &meter{
scope: s,
pipes: p,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
scope: s,
pipes: p,
int64Insts: &int64Insts,
float64Insts: &float64Insts,
int64ObservableInsts: &int64ObservableInsts,
float64ObservableInsts: &float64ObservableInsts,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
}
}

Expand Down Expand Up @@ -108,32 +122,48 @@
// int64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
key := instID{
Name: id.Name,
Description: id.Description,
Unit: id.Unit,
Kind: id.Kind,
}
if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
warnRepeatedObservableCallbacks(id)
}

Check warning on line 133 in sdk/metric/meter.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/meter.go#L132-L133

Added lines #L132 - L133 were not covered by tests
return m.int64ObservableInsts.Lookup(key, func() (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}

Check warning on line 142 in sdk/metric/meter.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/meter.go#L141-L142

Added lines #L141 - L142 were not covered by tests
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
}
return inst, validateInstrumentName(id.Name)
return inst, validateInstrumentName(id.Name)
})
}

// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
//
// If Int64ObservableCounter is invoked repeatedly with the same Name,
// Description, and Unit, only the first set of callbacks provided are used.
// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
id := Instrument{
Expand Down Expand Up @@ -225,32 +255,48 @@
// float64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
key := instID{
Name: id.Name,
Description: id.Description,
Unit: id.Unit,
Kind: id.Kind,
}
if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
warnRepeatedObservableCallbacks(id)
}

Check warning on line 266 in sdk/metric/meter.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/meter.go#L265-L266

Added lines #L265 - L266 were not covered by tests
return m.float64ObservableInsts.Lookup(key, func() (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}

Check warning on line 275 in sdk/metric/meter.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/meter.go#L274-L275

Added lines #L274 - L275 were not covered by tests
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
}
return inst, validateInstrumentName(id.Name)
return inst, validateInstrumentName(id.Name)
})
}

// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
//
// If Float64ObservableCounter is invoked repeatedly with the same Name,
// Description, and Unit, only the first set of callbacks provided are used.
// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
id := Instrument{
Expand Down Expand Up @@ -324,6 +370,16 @@
return isAlpha(c) || ('0' <= c && c <= '9')
}

func warnRepeatedObservableCallbacks(id Instrument) {
inst := fmt.Sprintf(
"Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}",
id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit,
)
global.Warn("Repeated observable instrument creation with callbacks. Ignoring new callbacks. Use meter.RegisterCallback and Registration.Unregister to manage callbacks.",
"instrument", inst,
)

Check warning on line 380 in sdk/metric/meter.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/meter.go#L373-L380

Added lines #L373 - L380 were not covered by tests
}

// RegisterCallback registers f to be called each collection cycle so it will
// make observations for insts during those cycles.
//
Expand Down Expand Up @@ -529,14 +585,28 @@

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
return p.meter.int64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
})
}

// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
return p.meter.int64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
})
}

// float64InstProvider provides float64 OpenTelemetry instruments.
Expand Down Expand Up @@ -573,14 +643,28 @@

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
return p.meter.float64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
})
}

// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
return p.meter.float64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
})
}

type int64Observer struct {
Expand Down
Loading