diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index eb20434dec..9e9cc5fdf2 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(host, components...) + a.sched.Schedule(a.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index bb9b3a151e..643730000f 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -214,9 +214,12 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 22a2b56d3a..a4b9e15dcc 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -242,9 +242,12 @@ func (e *Exporter) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + } + // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) - e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 1e494e71c1..7eca6e7349 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + e.sched.Schedule(e.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 2c731616a5..fbf70e22c0 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,9 +37,7 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host - - // newComponentsCh is written to when schedComponents gets updated. - newComponentsCh chan struct{} + running bool // onPause is called when scheduler is making changes to running components. onPause func() @@ -51,89 +49,102 @@ type Scheduler struct { // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: func() {}, - onResume: func() {}, + log: l, + onPause: func() {}, + onResume: func() {}, } } -// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to -// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running -// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it -// will call onResume as a last step. +// NewWithPauseCallbacks is like New, but allows to specify onPause() and onResume() callbacks. +// The callbacks are a useful way of pausing and resuming the ingestion of data by the components: +// * onPause() is called before the scheduler stops the components. +// * onResume() is called after the scheduler starts the components. +// The callbacks are used by the Schedule() and Run() functions. +// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never ran. func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: onPause, - onResume: onResume, + log: l, + onPause: onPause, + onResume: onResume, } } -// Schedule schedules a new set of OpenTelemetry Components to run. Components -// will only be scheduled when the Scheduler is running. +// Schedule a new set of OpenTelemetry Components to run. +// Components will only be started when the Scheduler's Run() function has been called. +// +// Schedule() completely overrides the set of previously running components. +// Components which have been removed since the last call to Schedule will be stopped. // -// Schedule completely overrides the set of previously running components; -// components which have been removed since the last call to Schedule will be -// stopped. -func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) { +// updateConsumers is called after the components are paused and before starting the new components. +// It is expected that this function will set the new set of consumers to the wrapping consumer that's assigned to the Alloy component. +func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - cs.schedComponents = cc + // If the scheduler isn't running yet, just update the state. + // That way the Run function is ready to go. + if !cs.running { + cs.schedComponents = cc + cs.host = h + updateConsumers() + return + } + + // The new components must be setup in this order: + // + // 1. Pause consumers + // 2. Stop the old components + // 3. Change the consumers + // 4. Start the new components + // 5. Start the consumer + // + // There could be race conditions if the order above is not followed. + + // 1. Pause consumers + // This prevents them from accepting new data while we're shutting them down. + cs.onPause() + + // 2. Stop the old components + cs.stopComponents(ctx, cs.schedComponents...) + + // 3. Change the consumers + // This can only be done after stopping the pervious components and before starting the new ones. + updateConsumers() + + // 4. Start the new components + level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cs.schedComponents)) + cs.schedComponents = cs.startComponents(ctx, h, cc...) cs.host = h + //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? - select { - case cs.newComponentsCh <- struct{}{}: - // Queued new message. - default: - // A message is already queued for refreshing running components so we - // don't have to do anything here. - } + // 5. Start the consumer + // The new components will now start accepting telemetry data. + cs.onResume() } -// Run starts the Scheduler. Run will watch for schedule components to appear -// and run them, terminating previously running components if they exist. +// Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { - firstRun := true - var components []otelcomponent.Component + cs.schedMut.Lock() + cs.running = true + + cs.onPause() + cs.startComponents(ctx, cs.host, cs.schedComponents...) + cs.onResume() + + cs.schedMut.Unlock() // Make sure we terminate all of our running components on shutdown. defer func() { - if !firstRun { // always handle the callbacks correctly - cs.onPause() - } - cs.stopComponents(context.Background(), components...) + cs.schedMut.Lock() + defer cs.schedMut.Unlock() + cs.stopComponents(context.Background(), cs.schedComponents...) + // this Resume call should not be needed but is added for robustness to ensure that + // it does not ever exit in "paused" state. cs.onResume() }() - // Wait for a write to cs.newComponentsCh. The initial list of components is - // always empty so there's nothing to do until cs.newComponentsCh is written - // to. - for { - select { - case <-ctx.Done(): - return nil - case <-cs.newComponentsCh: - if !firstRun { - cs.onPause() // do not pause on first run - } else { - firstRun = false - } - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, components...) - - cs.schedMut.Lock() - components = cs.schedComponents - host := cs.host - cs.schedMut.Unlock() - - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) - components = cs.startComponents(ctx, host, components...) - cs.onResume() - } - } + <-ctx.Done() + return nil } func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 469d679b7f..c034e262f8 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,10 +5,11 @@ import ( "testing" "time" + "go.uber.org/atomic" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/runtime/componenttest" @@ -32,7 +33,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -52,12 +53,12 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(context.Background(), func() {}, h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) @@ -81,26 +82,32 @@ func TestScheduler(t *testing.T) { require.NoError(t, err) }() + toInt := func(a *atomic.Int32) int { return int(a.Load()) } + + // The Run function starts the components. They should be paused and then resumed. + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run") + assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) - - toInt := func(a *atomic.Int32) int { return int(a.Load()) } + cs.Schedule(ctx, func() {}, h, component) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run") - assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run") + assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule") + assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(ctx, func() {}, h) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run") - assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") @@ -109,8 +116,8 @@ func TestScheduler(t *testing.T) { cancel() require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") - assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown") + assert.Equal(t, 4, toInt(resumeCalls), "resume callback should be called on shutdown") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") }) @@ -133,7 +140,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(ctx, func() {}, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5072d65233..08a7a6181c 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -237,9 +237,13 @@ func (p *Processor) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) + return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 80b82efb06..ff4c3c21d1 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(host, components...) + r.sched.Schedule(r.ctx, func() {}, host, components...) return nil }