Skip to content

Commit

Permalink
Add an otelcol.connector.servicegraph component
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Aug 31, 2023
1 parent ce8eb4d commit 2fc97e1
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Main (unreleased)

- `otelcol.connector.spanlogs` - creates logs from spans. It is the flow mode equivalent
to static mode's `automatic_logging` processor. (@ptodev)
- `otelcol.connector.servicegraph` - creates service graph metrics from spans. It is the
flow mode equivalent to static mode's `service_graphs` processor. (@ptodev)

- Flow: allow the HTTP server to be configured with TLS in the config file
using the new `http` config block. (@rfratto)
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/auth/headers" // Import otelcol.auth.headers
_ "github.com/grafana/agent/component/otelcol/auth/oauth2" // Import otelcol.auth.oauth2
_ "github.com/grafana/agent/component/otelcol/auth/sigv4" // Import otelcol.auth.sigv4
_ "github.com/grafana/agent/component/otelcol/connector/servicegraph" // Import otelcol.connector.servicegraph
_ "github.com/grafana/agent/component/otelcol/connector/spanlogs" // Import otelcol.connector.spanlogs
_ "github.com/grafana/agent/component/otelcol/connector/spanmetrics" // Import otelcol.connector.spanmetrics
_ "github.com/grafana/agent/component/otelcol/exporter/jaeger" // Import otelcol.exporter.jaeger
Expand Down
178 changes: 178 additions & 0 deletions component/otelcol/connector/servicegraph/servicegraph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package servicegraph

import (
"fmt"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/connector"
"github.com/grafana/river"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

// init registers the otelcol.connector.servicegraph component. The component
// takes Arguments as its configuration, exports otelcol.ConsumerExports, and is
// built on top of the upstream servicegraphconnector factory.
func init() {
	// build wraps the upstream connector factory in the shared connector
	// component implementation.
	build := func(opts component.Options, args component.Arguments) (component.Component, error) {
		factory := servicegraphconnector.NewFactory()
		return connector.New(opts, factory, args.(Arguments))
	}

	registration := component.Registration{
		Name:    "otelcol.connector.servicegraph",
		Args:    Arguments{},
		Exports: otelcol.ConsumerExports{},
		Build:   build,
	}
	component.Register(registration)
}

// Arguments configures the otelcol.connector.servicegraph component.
type Arguments struct {
// LatencyHistogramBuckets is the list of durations representing latency histogram buckets.
// See DefaultArguments for the default value.
LatencyHistogramBuckets []time.Duration `river:"latency_histogram_buckets,attr,optional"`

// Dimensions defines the list of additional dimensions on top of the provided:
// - client
// - server
// - failed
// - connection_type
// The dimensions will be fetched from the span's attributes. Examples of some conventionally used attributes:
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go.
Dimensions []string `river:"dimensions,attr,optional"`

// Store contains the config for the in-memory store used to find requests between services by pairing spans.
Store StoreConfig `river:"store,block,optional"`
// CacheLoop is the interval at which the cache is cleaned.
CacheLoop time.Duration `river:"cache_loop,attr,optional"`
// StoreExpirationLoop is the interval at which old entries are expired from the store.
StoreExpirationLoop time.Duration `river:"store_expiration_loop,attr,optional"`
// VirtualNodePeerAttributes is the list of attributes to match; attributes
// earlier in the list take priority.
// TODO: Add VirtualNodePeerAttributes when it's no longer controlled by
// the "processor.servicegraph.virtualNode" feature gate.
// VirtualNodePeerAttributes []string `river:"virtual_node_peer_attributes,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}

// StoreConfig configures the in-memory store that is used to find requests
// between services by pairing spans.
type StoreConfig struct {
// MaxItems is the maximum number of items to keep in the store.
MaxItems int `river:"max_items,attr,optional"`
// TTL is the time to live for items in the store.
TTL time.Duration `river:"ttl,attr,optional"`
}

// Compile-time assertions that *Arguments implements river.Validator and
// river.Defaulter.
var (
_ river.Validator = (*Arguments)(nil)
_ river.Defaulter = (*Arguments)(nil)
)

// DefaultArguments holds default settings for Arguments. It is the value
// assigned by SetToDefault before user-supplied configuration is applied, so
// every optional attribute that is omitted from the config ends up with the
// value listed here.
var DefaultArguments = Arguments{
	LatencyHistogramBuckets: []time.Duration{
		2 * time.Millisecond,
		4 * time.Millisecond,
		6 * time.Millisecond,
		8 * time.Millisecond,
		10 * time.Millisecond,
		50 * time.Millisecond,
		100 * time.Millisecond,
		200 * time.Millisecond,
		400 * time.Millisecond,
		800 * time.Millisecond,
		1 * time.Second,
		1400 * time.Millisecond,
		2 * time.Second,
		5 * time.Second,
		10 * time.Second,
		15 * time.Second,
	},
	// Kept as an empty, non-nil slice: no extra dimensions by default.
	Dimensions: []string{},
	Store: StoreConfig{
		MaxItems: 1000,
		// 2s, in line with the upstream servicegraph default. A
		// millisecond-scale TTL would expire a pending edge before the
		// matching client/server span normally has a chance to arrive.
		TTL: 2 * time.Second,
	},
	CacheLoop:           1 * time.Minute,
	StoreExpirationLoop: 2 * time.Second,
	// TODO: Add VirtualNodePeerAttributes when it's no longer controlled by
	// the "processor.servicegraph.virtualNode" feature gate.
	// VirtualNodePeerAttributes: []string{
	// 	semconv.AttributeDBName,
	// 	semconv.AttributeNetSockPeerAddr,
	// 	semconv.AttributeNetPeerName,
	// 	semconv.AttributeRPCService,
	// 	semconv.AttributeNetSockPeerName,
	// 	semconv.AttributeNetPeerName,
	// 	semconv.AttributeHTTPURL,
	// 	semconv.AttributeHTTPTarget,
	// },
}

// SetToDefault implements river.Defaulter. It overwrites args with a copy of
// DefaultArguments.
func (args *Arguments) SetToDefault() {
	*args = DefaultArguments

	// The struct assignment above copies only the slice header, which would
	// leave every Arguments value sharing the package-level backing array.
	// Give each instance its own copy so that an in-place modification of one
	// instance's buckets cannot change the defaults seen by the others.
	if defaults := DefaultArguments.LatencyHistogramBuckets; defaults != nil {
		buckets := make([]time.Duration, len(defaults))
		copy(buckets, defaults)
		args.LatencyHistogramBuckets = buckets
	}
}

// Validate implements river.Validator. It rejects any configuration in which
// cache_loop, store_expiration_loop, store.max_items or store.ttl is zero or
// negative. The checks run in that order and the first failing one is
// reported.
func (args *Arguments) Validate() error {
	switch {
	case args.CacheLoop <= 0:
		return fmt.Errorf("cache_loop must be greater than 0")
	case args.StoreExpirationLoop <= 0:
		return fmt.Errorf("store_expiration_loop must be greater than 0")
	case args.Store.MaxItems <= 0:
		return fmt.Errorf("store.max_items must be greater than 0")
	case args.Store.TTL <= 0:
		return fmt.Errorf("store.ttl must be greater than 0")
	default:
		return nil
	}
}

// Convert implements connector.Arguments. It translates the River arguments
// into the upstream servicegraphprocessor.Config; the returned error is always
// nil.
func (args Arguments) Convert() (otelcomponent.Config, error) {
	store := servicegraphprocessor.StoreConfig{
		MaxItems: args.Store.MaxItems,
		TTL:      args.Store.TTL,
	}

	// MetricsExporter is deliberately left unset: the consumer of metrics
	// will be set via Otel's Connector API.
	//
	// TODO: Add VirtualNodePeerAttributes when it's no longer controlled by
	// the "processor.servicegraph.virtualNode" feature gate.
	// VirtualNodePeerAttributes: args.VirtualNodePeerAttributes,
	cfg := &servicegraphprocessor.Config{
		LatencyHistogramBuckets: args.LatencyHistogramBuckets,
		Dimensions:              args.Dimensions,
		Store:                   store,
		CacheLoop:               args.CacheLoop,
		StoreExpirationLoop:     args.StoreExpirationLoop,
	}
	return cfg, nil
}

// Extensions implements connector.Arguments. It always returns nil.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements connector.Arguments. It always returns nil.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements connector.Arguments. It returns the consumers
// configured in the required output block.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}

// ConnectorType implements connector.Arguments. It returns
// connector.ConnectorTracesToMetrics.
func (Arguments) ConnectorType() int {
return connector.ConnectorTracesToMetrics
}
156 changes: 156 additions & 0 deletions component/otelcol/connector/servicegraph/servicegraph_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package servicegraph_test

import (
"testing"
"time"

"github.com/grafana/agent/component/otelcol/connector/servicegraph"
"github.com/grafana/river"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor"
"github.com/stretchr/testify/require"
)

// TestArguments_UnmarshalRiver decodes River snippets into
// servicegraph.Arguments and checks either the servicegraphprocessor.Config
// produced by Convert, or the validation error reported while unmarshaling.
func TestArguments_UnmarshalRiver(t *testing.T) {
	tests := []struct {
		testName string
		cfg      string
		expected servicegraphprocessor.Config
		errorMsg string
	}{
		{
			testName: "Defaults",
			cfg: `
output {}
`,
			expected: servicegraphprocessor.Config{
				LatencyHistogramBuckets: []time.Duration{
					2 * time.Millisecond,
					4 * time.Millisecond,
					6 * time.Millisecond,
					8 * time.Millisecond,
					10 * time.Millisecond,
					50 * time.Millisecond,
					100 * time.Millisecond,
					200 * time.Millisecond,
					400 * time.Millisecond,
					800 * time.Millisecond,
					1 * time.Second,
					1400 * time.Millisecond,
					2 * time.Second,
					5 * time.Second,
					10 * time.Second,
					15 * time.Second,
				},
				Dimensions: []string{},
				Store: servicegraphprocessor.StoreConfig{
					MaxItems: 1000,
					// Read from the exported defaults rather than repeated
					// as a literal: this case checks that the default
					// survives Unmarshal and Convert without keeping a
					// second hand-copied duration whose unit is easy to
					// get wrong.
					TTL: servicegraph.DefaultArguments.Store.TTL,
				},
				CacheLoop:           1 * time.Minute,
				StoreExpirationLoop: 2 * time.Second,
				// TODO: Add VirtualNodePeerAttributes when it's no longer controlled by
				// the "processor.servicegraph.virtualNode" feature gate.
				// VirtualNodePeerAttributes: []string{
				// 	"db.name",
				// 	"net.sock.peer.addr",
				// 	"net.peer.name",
				// 	"rpc.service",
				// 	"net.sock.peer.name",
				// 	"net.peer.name",
				// 	"http.url",
				// 	"http.target",
				// },
			},
		},
		{
			testName: "ExplicitValues",
			cfg: `
dimensions = ["foo", "bar"]
latency_histogram_buckets = ["2ms", "4s", "6h"]
store {
max_items = 333
ttl = "12h"
}
cache_loop = "55m"
store_expiration_loop = "77s"
output {}
`,
			expected: servicegraphprocessor.Config{
				LatencyHistogramBuckets: []time.Duration{
					2 * time.Millisecond,
					4 * time.Second,
					6 * time.Hour,
				},
				Dimensions: []string{"foo", "bar"},
				Store: servicegraphprocessor.StoreConfig{
					MaxItems: 333,
					TTL:      12 * time.Hour,
				},
				CacheLoop:           55 * time.Minute,
				StoreExpirationLoop: 77 * time.Second,
				// TODO: Add VirtualNodePeerAttributes when it's no longer controlled by
				// the "processor.servicegraph.virtualNode" feature gate.
				// VirtualNodePeerAttributes: []string{"attr1", "attr2"},
			},
		},
		{
			testName: "InvalidCacheLoop",
			cfg: `
cache_loop = "0s"
output {}
`,
			errorMsg: "cache_loop must be greater than 0",
		},
		{
			testName: "InvalidStoreExpirationLoop",
			cfg: `
store_expiration_loop = "0s"
output {}
`,
			errorMsg: "store_expiration_loop must be greater than 0",
		},
		{
			testName: "InvalidStoreMaxItems",
			cfg: `
store {
max_items = 0
}
output {}
`,
			errorMsg: "store.max_items must be greater than 0",
		},
		{
			testName: "InvalidStoreTTL",
			cfg: `
store {
ttl = "0s"
}
output {}
`,
			errorMsg: "store.ttl must be greater than 0",
		},
	}

	for _, tc := range tests {
		t.Run(tc.testName, func(t *testing.T) {
			var args servicegraph.Arguments
			err := river.Unmarshal([]byte(tc.cfg), &args)
			if tc.errorMsg != "" {
				// Validation failures surface from Unmarshal, so there is
				// nothing to convert in the error cases.
				require.ErrorContains(t, err, tc.errorMsg)
				return
			}

			require.NoError(t, err)

			actualPtr, err := args.Convert()
			require.NoError(t, err)

			actual := actualPtr.(*servicegraphprocessor.Config)

			require.Equal(t, tc.expected, *actual)
		})
	}
}
Loading

0 comments on commit 2fc97e1

Please sign in to comment.