Skip to content

Commit

Permalink
Fix flaky spanmetricsconnector test that relies on timing by reading …
Browse files Browse the repository at this point in the history
…the current time from a mockable Clock
  • Loading branch information
swar8080 committed Jun 9, 2024
1 parent 2c993b0 commit d1030ef
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 41 deletions.
14 changes: 8 additions & 6 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type connectorImp struct {
// e.g. { "foo/barOK": { "serviceName": "foo", "span.name": "/bar", "status_code": "OK" }}
metricKeyToDimensions *cache.Cache[metrics.Key, pcommon.Map]

clock clock.Clock
ticker *clock.Ticker
done chan struct{}
started bool
Expand Down Expand Up @@ -109,7 +110,7 @@ func newDimensions(cfgDims []Dimension) []dimension {
return dims
}

func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Ticker) (*connectorImp, error) {
func newConnector(logger *zap.Logger, config component.Config, clock clock.Clock) (*connectorImp, error) {
logger.Info("Building spanmetrics connector")
cfg := config.(*Config)

Expand Down Expand Up @@ -148,7 +149,8 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
lastDeltaTimestamps: lastDeltaTimestamps,
ticker: ticker,
clock: clock,
ticker: clock.NewTicker(cfg.MetricsFlushInterval),
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
events: cfg.Events,
Expand Down Expand Up @@ -266,7 +268,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
// buildMetrics collects the computed raw metrics data and builds OTLP metrics.
func (p *connectorImp) buildMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
timestamp := pcommon.NewTimestampFromTime(time.Now())
timestamp := pcommon.NewTimestampFromTime(p.clock.Now())

p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
rm := m.ResourceMetrics().AppendEmpty()
Expand Down Expand Up @@ -336,7 +338,7 @@ func (p *connectorImp) resetState() {
return
}

now := time.Now()
now := p.clock.Now()
p.resourceMetrics.ForEach(func(k resourceKey, m *resourceMetrics) {
// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
if p.config.Exemplars.Enabled {
Expand Down Expand Up @@ -365,7 +367,7 @@ func (p *connectorImp) resetState() {
// and span metadata such as name, kind, status_code and any additional
// dimensions the user has configured.
func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
startTimestamp := pcommon.NewTimestampFromTime(time.Now())
startTimestamp := pcommon.NewTimestampFromTime(p.clock.Now())
for i := 0; i < traces.ResourceSpans().Len(); i++ {
rspans := traces.ResourceSpans().At(i)
resourceAttr := rspans.Resource().Attributes()
Expand Down Expand Up @@ -487,7 +489,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta

// If expiration is enabled, track the last seen time.
if p.config.MetricsExpiration > 0 {
v.lastSeen = time.Now()
v.lastSeen = p.clock.Now()
}

return v
Expand Down
80 changes: 50 additions & 30 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func disabledHistogramsConfig() HistogramConfig {
}
}

func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, deltaTimestampCacheSize int, excludedDimensions ...string) (*connectorImp, *clock.Mock, error) {
func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, deltaTimestampCacheSize int, clock clock.Clock, excludedDimensions ...string) (*connectorImp, error) {
cfg := &Config{
AggregationTemporality: temporality,
Histogram: histogramConfig(),
Expand All @@ -477,20 +477,18 @@ func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramC
// Add a resource attribute to test "process" attributes like IP, host, region, cluster, etc.
{regionResourceAttrName, nil},
},
Events: eventsConfig(),
MetricsExpiration: expiration,
TimestampCacheSize: &deltaTimestampCacheSize,
Events: eventsConfig(),
MetricsExpiration: expiration,
TimestampCacheSize: &deltaTimestampCacheSize,
MetricsFlushInterval: time.Nanosecond,
}

mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)

c, err := newConnector(zap.NewNop(), cfg, ticker)
c, err := newConnector(zap.NewNop(), cfg, clock)
if err != nil {
return nil, nil, err
return nil, err
}
c.metricsConsumer = consumertest.NewNop()
return c, mockClock, nil
return c, nil
}

func stringp(str string) *string {
Expand All @@ -500,7 +498,7 @@ func stringp(str string) *string {
func TestBuildKeySameServiceNameCharSequence(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -520,7 +518,7 @@ func TestBuildKeyExcludeDimensionsAll(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ExcludeDimensions = []string{"span.kind", "service.name", "span.name", "status.code"}
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -533,7 +531,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ExcludeDimensions = []string{"span.kind", "service.name.wrong.name", "span.name", "status.code"}
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -545,7 +543,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) {
func TestBuildKeyWithDimensions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

defaultFoo := pcommon.NewValueStr("bar")
Expand Down Expand Up @@ -641,7 +639,7 @@ func TestConcurrentShutdown(t *testing.T) {
core, observedLogs := observer.New(zapcore.InfoLevel)

// Test
p, _, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
p, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)
// Override the default no-op consumer and logger for testing.
p.metricsConsumer = new(consumertest.MetricsSink)
Expand Down Expand Up @@ -686,7 +684,7 @@ func TestConnectorCapabilities(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)

// Test
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
// Override the default no-op consumer for testing.
c.metricsConsumer = new(consumertest.MetricsSink)
assert.NoError(t, err)
Expand Down Expand Up @@ -719,7 +717,8 @@ func TestConsumeMetricsErrors(t *testing.T) {
logger := zap.New(core)

var wg sync.WaitGroup
p, mockClock, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer and logger for testing.
p.metricsConsumer = &errConsumer{
Expand Down Expand Up @@ -884,7 +883,8 @@ func TestConsumeTraces(t *testing.T) {
// Prepare

mcon := &consumertest.MetricsSink{}
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer with metrics sink for testing.
p.metricsConsumer = mcon
Expand All @@ -911,7 +911,7 @@ func TestConsumeTraces(t *testing.T) {
}

func TestMetricKeyCache(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)
traces := buildSampleTrace()

Expand Down Expand Up @@ -940,7 +940,7 @@ func TestMetricKeyCache(t *testing.T) {
}

func TestResourceMetricsCache(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)

// Test
Expand Down Expand Up @@ -977,7 +977,7 @@ func TestResourceMetricsCache(t *testing.T) {
}

func TestResourceMetricsExpiration(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)

// Test
Expand All @@ -1002,7 +1002,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) {
"service.name",
}

p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)

// Test
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) {

func BenchmarkConnectorConsumeTraces(b *testing.B) {
// Prepare
conn, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
conn, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(b, err)

traces := buildSampleTrace()
Expand All @@ -1054,7 +1054,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {

func TestExcludeDimensionsConsumeTraces(t *testing.T) {
excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, excludeDimensions...)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()), excludeDimensions...)
require.NoError(t, err)
traces := buildSampleTrace()

Expand Down Expand Up @@ -1184,7 +1184,8 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
wg.Add(len(wantDataPointCounts))

// Note: default dimension key cache size is 2.
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer with metrics sink for testing.
p.metricsConsumer = mcon
Expand Down Expand Up @@ -1269,7 +1270,8 @@ func TestConnectorConsumeTracesExpiredMetrics(t *testing.T) {
mcon := &consumertest.MetricsSink{}

// Creating a connector with a very short metricsTTL to ensure that the metrics are expired.
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer with metrics sink for testing.
p.metricsConsumer = mcon
Expand Down Expand Up @@ -1489,7 +1491,7 @@ func TestSpanMetrics_Events(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Events = tt.eventsConfig
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)
err = c.ConsumeTraces(context.Background(), buildSampleTrace())
require.NoError(t, err)
Expand Down Expand Up @@ -1544,7 +1546,7 @@ func TestExemplarsAreDiscardedAfterFlushing(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000, clock.NewMock(time.Now()))
p.metricsConsumer = &consumertest.MetricsSink{}
require.NoError(t, err)

Expand Down Expand Up @@ -1671,7 +1673,8 @@ func TestTimestampsForUninterruptedStream(t *testing.T) {

for _, tt := range tests {
t.Run(tt.temporality, func(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000)
mockClock := newAlwaysIncreasingClock()
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
p.metricsConsumer = &consumertest.MetricsSink{}

Expand Down Expand Up @@ -1769,7 +1772,8 @@ func verifyAndCollectCommonTimestamps(t *testing.T, m pmetric.Metrics) (start pc

func TestDeltaTimestampCacheExpiry(t *testing.T) {
timestampCacheSize := 1
p, _, err := newConnectorImp(stringp("defaultNullValue"), exponentialHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, delta, 0, []string{}, timestampCacheSize)
mockClock := newAlwaysIncreasingClock()
p, err := newConnectorImp(stringp("defaultNullValue"), exponentialHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, delta, 0, []string{}, timestampCacheSize, mockClock)
require.NoError(t, err)
p.metricsConsumer = &consumertest.MetricsSink{}

Expand Down Expand Up @@ -1836,3 +1840,19 @@ func TestDeltaTimestampCacheExpiry(t *testing.T) {
serviceAStartTimestamp2 := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[2].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).StartTimestamp()
assert.Greater(t, serviceAStartTimestamp2, serviceATimestamp1) // These would be the same if nothing was evicted from the cache
}

// Clock where Now() always returns a greater value than the previous return value
type alwaysIncreasingClock struct {
clock.Clock
}

func newAlwaysIncreasingClock() alwaysIncreasingClock {
return alwaysIncreasingClock{
Clock: clock.NewMock(time.Now()),
}
}

func (c alwaysIncreasingClock) Now() time.Time {
c.Clock.(*clock.Mock).Add(time.Second)
return c.Clock.Now()
}
6 changes: 1 addition & 5 deletions connector/spanmetricsconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,10 @@ func createDefaultConfig() component.Config {
}

func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
c, err := newConnector(params.Logger, cfg, metricsTicker(ctx, cfg))
c, err := newConnector(params.Logger, cfg, clock.FromContext(ctx))
if err != nil {
return nil, err
}
c.metricsConsumer = nextConsumer
return c, nil
}

func metricsTicker(ctx context.Context, cfg component.Config) *clock.Ticker {
return clock.FromContext(ctx).NewTicker(cfg.(*Config).MetricsFlushInterval)
}

0 comments on commit d1030ef

Please sign in to comment.