Commit

add virtual_node label & tests

t00mas authored and jpkrohling committed May 21, 2024
1 parent 80317ce commit cff9427
Showing 10 changed files with 419 additions and 28 deletions.
27 changes: 27 additions & 0 deletions .chloggen/extra-labels-for-service-graphs.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Added a new configuration option `enable_virtual_node_label` to allow users to identify which node is the virtual node in each edge of the service graph."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31889]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
41 changes: 40 additions & 1 deletion connector/servicegraphconnector/README.md
@@ -136,12 +136,16 @@ The following settings can be optionally configured:
- Default: `2s`
- `virtual_node_peer_attributes`: the list of attributes, ordered by priority, whose presence in a client span will result in the creation of a virtual server node. An empty list disables virtual node creation.
- Default: `[peer.service, db.name, db.system]`
- `virtual_node_extra_label`: adds an extra label `virtual_node` with a value of either `client` or `server`, indicating which node in the edge is the uninstrumented one.
- Default: `false`
- `metrics_flush_interval`: the interval at which metrics are flushed to the exporter.
- Default: Metrics are flushed on every received batch of traces.
- `database_name_attribute`: the attribute name used to identify the database name from span attributes.
- Default: `db.name`

## Example configuration
## Example configurations

### Sample with custom buckets and dimensions

```yaml
receivers:
@@ -173,3 +177,38 @@ service:
receivers: [servicegraph]
exporters: [prometheus/servicegraph]
```

### Sample with options for identifying uninstrumented services

```yaml
receivers:
  otlp:
    protocols:
      grpc:
connectors:
  servicegraph:
    dimensions:
      - db.system
      - messaging.system
    virtual_node_peer_attributes:
      - db.name
      - db.system
      - messaging.system
      - peer.service
    virtual_node_extra_label: true
exporters:
  prometheus/servicegraph:
    endpoint: localhost:9090
    namespace: servicegraph
service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [servicegraph]
    metrics/servicegraph:
      receivers: [servicegraph]
      exporters: [prometheus/servicegraph]
```
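
For orientation, here is a hedged sketch of what the extra dimension amounts to in code: when `virtual_node_extra_label` is enabled and one side of an edge was never instrumented, the emitted data points carry one additional attribute alongside dimensions such as `client` and `server`. The service names and exact attribute set below are illustrative only, not output produced by this commit.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	// Illustrative attributes for a traces_service_graph_request_total data point
	// when the server side of the edge is uninstrumented; values are made up.
	attrs := pcommon.NewMap()
	attrs.PutStr("client", "checkout-service")
	attrs.PutStr("server", "postgresql") // resolved via virtual_node_peer_attributes
	attrs.PutStr("connection_type", "virtual_node")
	attrs.PutStr("virtual_node", "server") // the new extra label
	fmt.Println(attrs.AsRaw())
}
```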
6 changes: 6 additions & 0 deletions connector/servicegraphconnector/config.go
@@ -30,13 +30,19 @@ type Config struct {

// Store contains the config for the in-memory store used to find requests between services by pairing spans.
Store StoreConfig `mapstructure:"store"`

// CacheLoop is the interval at which the cache is periodically cleaned.
CacheLoop time.Duration `mapstructure:"cache_loop"`

// StoreExpirationLoop is the interval at which old entries are periodically expired from the store.
StoreExpirationLoop time.Duration `mapstructure:"store_expiration_loop"`

// VirtualNodePeerAttributes is the list of attributes used to identify the peer of a virtual node; attributes earlier in the list take higher priority.
VirtualNodePeerAttributes []string `mapstructure:"virtual_node_peer_attributes"`

// VirtualNodeExtraLabel enables adding the `virtual_node` label to the generated metrics.
VirtualNodeExtraLabel bool `mapstructure:"virtual_node_extra_label"`

// MetricsFlushInterval is the interval at which metrics are flushed to the exporter.
// If set to 0, metrics are flushed on every received batch of traces.
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
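
As a quick cross-reference between these struct fields and the collector configuration keys, the following is a minimal sketch that builds the config programmatically. The values are illustrative rather than defaults, and in a real collector the factory populates this struct from YAML via the mapstructure tags shown above.

```go
package main

import (
	"fmt"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector"
)

func main() {
	// Illustrative values only; normally the collector builds this from YAML.
	cfg := &servicegraphconnector.Config{
		CacheLoop:                 time.Minute,     // cache_loop
		StoreExpirationLoop:       2 * time.Second, // store_expiration_loop
		VirtualNodePeerAttributes: []string{"peer.service", "db.name", "db.system"}, // virtual_node_peer_attributes
		VirtualNodeExtraLabel:     true,        // virtual_node_extra_label
		MetricsFlushInterval:      time.Minute, // metrics_flush_interval
	}
	fmt.Printf("%+v\n", cfg)
}
```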
7 changes: 7 additions & 0 deletions connector/servicegraphconnector/connector.go
@@ -30,6 +30,7 @@ const (
metricKeySeparator = string(byte(0))
clientKind = "client"
serverKind = "server"
virtualNodeLabel = "virtual_node"
)

var (
@@ -371,11 +372,17 @@ func (p *serviceGraphConnector) onExpire(e *store.Edge) {
e.ConnectionType = store.VirtualNode
if len(e.ClientService) == 0 && e.Key.SpanIDIsEmpty() {
e.ClientService = "user"
if p.config.VirtualNodeExtraLabel {
e.VirtualNodeLabel = store.ClientVirtualNode
}
p.onComplete(e)
}

if len(e.ServerService) == 0 {
e.ServerService = p.getPeerHost(p.config.VirtualNodePeerAttributes, e.Peer)
if p.config.VirtualNodeExtraLabel {
e.VirtualNodeLabel = store.ServerVirtualNode
}
p.onComplete(e)
}
}
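
This hunk records which side of the edge is virtual, but not where that label becomes a metric attribute. Below is a hedged, in-package sketch of what that later step could look like when an edge's dimensions are collected; the helper name, the `dims pcommon.Map` parameter, and the string values are assumptions for illustration, and the actual wiring lives in other parts of this commit and in the golden test data.

```go
// Sketch only: attach the virtual_node dimension to an edge's metric attributes
// when the option is enabled. ClientVirtualNode and ServerVirtualNode are the
// store constants referenced above; everything else here is illustrative.
func addVirtualNodeLabel(cfg *Config, e *store.Edge, dims pcommon.Map) {
	if !cfg.VirtualNodeExtraLabel {
		return
	}
	switch e.VirtualNodeLabel {
	case store.ClientVirtualNode:
		dims.PutStr(virtualNodeLabel, "client") // the client side is uninstrumented
	case store.ServerVirtualNode:
		dims.PutStr(virtualNodeLabel, "server") // the server side is uninstrumented
	}
}
```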
145 changes: 121 additions & 24 deletions connector/servicegraphconnector/connector_test.go
@@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
@@ -19,6 +21,7 @@
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -76,7 +79,8 @@ func TestConnectorConsume(t *testing.T) {
assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))

// Test & verify
td := buildSampleTrace(t, "val")
traceOpts := &SampleTraceOptions{ClientAttrKeyValues: map[string]string{"some-attribute": "val"}}
td := buildSampleTrace(t, traceOpts)
// The assertion is part of verifyHappyCaseMetrics func.
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

@@ -120,6 +124,37 @@ func verifyHappyCaseMetricsWithDuration(durationSum float64) func(t *testing.T,
}
}

func verifyMetrics(t *testing.T, md pmetric.Metrics) {
verifyMetricsWithDuration(1)(t, md)
}

func verifyMetricsWithDuration(durationSum float64) func(t *testing.T, md pmetric.Metrics) {
return func(t *testing.T, md pmetric.Metrics) {
// assert.Equal(t, 3, md.MetricCount())

rms := md.ResourceMetrics()
assert.Equal(t, 1, rms.Len())

sms := rms.At(0).ScopeMetrics()
assert.Equal(t, 1, sms.Len())

ms := sms.At(0).Metrics()
assert.Equal(t, 1, ms.Len())
// assert.Equal(t, 3, ms.Len())

// mCount := ms.At(0)
// verifyCount(t, mCount)

// mServerDuration := ms.At(1)
// assert.Equal(t, "traces_service_graph_request_server_seconds", mServerDuration.Name())
// verifyDuration(t, mServerDuration, durationSum)

// mClientDuration := ms.At(2)
// assert.Equal(t, "traces_service_graph_request_client_seconds", mClientDuration.Name())
// verifyDuration(t, mClientDuration, durationSum)
}
}

func verifyCount(t *testing.T, m pmetric.Metric) {
assert.Equal(t, "traces_service_graph_request_total", m.Name())

@@ -166,7 +201,14 @@ func verifyAttr(t *testing.T, attrs pcommon.Map, k, expected string) {
assert.Equal(t, expected, v.AsString())
}

func buildSampleTrace(t *testing.T, attrValue string) ptrace.Traces {
type SampleTraceOptions struct {
ClientAttrKeyValues map[string]string
ServerAttrKeyValues map[string]string
SkipClientSpan bool
SkipServerSpan bool
}

func buildSampleTrace(t *testing.T, opts *SampleTraceOptions) ptrace.Traces {
tStart := time.Date(2022, 1, 2, 3, 4, 5, 6, time.UTC)
tEnd := time.Date(2022, 1, 2, 3, 4, 6, 6, time.UTC)

@@ -187,23 +229,32 @@ func buildSampleTrace(t *testing.T, attrValue string) ptrace.Traces {
_, err = rand.Read(serverSpanID[:])
assert.NoError(t, err)

clientSpan := scopeSpans.Spans().AppendEmpty()
clientSpan.SetName("client span")
clientSpan.SetSpanID(clientSpanID)
clientSpan.SetTraceID(traceID)
clientSpan.SetKind(ptrace.SpanKindClient)
clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
clientSpan.Attributes().PutStr("some-attribute", attrValue) // Attribute selected as dimension for metrics

serverSpan := scopeSpans.Spans().AppendEmpty()
serverSpan.SetName("server span")
serverSpan.SetSpanID(serverSpanID)
serverSpan.SetTraceID(traceID)
serverSpan.SetParentSpanID(clientSpanID)
serverSpan.SetKind(ptrace.SpanKindServer)
serverSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
serverSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
if !opts.SkipClientSpan {
clientSpan := scopeSpans.Spans().AppendEmpty()
clientSpan.SetName("client span")
clientSpan.SetSpanID(clientSpanID)
for k, v := range opts.ClientAttrKeyValues {
clientSpan.Attributes().PutStr(k, v)
}
clientSpan.SetTraceID(traceID)
clientSpan.SetKind(ptrace.SpanKindClient)
clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
}

if !opts.SkipServerSpan {
serverSpan := scopeSpans.Spans().AppendEmpty()
serverSpan.SetName("server span")
serverSpan.SetSpanID(serverSpanID)
for k, v := range opts.ServerAttrKeyValues {
serverSpan.Attributes().PutStr(k, v)
}
serverSpan.SetTraceID(traceID)
serverSpan.SetParentSpanID(clientSpanID)
serverSpan.SetKind(ptrace.SpanKindServer)
serverSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
serverSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
}

return traces
}
@@ -293,7 +344,8 @@ func TestStaleSeriesCleanup(t *testing.T) {
assert.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))

// ConsumeTraces
td := buildSampleTrace(t, "first")
traceOpts1st := &SampleTraceOptions{ClientAttrKeyValues: map[string]string{"some-attribute": "first"}}
td := buildSampleTrace(t, traceOpts1st)
assert.NoError(t, p.ConsumeTraces(context.Background(), td))

// Make series stale and force a cache cleanup
@@ -305,7 +357,8 @@
assert.Equal(t, 0, len(p.keyToMetric))

// ConsumeTraces with a trace with different attribute value
td = buildSampleTrace(t, "second")
traceOpts2nd := &SampleTraceOptions{ClientAttrKeyValues: map[string]string{"some-attribute": "second"}}
td = buildSampleTrace(t, traceOpts2nd)
assert.NoError(t, p.ConsumeTraces(context.Background(), td))

// Shutdown the connector
@@ -330,7 +383,8 @@ func TestMapsAreConsistentDuringCleanup(t *testing.T) {
assert.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))

// ConsumeTraces
td := buildSampleTrace(t, "first")
traceOpts1st := &SampleTraceOptions{ClientAttrKeyValues: map[string]string{"some-attribute": "first"}}
td := buildSampleTrace(t, traceOpts1st)
assert.NoError(t, p.ConsumeTraces(context.Background(), td))

// Make series stale and force a cache cleanup
@@ -396,7 +450,8 @@ func TestValidateOwnTelemetry(t *testing.T) {
assert.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))

// ConsumeTraces
td := buildSampleTrace(t, "first")
traceOpts1st := &SampleTraceOptions{ClientAttrKeyValues: map[string]string{"some-attribute": "first"}}
td := buildSampleTrace(t, traceOpts1st)
assert.NoError(t, p.ConsumeTraces(context.Background(), td))

// Make series stale and force a cache cleanup
@@ -408,7 +463,8 @@
assert.Equal(t, 0, len(p.keyToMetric))

// ConsumeTraces with a trace with different attribute value
td = buildSampleTrace(t, "second")
traceOpts2nd := &SampleTraceOptions{ClientAttrKeyValues: map[string]string{"some-attribute": "second"}}
td = buildSampleTrace(t, traceOpts2nd)
assert.NoError(t, p.ConsumeTraces(context.Background(), td))

// Shutdown the connector
@@ -434,3 +490,44 @@
}
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}

func TestVirtualNodeLabels(t *testing.T) {
err := featuregate.GlobalRegistry().Set("connector.servicegraph.virtualNode", true)
require.NoError(t, err)

virtualNodeDimensions := []string{"db.system", "messaging.system"}
cfg := &Config{
Dimensions: virtualNodeDimensions,
MetricsFlushInterval: 2 * time.Second,
Store: StoreConfig{MaxItems: 10, TTL: time.Second},
VirtualNodePeerAttributes: virtualNodeDimensions,
VirtualNodeExtraLabel: true,
}

set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
conn := newConnector(set, cfg)
conn.metricsConsumer = newMockMetricsExporter()

assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() { require.NoError(t, conn.Shutdown(context.Background())) }() // run Shutdown when the test ends, not at the defer statement

td, err := golden.ReadTraces("testdata/virtual-node-label-trace.yaml")
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

conn.store.Expire()
actualMetrics, err := conn.buildMetrics()
assert.NoError(t, err)

expectedMetrics, err := golden.ReadMetrics("testdata/virtual-node-label-expected-metrics.yaml")
assert.NoError(t, err)

err = pmetrictest.CompareMetrics(expectedMetrics, actualMetrics,
pmetrictest.IgnoreMetricsOrder(),
pmetrictest.IgnoreMetricDataPointsOrder(),
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
)
require.NoError(t, err)
}
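
For context, a trace similar to the golden fixture could be sketched with the refactored `buildSampleTrace` helper from earlier in this file. The attribute values below are made up, and the committed `testdata` files remain the actual fixtures for this test.

```go
// Sketch only: a client span that names a database peer but has no matching
// server span should, on expiry, produce an edge with virtual_node="server".
td := buildSampleTrace(t, &SampleTraceOptions{
	ClientAttrKeyValues: map[string]string{"db.system": "postgresql", "db.name": "orders"},
	SkipServerSpan:      true,
})
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))
```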
5 changes: 4 additions & 1 deletion connector/servicegraphconnector/go.mod
@@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/connector/servi
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.99.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.101.0
go.opentelemetry.io/collector/config/configtelemetry v0.101.0
@@ -25,7 +27,7 @@
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@@ -46,6 +48,7 @@
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.99.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.19.1 // indirect