Skip to content

Commit

Permalink
[servicegraphprocessor, servicegraphconnector] Measure latency in sec…
Browse files Browse the repository at this point in the history
…onds instead of milliseconds (#27665)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

Measures latency in seconds instead of milliseconds, as the metric name
indicates. Previously, milliseconds was used. This unit is still
available via the feature gate
`processor.servicegraph.legacyLatencyUnitMs`.

This is a breaking change.

**Link to tracking Issue:** <Issue number if applicable>  #27488

**Testing:** <Describe what testing was performed and which tests were
added.> Tests are updated

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Curtis Robert <92119472+crobert-1@users.noreply.github.com>
Co-authored-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
3 people authored Oct 24, 2023
1 parent 67fcaad commit c9f1865
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 43 deletions.
31 changes: 31 additions & 0 deletions .chloggen/servicegraph-fix-time-unit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphprocessor, servicegraphconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Measure latency in seconds instead of milliseconds

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27488]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Measures latency in seconds instead of milliseconds, as the metric name indicates.
Previously, milliseconds was used.
This unit is still available via the feature gate `processor.servicegraph.legacyLatencyUnitMs`.
This is a breaking change.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 8 additions & 1 deletion processor/servicegraphprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const (
connectorStability = component.StabilityLevelDevelopment
virtualNodeFeatureGateID = "processor.servicegraph.virtualNode"
legacyLatencyMetricNamesFeatureGateID = "processor.servicegraph.legacyLatencyMetricNames"
legacyLatencyUnitMs = "processor.servicegraph.legacyLatencyUnitMs"
)

var virtualNodeFeatureGate, legacyMetricNamesFeatureGate *featuregate.Gate
var virtualNodeFeatureGate, legacyMetricNamesFeatureGate, legacyLatencyUnitMsFeatureGate *featuregate.Gate

func init() {
virtualNodeFeatureGate = featuregate.GlobalRegistry().MustRegister(
Expand All @@ -40,6 +41,12 @@ func init() {
featuregate.WithRegisterDescription("When enabled, processor uses legacy latency metric names."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18743,https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16578"),
)
legacyLatencyUnitMsFeatureGate = featuregate.GlobalRegistry().MustRegister(
legacyLatencyUnitMs,
featuregate.StageAlpha, // Alpha because we want it disabled by default.
featuregate.WithRegisterDescription("When enabled, processor reports latency in milliseconds, instead of seconds."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27488"),
)
}

// NewFactory creates a factory for the servicegraph processor.
Expand Down
12 changes: 6 additions & 6 deletions processor/servicegraphprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ func TestNewProcessor(t *testing.T) {
}{
{
name: "simplest config (use defaults)",
expectedLatencyHistogramBuckets: defaultLatencyHistogramBucketsMs,
expectedLatencyHistogramBuckets: defaultLatencyHistogramBuckets,
},
{
name: "latency histogram configured with catch-all bucket to check no additional catch-all bucket inserted",
latencyHistogramBuckets: []time.Duration{2 * time.Millisecond},
expectedLatencyHistogramBuckets: []float64{2},
expectedLatencyHistogramBuckets: []float64{0.002},
},
{
name: "full config with no catch-all bucket and check the catch-all bucket is inserted",
latencyHistogramBuckets: []time.Duration{2 * time.Millisecond},
expectedLatencyHistogramBuckets: []float64{2},
expectedLatencyHistogramBuckets: []float64{0.002},
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -64,17 +64,17 @@ func TestNewConnector(t *testing.T) {
}{
{
name: "simplest config (use defaults)",
expectedLatencyHistogramBuckets: defaultLatencyHistogramBucketsMs,
expectedLatencyHistogramBuckets: defaultLatencyHistogramBuckets,
},
{
name: "latency histogram configured with catch-all bucket to check no additional catch-all bucket inserted",
latencyHistogramBuckets: []time.Duration{2 * time.Millisecond},
expectedLatencyHistogramBuckets: []float64{2},
expectedLatencyHistogramBuckets: []float64{0.002},
},
{
name: "full config with no catch-all bucket and check the catch-all bucket is inserted",
latencyHistogramBuckets: []time.Duration{2 * time.Millisecond},
expectedLatencyHistogramBuckets: []float64{2},
expectedLatencyHistogramBuckets: []float64{0.002},
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand Down
41 changes: 29 additions & 12 deletions processor/servicegraphprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ const (
)

var (
defaultLatencyHistogramBucketsMs = []float64{
legacyDefaultLatencyHistogramBuckets = []float64{
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
}
defaultLatencyHistogramBuckets = []float64{
0.002, 0.004, 0.006, 0.008, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1, 1.4, 2, 5, 10, 15,
}

defaultPeerAttributes = []string{
semconv.AttributeDBName, semconv.AttributeNetSockPeerAddr, semconv.AttributeNetPeerName, semconv.AttributeRPCService, semconv.AttributeNetSockPeerName, semconv.AttributeNetPeerName, semconv.AttributeHTTPURL, semconv.AttributeHTTPTarget,
}
Expand Down Expand Up @@ -78,9 +82,12 @@ type serviceGraphProcessor struct {
func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProcessor {
pConfig := config.(*Config)

bounds := defaultLatencyHistogramBucketsMs
bounds := defaultLatencyHistogramBuckets
if legacyLatencyUnitMsFeatureGate.IsEnabled() {
bounds = legacyDefaultLatencyHistogramBuckets
}
if pConfig.LatencyHistogramBuckets != nil {
bounds = mapDurationsToMillis(pConfig.LatencyHistogramBuckets)
bounds = mapDurationsToFloat(pConfig.LatencyHistogramBuckets)
}

if pConfig.CacheLoop <= 0 {
Expand Down Expand Up @@ -254,7 +261,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace.
e.TraceID = traceID
e.ConnectionType = connectionType
e.ClientService = serviceName
e.ClientLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds())
e.ClientLatencySec = spanDuration(span)
e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError
p.upsertDimensions(clientKind, e.Dimensions, rAttributes, span.Attributes())

Expand All @@ -267,7 +274,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace.
if dbName, ok := findAttributeValue(semconv.AttributeDBName, rAttributes, span.Attributes()); ok {
e.ConnectionType = store.Database
e.ServerService = dbName
e.ServerLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds())
e.ServerLatencySec = spanDuration(span)
}
})
case ptrace.SpanKindConsumer:
Expand All @@ -281,7 +288,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace.
e.TraceID = traceID
e.ConnectionType = connectionType
e.ServerService = serviceName
e.ServerLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds())
e.ServerLatencySec = spanDuration(span)
e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError
p.upsertDimensions(serverKind, e.Dimensions, rAttributes, span.Attributes())
})
Expand Down Expand Up @@ -655,16 +662,26 @@ func (p *serviceGraphProcessor) cleanCache() {
p.seriesMutex.Unlock()
}

// durationToMillis converts the given duration to the number of milliseconds it represents.
// Note that this can return sub-millisecond (i.e. < 1ms) values as well.
func durationToMillis(d time.Duration) float64 {
return float64(d.Nanoseconds()) / float64(time.Millisecond.Nanoseconds())
// spanDuration returns the duration of the given span in seconds (legacy ms).
func spanDuration(span ptrace.Span) float64 {
if legacyLatencyUnitMsFeatureGate.IsEnabled() {
return float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds())
}
return float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Second.Nanoseconds())
}

// durationToFloat converts the given duration to the number of seconds (legacy ms) it represents.
func durationToFloat(d time.Duration) float64 {
if legacyLatencyUnitMsFeatureGate.IsEnabled() {
return float64(d.Milliseconds())
}
return d.Seconds()
}

func mapDurationsToMillis(vs []time.Duration) []float64 {
func mapDurationsToFloat(vs []time.Duration) []float64 {
vsm := make([]float64, len(vs))
for i, v := range vs {
vsm[i] = durationToMillis(v)
vsm[i] = durationToFloat(v)
}
return vsm
}
75 changes: 51 additions & 24 deletions processor/servicegraphprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,10 @@ func TestConnectorShutdown(t *testing.T) {
}

func TestProcessorConsume(t *testing.T) {
// set virtual node feature
_ = featuregate.GlobalRegistry().Set(virtualNodeFeatureGate.ID(), true)

for _, tc := range []struct {
name string
cfg Config
gates []*featuregate.Gate
sampleTraces ptrace.Traces
verifyMetrics func(t *testing.T, md pmetric.Metrics)
}{
Expand All @@ -155,6 +153,7 @@ func TestProcessorConsume(t *testing.T) {
TTL: time.Nanosecond,
},
},
gates: []*featuregate.Gate{virtualNodeFeatureGate},
sampleTraces: incompleteClientTraces(),
verifyMetrics: func(t *testing.T, md pmetric.Metrics) {
v, ok := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Get("server")
Expand All @@ -172,6 +171,7 @@ func TestProcessorConsume(t *testing.T) {
TTL: time.Nanosecond,
},
},
gates: []*featuregate.Gate{virtualNodeFeatureGate},
sampleTraces: incompleteServerTraces(false),
verifyMetrics: func(t *testing.T, md pmetric.Metrics) {
v, ok := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Get("client")
Expand All @@ -189,13 +189,32 @@ func TestProcessorConsume(t *testing.T) {
TTL: time.Nanosecond,
},
},
gates: []*featuregate.Gate{virtualNodeFeatureGate},
sampleTraces: incompleteServerTraces(true),
verifyMetrics: func(t *testing.T, md pmetric.Metrics) {
assert.Equal(t, 0, md.MetricCount())
},
},
{
name: "complete traces with legacy latency metrics",
cfg: Config{
MetricsExporter: "mock",
Dimensions: []string{"some-attribute", "non-existing-attribute"},
Store: StoreConfig{
MaxItems: 10,
TTL: time.Nanosecond,
},
}, sampleTraces: buildSampleTrace(t, "val"),
gates: []*featuregate.Gate{virtualNodeFeatureGate, legacyLatencyUnitMsFeatureGate},
verifyMetrics: verifyHappyCaseMetricsWithDuration(1000),
},
} {
t.Run(tc.name, func(t *testing.T) {
// Set feature gates
for _, gate := range tc.gates {
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), true))
}

// Prepare
p := newProcessor(zaptest.NewLogger(t), &tc.cfg)
p.tracesConsumer = consumertest.NewNop()
Expand Down Expand Up @@ -224,11 +243,13 @@ func TestProcessorConsume(t *testing.T) {

// Shutdown the processor
assert.NoError(t, p.Shutdown(context.Background()))

// Unset feature gates
for _, gate := range tc.gates {
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), false))
}
})
}

// unset virtual node feature
_ = featuregate.GlobalRegistry().Set(virtualNodeFeatureGate.ID(), false)
}

func TestConnectorConsume(t *testing.T) {
Expand Down Expand Up @@ -296,27 +317,33 @@ func TestProcessor_MetricsFlushInterval(t *testing.T) {
}

func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
assert.Equal(t, 3, md.MetricCount())
verifyHappyCaseMetricsWithDuration(1)(t, md)
}

rms := md.ResourceMetrics()
assert.Equal(t, 1, rms.Len())
func verifyHappyCaseMetricsWithDuration(durationSum float64) func(t *testing.T, md pmetric.Metrics) {
return func(t *testing.T, md pmetric.Metrics) {
assert.Equal(t, 3, md.MetricCount())

sms := rms.At(0).ScopeMetrics()
assert.Equal(t, 1, sms.Len())
rms := md.ResourceMetrics()
assert.Equal(t, 1, rms.Len())

ms := sms.At(0).Metrics()
assert.Equal(t, 3, ms.Len())
sms := rms.At(0).ScopeMetrics()
assert.Equal(t, 1, sms.Len())

mCount := ms.At(0)
verifyCount(t, mCount)
ms := sms.At(0).Metrics()
assert.Equal(t, 3, ms.Len())

mServerDuration := ms.At(1)
assert.Equal(t, "traces_service_graph_request_server_seconds", mServerDuration.Name())
verifyDuration(t, mServerDuration)
mCount := ms.At(0)
verifyCount(t, mCount)

mClientDuration := ms.At(2)
assert.Equal(t, "traces_service_graph_request_client_seconds", mClientDuration.Name())
verifyDuration(t, mClientDuration)
mServerDuration := ms.At(1)
assert.Equal(t, "traces_service_graph_request_server_seconds", mServerDuration.Name())
verifyDuration(t, mServerDuration, durationSum)

mClientDuration := ms.At(2)
assert.Equal(t, "traces_service_graph_request_client_seconds", mClientDuration.Name())
verifyDuration(t, mClientDuration, durationSum)
}
}

func verifyCount(t *testing.T, m pmetric.Metric) {
Expand All @@ -339,13 +366,13 @@ func verifyCount(t *testing.T, m pmetric.Metric) {
verifyAttr(t, attributes, "client_some-attribute", "val")
}

func verifyDuration(t *testing.T, m pmetric.Metric) {
func verifyDuration(t *testing.T, m pmetric.Metric, durationSum float64) {
assert.Equal(t, pmetric.MetricTypeHistogram, m.Type())
dps := m.Histogram().DataPoints()
assert.Equal(t, 1, dps.Len())

dp := dps.At(0)
assert.Equal(t, float64(1000), dp.Sum()) // Duration: 1sec
assert.Equal(t, durationSum, dp.Sum()) // Duration: 1sec
assert.Equal(t, uint64(1), dp.Count())
buckets := pcommon.NewUInt64Slice()
buckets.FromRaw([]uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0})
Expand Down Expand Up @@ -526,7 +553,7 @@ func TestUpdateDurationMetrics(t *testing.T) {
reqClientDurationSecondsSum: make(map[string]float64),
reqClientDurationSecondsCount: make(map[string]uint64),
reqClientDurationSecondsBucketCounts: make(map[string][]uint64),
reqDurationBounds: defaultLatencyHistogramBucketsMs,
reqDurationBounds: defaultLatencyHistogramBuckets,
keyToMetric: make(map[string]metricSeries),
config: &Config{
Dimensions: []string{},
Expand Down

0 comments on commit c9f1865

Please sign in to comment.