[connector/servicegraphconnector] Adds a new config option allowing an extra label for virtual node identification #32826

Merged on Jun 20, 2024 (26 commits)
Commits
cff9427
add virtual_node label & tests
t00mas Apr 30, 2024
2400a2d
fix & tests
t00mas May 2, 2024
79064f5
add extra label and harden tests
t00mas May 2, 2024
9ac68a9
add tests for extra dimensions with trace
t00mas May 2, 2024
eb107bf
removing unnecessary improvements in favor of golden
t00mas May 2, 2024
b3ed312
lint yaml
t00mas May 2, 2024
a41614e
resolve expired edge flushing & flakiness
t00mas May 10, 2024
a38f010
fix newConnector usage
t00mas May 13, 2024
cff100e
update golden & pdatatest
t00mas May 15, 2024
e99f941
make fmt
jpkrohling May 21, 2024
30bdc11
crosslink
jpkrohling May 22, 2024
a11082d
remove flakyness from tests
t00mas May 22, 2024
1d04cd4
Merge branch 'main' into extra-labels-for-service-graphs
t00mas May 22, 2024
6a6789a
Merge branch 'main' into extra-labels-for-service-graphs
t00mas May 22, 2024
b04aca4
Merge branch 'main' into extra-labels-for-service-graphs
t00mas May 22, 2024
8709bee
Merge branch 'main' into extra-labels-for-service-graphs
t00mas May 23, 2024
d99dc78
go mod tidy
t00mas May 24, 2024
3f978dd
fix signature in test
t00mas May 24, 2024
a6e46e7
fix conflict
t00mas May 24, 2024
faef5c7
Merge branch 'main' into extra-labels-for-service-graphs
t00mas May 24, 2024
d37f522
fully separate tests
t00mas May 24, 2024
b240fd4
add a goroutine-safe way to retrieve test metrics
t00mas May 27, 2024
e03f97d
use assert eventually for store flushing
t00mas May 28, 2024
2f45995
Merge branch 'main' into extra-labels-for-service-graphs
t00mas Jun 19, 2024
8650f08
go mod tidy
t00mas Jun 19, 2024
bbbaa1c
Merge branch 'main' into extra-labels-for-service-graphs
jpkrohling Jun 20, 2024
27 changes: 27 additions & 0 deletions .chloggen/extra-labels-for-service-graphs.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Added a new configuration option `virtual_node_extra_label` to allow users to identify which node is the virtual node in each edge of the service graph."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31889]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
41 changes: 40 additions & 1 deletion connector/servicegraphconnector/README.md
@@ -136,12 +136,16 @@ The following settings can be optionally configured:
- Default: `2s`
- `virtual_node_peer_attributes`: the list of attributes, ordered by priority, whose presence in a client span will result in the creation of a virtual server node. An empty list disables virtual node creation.
- Default: `[peer.service, db.name, db.system]`
- `virtual_node_extra_label`: adds an extra label `virtual_node` with an optional value of `client` or `server`, indicating which node is the uninstrumented one.
- Default: `false`
- `metrics_flush_interval`: the interval at which metrics are flushed to the exporter.
- Default: Metrics are flushed on every received batch of traces.
- `database_name_attribute`: the attribute name used to identify the database name from span attributes.
- Default: `db.name`

## Example configuration
## Example configurations

### Sample with custom buckets and dimensions

```yaml
receivers:
@@ -173,3 +177,38 @@ service:
receivers: [servicegraph]
exporters: [prometheus/servicegraph]
```

### Sample with options for uninstrumented services identification

```yaml
receivers:
otlp:
protocols:
grpc:

connectors:
servicegraph:
dimensions:
- db.system
- messaging.system
virtual_node_peer_attributes:
- db.name
- db.system
- messaging.system
- peer.service
virtual_node_extra_label: true

exporters:
prometheus/servicegraph:
endpoint: localhost:9090
namespace: servicegraph

service:
pipelines:
traces:
receivers: [otlp]
exporters: [servicegraph]
metrics/servicegraph:
receivers: [servicegraph]
exporters: [prometheus/servicegraph]
```
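
To illustrate how a consumer of these metrics might read the new label, here is a minimal Go sketch. It assumes each series carries `client` and `server` labels alongside `virtual_node`; the helper name `uninstrumentedService` and the sample service names are hypothetical and not part of this PR.

```go
package main

import "fmt"

// uninstrumentedService returns the name of the service that the
// `virtual_node` label marks as uninstrumented, or "" when the label is
// empty or absent. The "client" and "server" label keys are assumptions
// made for this sketch.
func uninstrumentedService(labels map[string]string) string {
	switch labels["virtual_node"] {
	case "client":
		return labels["client"]
	case "server":
		return labels["server"]
	default:
		return ""
	}
}

func main() {
	// Hypothetical series: an instrumented service calling a database
	// that emits no spans of its own.
	series := map[string]string{
		"client":       "checkout",
		"server":       "postgresql",
		"virtual_node": "server",
	}
	fmt.Println(uninstrumentedService(series)) // prints "postgresql"
}
```

An empty result covers both the case where the option is disabled and the case where neither side of the edge is virtual.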
6 changes: 6 additions & 0 deletions connector/servicegraphconnector/config.go
@@ -30,13 +30,19 @@ type Config struct {

// Store contains the config for the in-memory store used to find requests between services by pairing spans.
Store StoreConfig `mapstructure:"store"`

// CacheLoop is the interval at which the cache is cleaned.
CacheLoop time.Duration `mapstructure:"cache_loop"`

// StoreExpirationLoop is the interval at which old entries are expired from the store.
StoreExpirationLoop time.Duration `mapstructure:"store_expiration_loop"`

// VirtualNodePeerAttributes the list of attributes need to match, the higher the front, the higher the priority.
VirtualNodePeerAttributes []string `mapstructure:"virtual_node_peer_attributes"`

// VirtualNodeExtraLabel enables the `virtual_node` label to be added to the generated metrics.
VirtualNodeExtraLabel bool `mapstructure:"virtual_node_extra_label"`

// MetricsFlushInterval is the interval at which metrics are flushed to the exporter.
// If set to 0, metrics are flushed on every received batch of traces.
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
16 changes: 16 additions & 0 deletions connector/servicegraphconnector/connector.go
@@ -29,6 +29,7 @@ const (
metricKeySeparator = string(byte(0))
clientKind = "client"
serverKind = "server"
virtualNodeLabel = "virtual_node"
)

var (
@@ -349,11 +350,17 @@ func (p *serviceGraphConnector) onExpire(e *store.Edge) {
e.ConnectionType = store.VirtualNode
if len(e.ClientService) == 0 && e.Key.SpanIDIsEmpty() {
e.ClientService = "user"
if p.config.VirtualNodeExtraLabel {
e.VirtualNodeLabel = store.ClientVirtualNode
}
p.onComplete(e)
}

if len(e.ServerService) == 0 {
e.ServerService = p.getPeerHost(p.config.VirtualNodePeerAttributes, e.Peer)
if p.config.VirtualNodeExtraLabel {
e.VirtualNodeLabel = store.ServerVirtualNode
}
p.onComplete(e)
}
}
@@ -363,6 +370,10 @@ func (p *serviceGraphConnector) aggregateMetricsForEdge(e *store.Edge) {
metricKey := p.buildMetricKey(e.ClientService, e.ServerService, string(e.ConnectionType), e.Dimensions)
dimensions := buildDimensions(e)

if p.config.VirtualNodeExtraLabel {
dimensions = addExtraLabel(dimensions, virtualNodeLabel, string(e.VirtualNodeLabel))
}
Comment on lines +373 to +375
Contributor

I understand we only want to add this label if the edge has a virtual node, right? We should check that

Contributor Author

Could be - it's an easy addition, we probably just change to:

if e.ConnectionType == store.VirtualNode && p.config.VirtualNodeExtraLabel {

however, is it better to not have it, or to have virtual_node: with an empty value when this option is enabled?

I'm leaning towards having the label anyway with an empty value, but not 100% convinced of that.

Contributor

I think it can be confusing, but I don't have a strong opinion on it.

Contributor Author

OK then, I'll opt for leaving it with an empty value sometimes but always present. We can review this if it's an issue.
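
The two options weighed in this thread can be compared with a small, self-contained Go sketch. It uses a plain `map[string]string` in place of `pcommon.Map`, and the `Edge` type and both function names are hypothetical stand-ins for illustration, not the connector's actual code.

```go
package main

import "fmt"

// Edge is a pared-down stand-in for store.Edge, for illustration only.
type Edge struct {
	ConnectionType string // "virtual_node" for edges that have a virtual node
	VirtualNode    string // "", "client", or "server"
}

// labelsAlways mirrors the behavior chosen in this thread: when the option
// is enabled, the label is always present, possibly with an empty value.
func labelsAlways(e Edge, enabled bool) map[string]string {
	dims := map[string]string{}
	if enabled {
		dims["virtual_node"] = e.VirtualNode
	}
	return dims
}

// labelsOnlyVirtual is the alternative raised in review: add the label
// only when the edge actually has a virtual node.
func labelsOnlyVirtual(e Edge, enabled bool) map[string]string {
	dims := map[string]string{}
	if enabled && e.ConnectionType == "virtual_node" {
		dims["virtual_node"] = e.VirtualNode
	}
	return dims
}

func main() {
	// An edge between two instrumented services (no virtual node).
	regular := Edge{}
	_, always := labelsAlways(regular, true)["virtual_node"]
	_, onlyVirtual := labelsOnlyVirtual(regular, true)["virtual_node"]
	fmt.Println(always, onlyVirtual) // prints "true false"
}
```

With the first approach every series has the same label set once the option is on; with the second, the label appears only on edges where it carries information.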


p.seriesMutex.Lock()
defer p.seriesMutex.Unlock()
p.updateSeries(metricKey, dimensions)
@@ -434,6 +445,11 @@ func buildDimensions(e *store.Edge) pcommon.Map {
return dims
}

func addExtraLabel(dimensions pcommon.Map, label, value string) pcommon.Map {
dimensions.PutStr(label, value)
return dimensions
}

func (p *serviceGraphConnector) buildMetrics() (pmetric.Metrics, error) {
m := pmetric.NewMetrics()
ilm := m.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
139 changes: 138 additions & 1 deletion connector/servicegraphconnector/connector_test.go
@@ -27,6 +27,9 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

func TestConnectorStart(t *testing.T) {
@@ -194,7 +197,6 @@ func buildSampleTrace(t *testing.T, attrValue string) ptrace.Traces {
clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
clientSpan.Attributes().PutStr("some-attribute", attrValue) // Attribute selected as dimension for metrics

serverSpan := scopeSpans.Spans().AppendEmpty()
serverSpan.SetName("server span")
serverSpan.SetSpanID(serverSpanID)
@@ -231,6 +233,18 @@ func (m *mockMetricsExporter) ConsumeMetrics(_ context.Context, md pmetric.Metri
return nil
}

// GetMetrics is the race-condition-safe way to get the metrics that have been consumed by the exporter.
func (m *mockMetricsExporter) GetMetrics() []pmetric.Metrics {
m.mtx.Lock()
defer m.mtx.Unlock()

// Create a copy of m.md to avoid returning a reference to the original slice
mdCopy := make([]pmetric.Metrics, len(m.md))
copy(mdCopy, m.md)

return mdCopy
}

func TestUpdateDurationMetrics(t *testing.T) {
p := serviceGraphConnector{
reqTotal: make(map[string]int64),
@@ -436,3 +450,126 @@ func TestValidateOwnTelemetry(t *testing.T) {
}
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}

func TestExtraDimensionsLabels(t *testing.T) {
extraDimensions := []string{"db.system", "messaging.system"}
cfg := &Config{
Dimensions: extraDimensions,
LatencyHistogramBuckets: []time.Duration{time.Duration(0.1 * float64(time.Second)), time.Duration(1 * float64(time.Second)), time.Duration(10 * float64(time.Second))},
Store: StoreConfig{MaxItems: 10},
}

set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
conn, err := newConnector(set, cfg, newMockMetricsExporter())
assert.NoError(t, err)

assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
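// Wrap the shutdown in a closure: arguments of a deferred call are
// evaluated at the defer statement, which would shut down immediately.
defer func() {
require.NoError(t, conn.Shutdown(context.Background()))
}()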

td, err := golden.ReadTraces("testdata/extra-dimensions-queue-db-trace.yaml")
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

conn.store.Expire()

metrics := conn.metricsConsumer.(*mockMetricsExporter).GetMetrics()
require.Len(t, metrics, 1)

expectedMetrics, err := golden.ReadMetrics("testdata/extra-dimensions-queue-db-expected-metrics.yaml")
assert.NoError(t, err)

err = pmetrictest.CompareMetrics(expectedMetrics, metrics[0],
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
)
require.NoError(t, err)
}

func TestVirtualNodeServerLabels(t *testing.T) {
virtualNodeDimensions := []string{"peer.service", "db.system", "messaging.system"}
cfg := &Config{
Dimensions: virtualNodeDimensions,
LatencyHistogramBuckets: []time.Duration{time.Duration(0.1 * float64(time.Second)), time.Duration(1 * float64(time.Second)), time.Duration(10 * float64(time.Second))},
Store: StoreConfig{MaxItems: 10},
VirtualNodePeerAttributes: virtualNodeDimensions,
VirtualNodeExtraLabel: true,
MetricsFlushInterval: time.Millisecond,
}

set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)

trace := "testdata/virtual-node-label-server-trace.yaml"
expected := "testdata/virtual-node-label-server-expected-metrics.yaml"

conn, err := newConnector(set, cfg, newMockMetricsExporter())
assert.NoError(t, err)
assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))

td, err := golden.ReadTraces(trace)
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

conn.store.Expire()
assert.Eventually(t, func() bool {
return conn.store.Len() == 0
}, 100*time.Millisecond, 2*time.Millisecond)
require.NoError(t, conn.Shutdown(context.Background()))

metrics := conn.metricsConsumer.(*mockMetricsExporter).GetMetrics()
require.GreaterOrEqual(t, len(metrics), 1) // The exact number of flushes depends on timing

expectedMetrics, err := golden.ReadMetrics(expected)
assert.NoError(t, err)

err = pmetrictest.CompareMetrics(expectedMetrics, metrics[0],
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
)
require.NoError(t, err)
}

func TestVirtualNodeClientLabels(t *testing.T) {
virtualNodeDimensions := []string{"peer.service", "db.system", "messaging.system"}
cfg := &Config{
Dimensions: virtualNodeDimensions,
LatencyHistogramBuckets: []time.Duration{time.Duration(0.1 * float64(time.Second)), time.Duration(1 * float64(time.Second)), time.Duration(10 * float64(time.Second))},
Store: StoreConfig{MaxItems: 10},
VirtualNodePeerAttributes: virtualNodeDimensions,
VirtualNodeExtraLabel: true,
MetricsFlushInterval: time.Millisecond,
}

set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)

trace := "testdata/virtual-node-label-client-trace.yaml"
expected := "testdata/virtual-node-label-client-expected-metrics.yaml"

conn, err := newConnector(set, cfg, newMockMetricsExporter())
assert.NoError(t, err)
assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))

td, err := golden.ReadTraces(trace)
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

conn.store.Expire()
assert.Eventually(t, func() bool {
return conn.store.Len() == 0
}, 100*time.Millisecond, 2*time.Millisecond)
require.NoError(t, conn.Shutdown(context.Background()))

metrics := conn.metricsConsumer.(*mockMetricsExporter).GetMetrics()
require.GreaterOrEqual(t, len(metrics), 1) // The exact number of flushes depends on timing

expectedMetrics, err := golden.ReadMetrics(expected)
assert.NoError(t, err)

err = pmetrictest.CompareMetrics(expectedMetrics, metrics[0],
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
)
require.NoError(t, err)
}
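
The `GetMetrics` helper in this file returns a copy of the recorded metrics while holding the mutex. A minimal standalone sketch of that pattern follows; the `recorder` type and its methods are hypothetical stand-ins for the mock exporter and use strings instead of `pmetric.Metrics`.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder is a hypothetical stand-in for the mock exporter in the tests.
type recorder struct {
	mtx   sync.Mutex
	items []string
}

// Add appends an item while holding the lock.
func (r *recorder) Add(s string) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.items = append(r.items, s)
}

// Snapshot returns a copy of the recorded items so callers never share
// the backing array with concurrent writers.
func (r *recorder) Snapshot() []string {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	out := make([]string, len(r.items))
	copy(out, r.items)
	return out
}

func main() {
	r := &recorder{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			r.Add(fmt.Sprintf("batch-%d", n))
		}(i)
	}
	wg.Wait()

	snap := r.Snapshot()
	r.Add("late") // does not affect the earlier snapshot
	fmt.Println(len(snap), len(r.Snapshot())) // prints "4 5"
}
```

Note that the copy is shallow: it protects the slice itself, not any shared state inside the elements.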
9 changes: 9 additions & 0 deletions connector/servicegraphconnector/go.mod
@@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/connector/servi
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.103.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.103.0
go.opentelemetry.io/collector/config/configtelemetry v0.103.0
@@ -46,6 +48,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.103.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
@@ -103,3 +106,9 @@ retract (
v0.76.1
v0.65.0
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
12 changes: 12 additions & 0 deletions connector/servicegraphconnector/internal/store/edge.go
@@ -18,6 +18,14 @@ const (
VirtualNode ConnectionType = "virtual_node"
)

type VirtualNodeLabel string

const (
UnknownVirtualNode VirtualNodeLabel = ""
ClientVirtualNode VirtualNodeLabel = "client"
ServerVirtualNode VirtualNodeLabel = "server"
)

// Edge is an Edge between two nodes in the graph
type Edge struct {
Key Key
@@ -37,7 +45,11 @@ type Edge struct {
// expiration is the time at which the Edge expires, expressed as Unix time
expiration time.Time

// Peer is a map of peer attributes to be used for virtual node matching
Peer map[string]string

// VirtualNodeLabel is an optional label to be added to the generated metrics
VirtualNodeLabel VirtualNodeLabel
}

func newEdge(key Key, ttl time.Duration) *Edge {
4 changes: 2 additions & 2 deletions connector/servicegraphconnector/internal/store/store.go
@@ -61,8 +61,8 @@ func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) *S
return s
}

// len is only used for testing.
func (s *Store) len() int {
// Len is only used for testing.
func (s *Store) Len() int {
return s.l.Len()
}
