Skip to content

Commit

Permalink
component/otelcol: initial commit
Browse files Browse the repository at this point in the history
This commit introduces the component/otelcol package, which will
eventually hold a collection of `otelcol.*` components.

Initial prototyping work for Flow OpenTelemetry Collector components was
done in grafana#1843, which demonstrated that the full set of code needed to
implement OpenTelemetry Components is quite large. I will be splitting
up the needed code across a few changes; this is the first.

This initial commit starts setting up the framework for running
OpenTelemetry Collector components inside of Flow with a
`componentScheduler` struct.

Related to grafana#2213.
  • Loading branch information
rfratto committed Sep 27, 2022
1 parent c3768a5 commit 6567cac
Show file tree
Hide file tree
Showing 3 changed files with 306 additions and 0 deletions.
41 changes: 41 additions & 0 deletions component/otelcol/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package otelcol

import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
)

// host implements otelcomponent.Host for Grafana Agent Flow.
type host struct {
log log.Logger

extensions map[otelconfig.ComponentID]otelcomponent.Extension
exporters map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
}

func newHost(l log.Logger) *host {
return &host{log: l}
}

var _ otelcomponent.Host = (*host)(nil)

func (h *host) ReportFatalError(err error) {
level.Error(h.log).Log("msg", "fatal error running component", "err", err)
}

func (h *host) GetFactory(kind otelcomponent.Kind, componentType otelconfig.Type) otelcomponent.Factory {
// GetFactory is used for components to create other components. It's not
// clear if we want to allow this right now, so it's disabled.
return nil
}

func (h *host) GetExtensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return h.extensions
}

func (h *host) GetExporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return h.exporters
}
139 changes: 139 additions & 0 deletions component/otelcol/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package otelcol

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
otelcomponent "go.opentelemetry.io/collector/component"
"go.uber.org/multierr"
)

// componentScheduler implements manages a set of OpenTelemetry Collector
// components. componentScheduler is intended to be used from Flow components
// which need to schedule OpenTelemetry Collector components; it does not
// implement the full component.Component interface.
//
// Each OpenTelemetry Collector component has one instance per supported
// telemetry signal, hence supporting multiple OpenTelemetry Collector
// components inside the scheduler. componentScheduler should only be used to
// manage multiple instances of the same OpenTelemetry Collector component.
type componentScheduler struct {
log log.Logger

healthMut sync.RWMutex
health component.Health

schedMut sync.Mutex
schedComponents []otelcomponent.Component // Most recently created components
host otelcomponent.Host

// newComponentsCh is written to when schedComponents gets updated.
newComponentsCh chan struct{}
}

func newComponentScheduler(l log.Logger) *componentScheduler {
return &componentScheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
}
}

func (cs *componentScheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
cs.schedMut.Lock()
defer cs.schedMut.Unlock()

cs.schedComponents = cc
cs.host = h

select {
case cs.newComponentsCh <- struct{}{}:
// Queued new message.
default:
// A message is already queued for refreshing running components so we
// don't have to do anything here.
}
}

// Run starts the componentScheduler. Run will watch for schedule components to
// appear and run them, terminating previously running components if they
// exist.
func (cs *componentScheduler) Run(ctx context.Context) error {
var components []otelcomponent.Component

// Make sure we terminate all of our running components on shutdown.
defer func() {
cs.stopComponents(context.Background(), components...)
}()

// Wait for a write to cs.newComponentsCh. The initial list of components is
// always empty so there's nothing to do until cs.newComponentsCh is written
// to.
for {
select {
case <-ctx.Done():
return nil
case <-cs.newComponentsCh:
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, components...)

cs.schedMut.Lock()
components = cs.schedComponents
host := cs.host
cs.schedMut.Unlock()

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
cs.startComponents(ctx, host, components...)
}
}
}

func (cs *componentScheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
for _, c := range cc {
if err := c.Shutdown(ctx); err != nil {
level.Error(cs.log).Log("msg", "failed to stop scheduled component; future updates may fail", "err", err)
}
}
}

func (cs *componentScheduler) startComponents(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) {
var errs error

for _, c := range cc {
if err := c.Start(ctx, h); err != nil {
level.Error(cs.log).Log("msg", "failed to start scheduled component", "err", err)
errs = multierr.Append(errs, err)
}
}

if errs != nil {
cs.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Message: fmt.Sprintf("failed to create components: %s", errs),
UpdateTime: time.Now(),
})
} else {
cs.setHealth(component.Health{
Health: component.HealthTypeHealthy,
Message: "started scheduled components",
UpdateTime: time.Now(),
})
}
}

// CurrentHealth implements component.HealthComponent.
func (cs *componentScheduler) CurrentHealth() component.Health {
cs.healthMut.RLock()
defer cs.healthMut.RUnlock()
return cs.health
}

func (cs *componentScheduler) setHealth(h component.Health) {
cs.healthMut.Lock()
defer cs.healthMut.Unlock()
cs.health = h
}
126 changes: 126 additions & 0 deletions component/otelcol/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package otelcol

import (
"context"
"testing"
"time"

"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/util"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
)

func Test_scheduler(t *testing.T) {
t.Run("Scheduled components get started", func(t *testing.T) {
var (
l = util.TestLogger(t)
cs = newComponentScheduler(l)
h = newHost(l)
)

// Run our scheduler in the background.
go func() {
err := cs.Run(componenttest.TestContext(t))
require.NoError(t, err)
}()

// Schedule our component, which should notify the started trigger once it is
// running.
component, started, _ := newTriggerComponent()
cs.Schedule(h, component)
require.NoError(t, started.Wait(5*time.Second), "component did not start")
})

t.Run("Unscheduled components get stopped", func(t *testing.T) {
var (
l = util.TestLogger(t)
cs = newComponentScheduler(l)
h = newHost(l)
)

// Run our scheduler in the background.
go func() {
err := cs.Run(componenttest.TestContext(t))
require.NoError(t, err)
}()

// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})

t.Run("Running components get stopped on shutdown", func(t *testing.T) {
var (
l = util.TestLogger(t)
cs = newComponentScheduler(l)
h = newHost(l)
)

ctx, cancel := context.WithCancel(componenttest.TestContext(t))
defer cancel()

// Run our scheduler in the background.
go func() {
err := cs.Run(ctx)
require.NoError(t, err)
}()

// Schedule our component which will notify our trigger when Shutdown gets
// called.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)

// Wait for the component to start, and then stop our scheduler, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cancel()
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})
}

func newTriggerComponent() (component otelcomponent.Component, started, stopped *util.WaitTrigger) {
started = util.NewWaitTrigger()
stopped = util.NewWaitTrigger()

component = &fakeComponent{
StartFunc: func(_ context.Context, _ otelcomponent.Host) error {
started.Trigger()
return nil
},
ShutdownFunc: func(_ context.Context) error {
stopped.Trigger()
return nil
},
}

return
}

type fakeComponent struct {
StartFunc func(ctx context.Context, host otelcomponent.Host) error
ShutdownFunc func(ctx context.Context) error
}

var _ otelcomponent.Component = (*fakeComponent)(nil)

func (fc *fakeComponent) Start(ctx context.Context, host otelcomponent.Host) error {
if fc.StartFunc != nil {
fc.StartFunc(ctx, host)
}
return nil
}

func (fc *fakeComponent) Shutdown(ctx context.Context) error {
if fc.ShutdownFunc != nil {
return fc.ShutdownFunc(ctx)
}
return nil
}

0 comments on commit 6567cac

Please sign in to comment.