Skip to content

Commit

Permalink
component/otelcol: set up groundwork for OpenTelemetry Collector comp…
Browse files Browse the repository at this point in the history
…onents (#2224)

* component/otelcol: initial commit

This commit introduces the component/otelcol package, which will
eventually hold a collection of `otelcol.*` components.

Initial prototyping work for Flow OpenTelemetry Collector components was
done in #1843, which demonstrated that the full set of code needed to
implement OpenTelemetry Components is quite large. I will be splitting
up the needed code across a few changes; this is the first.

This initial commit starts setting up the framework for running
OpenTelemetry Collector components inside of Flow with a
`componentScheduler` struct.

Related to #2213.

* move scheduler to an internal/scheduler package.

The otelcol component scheduler code will need to be exposed to multiple
packages, but it doesn't make sense to make it part of the public API.

* fix linting errors
  • Loading branch information
rfratto authored Sep 27, 2022
1 parent c3768a5 commit 5c6b4a8
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 0 deletions.
49 changes: 49 additions & 0 deletions component/otelcol/internal/scheduler/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package scheduler

import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
)

// Host implements otelcomponent.Host for Grafana Agent Flow.
type Host struct {
log log.Logger

// TODO(rfratto): allow the below fields below to be used. For now they're
// always nil.

extensions map[otelconfig.ComponentID]otelcomponent.Extension
exporters map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
}

// NewHost creates a new Host.
func NewHost(l log.Logger) *Host {
return &Host{log: l}
}

var _ otelcomponent.Host = (*Host)(nil)

// ReportFatalError implements otelcomponent.Host.
func (h *Host) ReportFatalError(err error) {
level.Error(h.log).Log("msg", "fatal error running component", "err", err)
}

// GetFactory implements otelcomponent.Host.
func (h *Host) GetFactory(kind otelcomponent.Kind, componentType otelconfig.Type) otelcomponent.Factory {
// GetFactory is used for components to create other components. It's not
// clear if we want to allow this right now, so it's disabled.
return nil
}

// GetExtensions implements otelcomponent.Host.
func (h *Host) GetExtensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return h.extensions
}

// GetExporters implements otelcomponent.Host.
func (h *Host) GetExporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return h.exporters
}
150 changes: 150 additions & 0 deletions component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Package scheduler exposes utilities for scheduling and running OpenTelemetry
// Collector components.
package scheduler

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
otelcomponent "go.opentelemetry.io/collector/component"
"go.uber.org/multierr"
)

// Scheduler implements manages a set of OpenTelemetry Collector components.
// Scheduler is intended to be used from Flow components which need to schedule
// OpenTelemetry Collector components; it does not implement the full
// component.Component interface.
//
// Each OpenTelemetry Collector component has one instance per supported
// telemetry signal, hence supporting multiple OpenTelemetry Collector
// components inside the scheduler. Scheduler should only be used to manage
// multiple instances of the same OpenTelemetry Collector component.
type Scheduler struct {
log log.Logger

healthMut sync.RWMutex
health component.Health

schedMut sync.Mutex
schedComponents []otelcomponent.Component // Most recently created components
host otelcomponent.Host

// newComponentsCh is written to when schedComponents gets updated.
newComponentsCh chan struct{}
}

// New creates a new unstarted Scheduler. Call Run to start it, and call
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
}
}

// Schedule schedules a new set of OpenTelemetry Components to run. Components
// will only be scheduled when the Scheduler is running.
//
// Schedule completely overrides the set of previously running components;
// components which have been removed since the last call to Schedule will be
// stopped.
func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
cs.schedMut.Lock()
defer cs.schedMut.Unlock()

cs.schedComponents = cc
cs.host = h

select {
case cs.newComponentsCh <- struct{}{}:
// Queued new message.
default:
// A message is already queued for refreshing running components so we
// don't have to do anything here.
}
}

// Run starts the Schduler. Run will watch for schedule components to appear
// and run them, terminating previously running components if they exist.
func (cs *Scheduler) Run(ctx context.Context) error {
var components []otelcomponent.Component

// Make sure we terminate all of our running components on shutdown.
defer func() {
cs.stopComponents(context.Background(), components...)
}()

// Wait for a write to cs.newComponentsCh. The initial list of components is
// always empty so there's nothing to do until cs.newComponentsCh is written
// to.
for {
select {
case <-ctx.Done():
return nil
case <-cs.newComponentsCh:
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, components...)

cs.schedMut.Lock()
components = cs.schedComponents
host := cs.host
cs.schedMut.Unlock()

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
cs.startComponents(ctx, host, components...)
}
}
}

func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
for _, c := range cc {
if err := c.Shutdown(ctx); err != nil {
level.Error(cs.log).Log("msg", "failed to stop scheduled component; future updates may fail", "err", err)
}
}
}

func (cs *Scheduler) startComponents(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) {
var errs error

for _, c := range cc {
if err := c.Start(ctx, h); err != nil {
level.Error(cs.log).Log("msg", "failed to start scheduled component", "err", err)
errs = multierr.Append(errs, err)
}
}

if errs != nil {
cs.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Message: fmt.Sprintf("failed to create components: %s", errs),
UpdateTime: time.Now(),
})
} else {
cs.setHealth(component.Health{
Health: component.HealthTypeHealthy,
Message: "started scheduled components",
UpdateTime: time.Now(),
})
}
}

// CurrentHealth implements component.HealthComponent. The component is
// reported as healthy when the most recent set of scheduled components were
// started successfully.
func (cs *Scheduler) CurrentHealth() component.Health {
cs.healthMut.RLock()
defer cs.healthMut.RUnlock()
return cs.health
}

func (cs *Scheduler) setHealth(h component.Health) {
cs.healthMut.Lock()
defer cs.healthMut.Unlock()
cs.health = h
}
127 changes: 127 additions & 0 deletions component/otelcol/internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package scheduler_test

import (
"context"
"testing"
"time"

"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/util"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
)

func TestScheduler(t *testing.T) {
t.Run("Scheduled components get started", func(t *testing.T) {
var (
l = util.TestLogger(t)
cs = scheduler.New(l)
h = scheduler.NewHost(l)
)

// Run our scheduler in the background.
go func() {
err := cs.Run(componenttest.TestContext(t))
require.NoError(t, err)
}()

// Schedule our component, which should notify the started trigger once it is
// running.
component, started, _ := newTriggerComponent()
cs.Schedule(h, component)
require.NoError(t, started.Wait(5*time.Second), "component did not start")
})

t.Run("Unscheduled components get stopped", func(t *testing.T) {
var (
l = util.TestLogger(t)
cs = scheduler.New(l)
h = scheduler.NewHost(l)
)

// Run our scheduler in the background.
go func() {
err := cs.Run(componenttest.TestContext(t))
require.NoError(t, err)
}()

// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})

t.Run("Running components get stopped on shutdown", func(t *testing.T) {
var (
l = util.TestLogger(t)
cs = scheduler.New(l)
h = scheduler.NewHost(l)
)

ctx, cancel := context.WithCancel(componenttest.TestContext(t))
defer cancel()

// Run our scheduler in the background.
go func() {
err := cs.Run(ctx)
require.NoError(t, err)
}()

// Schedule our component which will notify our trigger when Shutdown gets
// called.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)

// Wait for the component to start, and then stop our scheduler, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cancel()
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})
}

func newTriggerComponent() (component otelcomponent.Component, started, stopped *util.WaitTrigger) {
started = util.NewWaitTrigger()
stopped = util.NewWaitTrigger()

component = &fakeComponent{
StartFunc: func(_ context.Context, _ otelcomponent.Host) error {
started.Trigger()
return nil
},
ShutdownFunc: func(_ context.Context) error {
stopped.Trigger()
return nil
},
}

return
}

type fakeComponent struct {
StartFunc func(ctx context.Context, host otelcomponent.Host) error
ShutdownFunc func(ctx context.Context) error
}

var _ otelcomponent.Component = (*fakeComponent)(nil)

func (fc *fakeComponent) Start(ctx context.Context, host otelcomponent.Host) error {
if fc.StartFunc != nil {
fc.StartFunc(ctx, host)
}
return nil
}

func (fc *fakeComponent) Shutdown(ctx context.Context) error {
if fc.ShutdownFunc != nil {
return fc.ShutdownFunc(ctx)
}
return nil
}

0 comments on commit 5c6b4a8

Please sign in to comment.