Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal/common/ttlmap] Fix leak caused by time.Tick #32044

Merged
merged 6 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions exporter/signalfxexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type DimensionClientConfig struct {
Timeout time.Duration `mapstructure:"timeout"`
}

func (cfg *Config) getMetricTranslator(logger *zap.Logger) (*translation.MetricTranslator, error) {
func (cfg *Config) getMetricTranslator(logger *zap.Logger, done chan struct{}) (*translation.MetricTranslator, error) {
rules := defaultTranslationRules
if cfg.TranslationRules != nil {
// Previous way to disable default translation rules.
Expand All @@ -166,7 +166,7 @@ func (cfg *Config) getMetricTranslator(logger *zap.Logger) (*translation.MetricT
if cfg.DisableDefaultTranslationRules {
rules = []translation.Rule{}
}
metricTranslator, err := translation.NewMetricTranslator(rules, cfg.DeltaTranslationTTL)
metricTranslator, err := translation.NewMetricTranslator(rules, cfg.DeltaTranslationTTL, done)
if err != nil {
return nil, fmt.Errorf("invalid \"%s\": %w", translationRulesConfigKey, err)
}
Expand Down
11 changes: 6 additions & 5 deletions exporter/signalfxexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func TestLoadConfig(t *testing.T) {
}

func TestConfigGetMetricTranslator(t *testing.T) {
done := make(chan struct{})
tests := []struct {
name string
cfg *Config
Expand All @@ -299,7 +300,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600)
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -311,7 +312,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600)
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -323,7 +324,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600)
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -336,7 +337,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600)
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -358,7 +359,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.cfg.getMetricTranslator(zap.NewNop())
got, err := tt.cfg.getMetricTranslator(zap.NewNop(), done)
if tt.wantErr {
assert.Error(t, err)
return
Expand Down
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func newSignalFxExporter(
return nil, errors.New("nil config")
}

metricTranslator, err := config.getMetricTranslator(createSettings.TelemetrySettings.Logger)
metricTranslator, err := config.getMetricTranslator(createSettings.TelemetrySettings.Logger, make(chan struct{}))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ func TestTLSIngestConnection(t *testing.T) {
}

func TestDefaultSystemCPUTimeExcludedAndTranslated(t *testing.T) {
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600)
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600, make(chan struct{}))
require.NoError(t, err)
converter, err := translation.NewMetricsConverter(zap.NewNop(), translator, defaultExcludeMetrics, nil, "_-.", false, true)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions exporter/signalfxexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestCreateMetricsExporter_CustomConfig(t *testing.T) {
func TestDefaultTranslationRules(t *testing.T) {
rules := defaultTranslationRules
require.NotNil(t, rules, "rules are nil")
tr, err := translation.NewMetricTranslator(rules, 1)
tr, err := translation.NewMetricTranslator(rules, 1, make(chan struct{}))
require.NoError(t, err)
data := testMetricsData(false)

Expand Down Expand Up @@ -475,7 +475,7 @@ func TestDefaultDiskTranslations(t *testing.T) {
func testGetTranslator(t *testing.T) *translation.MetricTranslator {
rules := defaultTranslationRules
require.NotNil(t, rules, "rules are nil")
tr, err := translation.NewMetricTranslator(rules, 3600)
tr, err := translation.NewMetricTranslator(rules, 3600, make(chan struct{}))
require.NoError(t, err)
return tr
}
Expand Down Expand Up @@ -610,7 +610,7 @@ func TestDefaultExcludes_not_translated(t *testing.T) {
func BenchmarkMetricConversion(b *testing.B) {
rules := defaultTranslationRules
require.NotNil(b, rules, "rules are nil")
tr, err := translation.NewMetricTranslator(rules, 1)
tr, err := translation.NewMetricTranslator(rules, 1, make(chan struct{}))
require.NoError(b, err)

c, err := translation.NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestGetDimensionUpdateFromMetadata(t *testing.T) {
Action: translation.ActionRenameDimensionKeys,
Mapping: map[string]string{"name": "translated_name"},
},
}, 1)
}, 1, make(chan struct{}))
type args struct {
metadata metadata.MetadataUpdate
metricTranslator *translation.MetricTranslator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ func TestMetricDataToSignalFxV2WithTranslation(t *testing.T) {
"old.dim": "new.dim",
},
},
}, 1)
}, 1, make(chan struct{}))
require.NoError(t, err)

md := pmetric.NewMetrics()
Expand Down Expand Up @@ -1147,7 +1147,7 @@ func TestDimensionKeyCharsWithPeriod(t *testing.T) {
"old.dim.with.periods": "new.dim.with.periods",
},
},
}, 1)
}, 1, make(chan struct{}))
require.NoError(t, err)

md := pmetric.NewMetrics()
Expand Down Expand Up @@ -1344,7 +1344,7 @@ func TestMetricsConverter_ConvertDimension(t *testing.T) {
"d.i.m": "di.m",
},
},
}, 0)
}, 0, make(chan struct{}))
return t
}(),
nonAlphanumericDimChars: "_-",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ type deltaTranslator struct {
prevPts *ttlmap.TTLMap
}

func newDeltaTranslator(ttl int64) *deltaTranslator {
func newDeltaTranslator(ttl int64, done chan struct{}) *deltaTranslator {
sweepIntervalSeconds := ttl / 2
if sweepIntervalSeconds == 0 {
sweepIntervalSeconds = 1
}
m := ttlmap.New(sweepIntervalSeconds, ttl)
m := ttlmap.New(sweepIntervalSeconds, ttl, done)
m.Start()
return &deltaTranslator{prevPts: m}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ type MetricTranslator struct {
deltaTranslator *deltaTranslator
}

func NewMetricTranslator(rules []Rule, ttl int64) (*MetricTranslator, error) {
func NewMetricTranslator(rules []Rule, ttl int64, done chan struct{}) (*MetricTranslator, error) {
err := validateTranslationRules(rules)
if err != nil {
return nil, err
Expand All @@ -240,7 +240,7 @@ func NewMetricTranslator(rules []Rule, ttl int64) (*MetricTranslator, error) {
return &MetricTranslator{
rules: rules,
dimensionsMap: createDimensionsMap(rules),
deltaTranslator: newDeltaTranslator(ttl),
deltaTranslator: newDeltaTranslator(ttl, done),
}, nil
}

Expand Down
26 changes: 13 additions & 13 deletions exporter/signalfxexporter/internal/translation/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func TestNewMetricTranslator(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mt, err := NewMetricTranslator(tt.trs, 1)
mt, err := NewMetricTranslator(tt.trs, 1, make(chan struct{}))
if tt.wantError == "" {
require.NoError(t, err)
require.NotNil(t, mt)
Expand Down Expand Up @@ -1879,7 +1879,7 @@ func TestTranslateDataPoints(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mt, err := NewMetricTranslator(tt.trs, 1)
mt, err := NewMetricTranslator(tt.trs, 1, make(chan struct{}))
require.NoError(t, err)
assert.NotEqualValues(t, tt.want, tt.dps)
got := mt.TranslateDataPoints(zap.NewNop(), tt.dps)
Expand Down Expand Up @@ -1925,15 +1925,15 @@ func TestTestTranslateDimension(t *testing.T) {
"old.dimension": "new.dimension",
},
},
}, 1)
}, 1, make(chan struct{}))
require.NoError(t, err)

assert.Equal(t, "new_dimension", mt.translateDimension("old_dimension"))
assert.Equal(t, "new.dimension", mt.translateDimension("old.dimension"))
assert.Equal(t, "another_dimension", mt.translateDimension("another_dimension"))

// Test no rename_dimension_keys translation rule
mt, err = NewMetricTranslator([]Rule{}, 1)
mt, err = NewMetricTranslator([]Rule{}, 1, make(chan struct{}))
require.NoError(t, err)
assert.Equal(t, "old_dimension", mt.translateDimension("old_dimension"))
}
Expand Down Expand Up @@ -2024,7 +2024,7 @@ func TestNewCalculateNewMetricErrors(t *testing.T) {
Operand1Metric: "metric1",
Operand2Metric: "metric2",
Operator: MetricOperatorDivision,
}}, 1)
}}, 1, make(chan struct{}))
require.NoError(t, err)
tr := mt.TranslateDataPoints(logger, dps)
require.Equal(t, 2, len(tr))
Expand All @@ -2045,7 +2045,7 @@ func TestNewMetricTranslator_InvalidOperator(t *testing.T) {
Operand1Metric: "metric1",
Operand2Metric: "metric2",
Operator: "*",
}}, 1)
}}, 1, make(chan struct{}))
require.Errorf(
t,
err,
Expand Down Expand Up @@ -2201,7 +2201,7 @@ func TestCalculateNewMetric_MatchingDims_Single(t *testing.T) {
Operand1Metric: "metric1",
Operand2Metric: "metric2",
Operator: "/",
}}, 1)
}}, 1, make(chan struct{}))
require.NoError(t, err)
m1 := &sfxpb.DataPoint{
Metric: "metric1",
Expand Down Expand Up @@ -2252,7 +2252,7 @@ func TestCalculateNewMetric_MatchingDims_Multi(t *testing.T) {
Operand1Metric: "metric1",
Operand2Metric: "metric2",
Operator: "/",
}}, 1)
}}, 1, make(chan struct{}))
require.NoError(t, err)
m1 := &sfxpb.DataPoint{
Metric: "metric1",
Expand Down Expand Up @@ -2339,7 +2339,7 @@ func TestUnsupportedOperator(t *testing.T) {
Operand1Metric: "metric1",
Operand2Metric: "metric2",
Operator: "*",
}}, 1)
}}, 1, make(chan struct{}))
require.Error(t, err)
}

Expand All @@ -2350,7 +2350,7 @@ func TestCalculateNewMetric_Double(t *testing.T) {
Operand1Metric: "metric1",
Operand2Metric: "metric2",
Operator: "/",
}}, 1)
}}, 1, make(chan struct{}))
require.NoError(t, err)
m1 := &sfxpb.DataPoint{
Metric: "metric1",
Expand Down Expand Up @@ -2910,7 +2910,7 @@ func TestDropDimensions(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mt, err := NewMetricTranslator(test.rules, 1)
mt, err := NewMetricTranslator(test.rules, 1, make(chan struct{}))
require.NoError(t, err)
outputSFxDps := mt.TranslateDataPoints(zap.NewNop(), test.inputDps)
require.Equal(t, test.expectedDps, outputSFxDps)
Expand Down Expand Up @@ -2945,7 +2945,7 @@ func TestDropDimensionsErrorCases(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mt, err := NewMetricTranslator(test.rules, 1)
mt, err := NewMetricTranslator(test.rules, 1, make(chan struct{}))
require.EqualError(t, err, test.expectedError)
require.Nil(t, mt)
})
Expand All @@ -2957,7 +2957,7 @@ func testConverter(t *testing.T, mapping map[string]string) *MetricsConverter {
Action: ActionDeltaMetric,
Mapping: mapping,
}}
tr, err := NewMetricTranslator(rules, 1)
tr, err := NewMetricTranslator(rules, 1, make(chan struct{}))
require.NoError(t, err)

c, err := NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false, true)
Expand Down
14 changes: 14 additions & 0 deletions internal/common/ttlmap/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ttlmap

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
24 changes: 20 additions & 4 deletions internal/common/ttlmap/ttl_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,36 @@ import (
type TTLMap struct {
md *ttlMapData
sweepInterval int64
done chan struct{}
}

// New creates a TTLMap. The sweepIntervalSeconds arg indicates how often
// entries are checked for expiration. The maxAgeSeconds arg indicates how long
// entries can persist before getting evicted. Call Start() on the returned
// TTLMap to begin periodic sweeps which check for expiration and evict entries
// as needed.
func New(sweepIntervalSeconds int64, maxAgeSeconds int64) *TTLMap {
// done is the channel that will be used to signal to the timer to stop its work.
func New(sweepIntervalSeconds int64, maxAgeSeconds int64, done chan struct{}) *TTLMap {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the size of this PR now. I originally wanted to simply create the channel inside of New, but an existing test exporter/signalfxexporter/TestConfigGetMetricTranslator does an Assert.Equal on objects that contain the TTLMap as a private member. The assertion fails as the pointers are not equal. I've attempted a few workarounds (cmp and cmpopts, yaml or JSON marshalling). Nothing works.

If the preference is to simply not have a done channel and use the timer and the timer.Stop() method instead, I can modify this PR to pass that in. Either way, the signature of New will need to be updated to accomodate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative that may be less impacting would be to pass the done channel to Start. I don't think passing in the Ticker would work as it gets started at creation. I don't think it would be a great design to pass a running Ticker to these methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the TTLMap struct might also create its own done channel. Wouldn't that be best?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be best, but the usage and testing is where the challenge comes in. The TestConfigGetMetricTranslator is testing to ensure the config is created properly relative to the metric translator itself in the signalfx exporter. When the TTLMap creates its own channel the equality comparison fails.

The other options are remove the test, or add a public equality checking method to the metrics translator.

return &TTLMap{
sweepInterval: sweepIntervalSeconds,
md: newTTLMapData(maxAgeSeconds),
done: done,
}
}

// Start starts periodic sweeps for expired entries in the underlying map.
func (m *TTLMap) Start() {
go func() {
d := time.Duration(m.sweepInterval) * time.Second
for now := range time.Tick(d) {
m.md.sweep(now.Unix())
ticker := time.NewTicker(time.Duration(m.sweepInterval) * time.Second)
defer ticker.Stop()

for {
select {
case now := <-ticker.C:
m.md.sweep(now.Unix())
case <-m.done:
return
}
}
}()
}
Expand All @@ -49,6 +59,12 @@ func (m *TTLMap) Get(k string) any {
return m.md.get(k)
}

func (m *TTLMap) Shutdown() {
if m.done != nil {
close(m.done)
}
}

type entry struct {
createTime int64
v any
Expand Down
5 changes: 3 additions & 2 deletions internal/common/ttlmap/ttl_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestTTLMapData(t *testing.T) {
}

func TestTTLMapSimple(t *testing.T) {
m := New(5, 10)
m := New(5, 10, make(chan struct{}))
require.EqualValues(t, m.sweepInterval, 5)
require.EqualValues(t, m.md.maxAge, 10)
m.Put("foo", "bar")
Expand All @@ -33,8 +33,9 @@ func TestTTLMapLong(t *testing.T) {
if testing.Short() {
t.Skip("skipping TestTTLMapLong in short mode")
}
m := New(1, 1)
m := New(1, 1, make(chan struct{}))
m.Start()
defer m.Shutdown()
m.Put("foo", "bar")
require.Equal(t, "bar", m.Get("foo"))
time.Sleep(time.Second * 3)
Expand Down
Loading