Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype: OpenTelemetry components for Flow #1843

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions cmd/agentflow/example-config.flow
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,38 @@ local "file" "this-file" {
detector = "fsnotify"
}

// Declares the otel.receiver_zipkin component labelled "default".
otel "receiver_zipkin" "default" {
  // Empty block: no HTTP settings are overridden here
  // (presumably the component's own defaults apply; confirm).
  http {}

  output {
    // Traces are handed to both the logging exporter and the OTLP exporter
    // declared further down in this file.
    traces = [otel.exporter_logging.default.input, otel.exporter_otlp.default.input]
  }
}

// Declares the otel.receiver_jaeger component labelled "default".
otel "receiver_jaeger" "default" {
  // Both blocks are empty: no gRPC or HTTP settings are overridden here
  // (presumably the component's own defaults apply; confirm).
  grpc {}
  http {}

  output {
    // Traces are handed to both the logging exporter and the OTLP exporter
    // declared further down in this file.
    traces = [otel.exporter_logging.default.input, otel.exporter_otlp.default.input]
  }
}

// Declares the otel.receiver_otlp component labelled "default".
otel "receiver_otlp" "default" {
  // Both blocks are empty: no gRPC or HTTP settings are overridden here
  // (presumably the component's own defaults apply; confirm).
  grpc {}
  http {}

  output {
    // Traces are handed to both the logging exporter and the OTLP exporter
    // declared further down in this file.
    traces = [otel.exporter_logging.default.input, otel.exporter_otlp.default.input]
  }
}

// Declares the otel.exporter_logging component labelled "default" with no
// settings overridden. Its `input` is referenced by every receiver above.
otel "exporter_logging" "default" {}

// Declares the otel.exporter_otlp component labelled "default". Its `input`
// is referenced by every receiver above.
otel "exporter_otlp" "default" {
  client {
    // NOTE(review): presumably a non-standard port so that the exporter does
    // not send back into the OTLP receiver declared in this file; confirm.
    endpoint = "localhost:54317"
  }
}


9 changes: 7 additions & 2 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
package all

import (
_ "github.com/grafana/agent/component/local/file" // Import local.file
_ "github.com/grafana/agent/component/targets/mutate" // Import targets.mutate
_ "github.com/grafana/agent/component/local/file" // Import local.file
_ "github.com/grafana/agent/component/otel/jaegerreceiver" // Import otel.receiver_jaeger
_ "github.com/grafana/agent/component/otel/loggingexporter" // Import otel.exporter_logging
_ "github.com/grafana/agent/component/otel/otlpexporter" // Import otel.exporter_otlp
_ "github.com/grafana/agent/component/otel/otlpreceiver" // Import otel.receiver_otlp
_ "github.com/grafana/agent/component/otel/zipkinreceiver" // Import otel.receiver_zipkin
_ "github.com/grafana/agent/component/targets/mutate" // Import targets.mutate
)
159 changes: 159 additions & 0 deletions component/otel/component_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package otel

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.uber.org/multierr"
)

// componentRunner is shared between Flow component implementations, managing
// the running of OpenTelemetry Collector components.
type componentRunner struct {
	log log.Logger // Destination for the runner's debug and error messages.

	healthMut sync.RWMutex     // Guards health.
	health    component.Health // Last health recorded via setHealth; read through CurrentHealth.

	schedMut        sync.Mutex                // Guards schedComponents and host.
	schedComponents []otelcomponent.Component // Most recently created components
	host            otelcomponent.Host        // Host handed to the most recent Schedule call.

	// onComponents tells Run that Schedule has stored a new set of components.
	onComponents chan struct{}
}

// newComponentRunner builds a componentRunner that reports through l. No
// components are scheduled and nothing runs until Schedule and Run are called.
func newComponentRunner(l log.Logger) *componentRunner {
	runner := new(componentRunner)
	runner.log = l
	// A single buffered slot lets Schedule leave a pending notification
	// without waiting for Run to receive it.
	runner.onComponents = make(chan struct{}, 1)
	return runner
}

// Schedule records cc (and the host h they should be started with) as the next
// set of components to run, replacing any set that was scheduled earlier.
// Schedule may be called before Run, but nothing is started until Run is
// executing.
func (cr *componentRunner) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
	cr.schedMut.Lock()
	defer cr.schedMut.Unlock()

	cr.host, cr.schedComponents = h, cc

	// Wake Run up without ever blocking.
	select {
	case cr.onComponents <- struct{}{}:
		// A notification is now pending for Run.
	default:
		// The channel is already full: an earlier notification has not been
		// handled yet, and Run will pick up the values stored above when it
		// does handle it.
	}
}

// Run drives the componentRunner until ctx is cancelled. Every time Schedule
// signals a new set of components, Run shuts down the set it started
// previously and starts the new one. Run always returns nil.
func (cr *componentRunner) Run(ctx context.Context) error {
	var running []otelcomponent.Component

	// Whatever is still running when Run exits gets shut down. A fresh
	// background context is used because ctx is already done at that point.
	defer func() {
		cr.stopComponents(context.Background(), running...)
	}()

	for {
		select {
		case <-cr.onComponents:
			// The previous set has to be stopped before its replacement starts.
			cr.stopComponents(ctx, running...)

			var host otelcomponent.Host
			cr.schedMut.Lock()
			running, host = cr.schedComponents, cr.host
			cr.schedMut.Unlock()

			level.Debug(cr.log).Log("msg", "scheduling components", "count", len(running))
			cr.startComponents(ctx, host, running...)

		case <-ctx.Done():
			return nil
		}
	}
}

// stopComponents calls Shutdown on every component in cc, in order. A failing
// Shutdown is logged and does not prevent the remaining components from being
// shut down.
func (cr *componentRunner) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
	for i := range cc {
		err := cc[i].Shutdown(ctx)
		if err == nil {
			continue
		}
		level.Error(cr.log).Log("msg", "failed to stop down inner otel component, future updates may fail", "err", err)
	}
}

// startComponents calls Start on every component in cc with host h, in order,
// and then records the outcome as the runner's health. A failing Start is
// logged and collected; the remaining components are still started. The runner
// is marked unhealthy if any Start failed and healthy otherwise.
func (cr *componentRunner) startComponents(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) {
	var failures error

	for i := range cc {
		err := cc[i].Start(ctx, h)
		if err == nil {
			continue
		}
		level.Error(cr.log).Log("msg", "failed when starting component", "err", err)
		failures = multierr.Append(failures, err)
	}

	// Assume success and downgrade only if something failed to start.
	result := component.Health{
		Health:  component.HealthTypeHealthy,
		Message: "created components",
	}
	if failures != nil {
		result.Health = component.HealthTypeUnhealthy
		result.Message = fmt.Sprintf("failed to create components: %s", failures)
	}
	result.UpdateTime = time.Now()

	cr.setHealth(result)
}

// CurrentHealth implements component.HealthComponent. It returns a copy of the
// most recently recorded health, read under the health read lock.
func (cr *componentRunner) CurrentHealth() component.Health {
	cr.healthMut.RLock()
	current := cr.health
	cr.healthMut.RUnlock()

	return current
}

// setHealth replaces the stored health with h while holding the health write
// lock.
func (cr *componentRunner) setHealth(h component.Health) {
	cr.healthMut.Lock()
	cr.health = h
	cr.healthMut.Unlock()
}

// flowHost is the otelcomponent.Host implementation defined in this package.
// It only logs reported errors and hands back the maps it was constructed
// with.
type flowHost struct {
	log log.Logger // Receives errors passed to ReportFatalError.

	extensions map[otelconfig.ComponentID]otelcomponent.Extension                      // Returned as-is by GetExtensions.
	exporters  map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter // Returned as-is by GetExporters.
}

// Compile-time check that *flowHost satisfies otelcomponent.Host.
var _ otelcomponent.Host = (*flowHost)(nil)

// ReportFatalError records err at error level on the host's logger. Nothing
// else is done with the error.
func (h *flowHost) ReportFatalError(err error) {
	errLog := level.Error(h.log)
	errLog.Log("msg", "fatal error running component", "err", err)
}

// GetFactory always returns nil, whatever kind and componentType are.
func (h *flowHost) GetFactory(kind otelcomponent.Kind, componentType otelconfig.Type) otelcomponent.Factory {
	// Components use GetFactory to create other components. Whether that
	// should be allowed is still undecided, so no factory is ever handed out.
	var none otelcomponent.Factory
	return none
}

// GetExtensions returns the host's extensions map. The map itself is returned,
// not a copy.
func (h *flowHost) GetExtensions() map[otelconfig.ComponentID]otelcomponent.Extension {
	exts := h.extensions
	return exts
}

// GetExporters returns the host's exporters, keyed first by data type and then
// by component ID. The map itself is returned, not a copy.
func (h *flowHost) GetExporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
	byType := h.exporters
	return byType
}
77 changes: 77 additions & 0 deletions component/otel/config_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package otel

import (
"fmt"
"time"

"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Defaults for Exporter settings. These types don't implement gohcl.Decoder
// and so defaults must be applied by the wrapping type.
var (
	// DefaultExporterQueueSettings is the default queue configuration: the
	// queue is enabled with 10 consumers and room for 5000 elements.
	DefaultExporterQueueSettings = ExporterQueueSettings{
		Enabled:      true,
		NumConsumers: 10,
		// A queue of 5000 elements at 100 requests/sec survives about 50
		// seconds of destination outage, which is a pretty decent value for
		// production. Users should decide how many seconds to buffer in case
		// of a backend outage, and then multiply that by the number of
		// requests per second.
		QueueSize: 5000,
	}

	// DefaultExporterRetrySettings is the default retry configuration: retries
	// are enabled, starting at a 5s interval, capped at 30s between attempts
	// and at 5m of total elapsed time.
	DefaultExporterRetrySettings = ExporterRetrySettings{
		Enabled:        true,
		InitalInterval: 5 * time.Second,
		MaxInterval:    30 * time.Second,
		MaxElapsedTime: 5 * time.Minute,
	}
)

// ExporterQueueSettings holds common settings for a queue_config block. Every
// attribute is optional in HCL; see DefaultExporterQueueSettings for the
// defaults a wrapping type is expected to apply.
type ExporterQueueSettings struct {
	Enabled      bool `hcl:"enabled,optional"`       // Whether the queue is used at all; Validate skips all checks when false.
	NumConsumers int  `hcl:"num_consumers,optional"` // Passed through to exporterhelper.QueueSettings.NumConsumers.
	QueueSize    int  `hcl:"queue_size,optional"`    // Passed through to exporterhelper.QueueSettings.QueueSize.
}

// Convert returns the otel QueueSettings equivalent of s, copying each field
// across unchanged.
func (s *ExporterQueueSettings) Convert() exporterhelper.QueueSettings {
	var out exporterhelper.QueueSettings

	out.Enabled = s.Enabled
	out.NumConsumers = s.NumConsumers
	out.QueueSize = s.QueueSize

	return out
}

// Validate returns an error if s is invalid.
//
// A disabled queue is always valid, since none of its other settings are
// used. An enabled queue must have a positive queue_size and a positive
// num_consumers: with no consumers nothing would ever drain the queue.
func (s *ExporterQueueSettings) Validate() error {
	if !s.Enabled {
		return nil
	}

	if s.QueueSize <= 0 {
		return fmt.Errorf("queue_size must be greater than 0, got %d", s.QueueSize)
	}

	if s.NumConsumers <= 0 {
		return fmt.Errorf("num_consumers must be greater than 0, got %d", s.NumConsumers)
	}

	return nil
}

// ExporterRetrySettings holds common settings for a retry_on_failure block.
// Every attribute is optional in HCL; see DefaultExporterRetrySettings for the
// defaults a wrapping type is expected to apply.
type ExporterRetrySettings struct {
	Enabled bool `hcl:"enabled,optional"` // Passed through to exporterhelper.RetrySettings.Enabled.
	// InitalInterval is decoded from the initial_interval attribute and
	// becomes RetrySettings.InitialInterval. Note that the Go field name is
	// spelled "Inital".
	InitalInterval time.Duration `hcl:"initial_interval,optional"`
	MaxInterval    time.Duration `hcl:"max_interval,optional"`     // Passed through to exporterhelper.RetrySettings.MaxInterval.
	MaxElapsedTime time.Duration `hcl:"max_elapsed_time,optional"` // Passed through to exporterhelper.RetrySettings.MaxElapsedTime.
}

// Convert returns the otel RetrySettings equivalent of s, copying each field
// across unchanged (InitalInterval becomes InitialInterval).
func (s *ExporterRetrySettings) Convert() exporterhelper.RetrySettings {
	var out exporterhelper.RetrySettings

	out.Enabled = s.Enabled
	out.InitialInterval = s.InitalInterval
	out.MaxInterval = s.MaxInterval
	out.MaxElapsedTime = s.MaxElapsedTime

	return out
}
100 changes: 100 additions & 0 deletions component/otel/consumer_exports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package otel

import (
"context"
"sync"

"github.com/grafana/agent/component"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
)

func init() {
component.RegisterGoStruct("Consumer", Consumer{})
}

// CombinedConsumer is a combined consumer of all telemetry types: an
// implementation must satisfy the otel metrics, logs and traces consumer
// interfaces at once.
type CombinedConsumer interface {
	otelconsumer.Metrics
	otelconsumer.Logs
	otelconsumer.Traces
}

// Consumer is the registered Go struct used by components to pass around
// consumers. It embeds a CombinedConsumer, so the embedded value's methods are
// callable directly on a Consumer.
type Consumer struct{ CombinedConsumer }

// ConsumerExports is a common Exports type for components which are processors
// or exporters.
type ConsumerExports struct {
	// Input is a collection of consumers that other components can use to send
	// telemetry data. It is exposed as the required "input" attribute.
	Input *Consumer `hcl:"input,attr"`
}

// lazyCombinedConsumer is used by FlowExporter and FlowProcessor to expose
// their consumers as fast as possible even before components are constructed.
// While no consumer is set for a telemetry type, calls to consume that type do
// not block: they return nil immediately and the data is not forwarded.
type lazyCombinedConsumer struct {
	mut             sync.RWMutex         // Guards the three consumer fields below.
	metricsConsumer otelconsumer.Metrics // Target of ConsumeMetrics; nil until SetConsumers provides one.
	logsConsumer    otelconsumer.Logs    // Target of ConsumeLogs; nil until SetConsumers provides one.
	tracesConsumer  otelconsumer.Traces  // Target of ConsumeTraces; nil until SetConsumers provides one.
}

// Compile-time check that *lazyCombinedConsumer satisfies CombinedConsumer.
var _ CombinedConsumer = (*lazyCombinedConsumer)(nil)

// newLazyCombinedConsumer returns a lazyCombinedConsumer with no consumers
// set.
func newLazyCombinedConsumer() *lazyCombinedConsumer {
	return new(lazyCombinedConsumer)
}

// Capabilities always reports MutatesData as true, regardless of which
// consumers are currently set.
func (c *lazyCombinedConsumer) Capabilities() otelconsumer.Capabilities {
	// TODO(rfratto): this is probably fairly inefficient over the upstream
	// collector and needs to be improved. As long as lazyCombinedConsumer is
	// always used in a context where MutatesData is constant (i.e., because the
	// consumers created by a component always have the same value for
	// MutatesData, we can pass the value here).
	var caps otelconsumer.Capabilities
	caps.MutatesData = true
	return caps
}

// ConsumeMetrics forwards md to the current metrics consumer and returns its
// result. If no metrics consumer is set, md is not forwarded and nil is
// returned. The read lock is held for the duration of the forwarded call.
func (c *lazyCombinedConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
	c.mut.RLock()
	defer c.mut.RUnlock()

	if next := c.metricsConsumer; next != nil {
		return next.ConsumeMetrics(ctx, md)
	}

	// TODO(rfratto): should this log?
	return nil
}

// ConsumeLogs forwards md to the current logs consumer and returns its result.
// If no logs consumer is set, md is not forwarded and nil is returned. The
// read lock is held for the duration of the forwarded call.
func (c *lazyCombinedConsumer) ConsumeLogs(ctx context.Context, md pdata.Logs) error {
	c.mut.RLock()
	defer c.mut.RUnlock()

	if next := c.logsConsumer; next != nil {
		return next.ConsumeLogs(ctx, md)
	}

	// TODO(rfratto): should this log?
	return nil
}

// ConsumeTraces forwards md to the current traces consumer and returns its
// result. If no traces consumer is set, md is not forwarded and nil is
// returned. The read lock is held for the duration of the forwarded call.
func (c *lazyCombinedConsumer) ConsumeTraces(ctx context.Context, md pdata.Traces) error {
	c.mut.RLock()
	defer c.mut.RUnlock()

	if next := c.tracesConsumer; next != nil {
		return next.ConsumeTraces(ctx, md)
	}

	// TODO(rfratto): should this log?
	return nil
}

// SetConsumers replaces all three consumers at once under the write lock. A
// nil argument clears the consumer for that telemetry type.
func (c *lazyCombinedConsumer) SetConsumers(m otelconsumer.Metrics, l otelconsumer.Logs, t otelconsumer.Traces) {
	c.mut.Lock()
	defer c.mut.Unlock()

	c.metricsConsumer, c.logsConsumer, c.tracesConsumer = m, l, t
}
Loading