From b7bf45be05b6d746792d089f6dc56c65d0592c7d Mon Sep 17 00:00:00 2001 From: William Dumont Date: Wed, 11 Dec 2024 17:44:59 +0100 Subject: [PATCH 1/5] make otel scheduler sync --- internal/component/otelcol/auth/auth.go | 2 +- .../component/otelcol/connector/connector.go | 6 +- .../component/otelcol/exporter/exporter.go | 6 +- .../component/otelcol/extension/extension.go | 2 +- .../otelcol/internal/scheduler/scheduler.go | 83 +++---------------- .../internal/scheduler/scheduler_test.go | 63 +------------- .../component/otelcol/processor/processor.go | 7 +- .../component/otelcol/receiver/receiver.go | 2 +- 8 files changed, 32 insertions(+), 139 deletions(-) diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index eb20434dec..e333e61122 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(host, components...) + a.sched.Schedule(a.ctx, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index bb9b3a151e..a0eda435f0 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn factory: f, consumer: consumer, - sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), + sched: scheduler.New(opts.Logger), collector: collector, } if err := p.Update(args); err != nil { @@ -215,8 +215,10 @@ func (p *Connector) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) + p.consumer.Pause() p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + p.sched.Schedule(p.ctx, host, components...) + p.consumer.Resume() return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 22a2b56d3a..b63d0c3487 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support factory: f, consumer: consumer, - sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), + sched: scheduler.New(opts.Logger), collector: collector, supportedSignals: supportedSignals, @@ -243,8 +243,10 @@ func (e *Exporter) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + e.consumer.Pause() e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + e.sched.Schedule(e.ctx, host, components...) + e.consumer.Resume() return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 1e494e71c1..457e0a9b17 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + e.sched.Schedule(e.ctx, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 2c731616a5..4649bda527 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,37 +37,13 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host - - // newComponentsCh is written to when schedComponents gets updated. - newComponentsCh chan struct{} - - // onPause is called when scheduler is making changes to running components. - onPause func() - // onResume is called when scheduler is done making changes to running components. - onResume func() } // New creates a new unstarted Scheduler. Call Run to start it, and call // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: func() {}, - onResume: func() {}, - } -} - -// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to -// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running -// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it -// will call onResume as a last step. -func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { - return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: onPause, - onResume: onResume, + log: l, } } @@ -77,63 +53,28 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched // Schedule completely overrides the set of previously running components; // components which have been removed since the last call to Schedule will be // stopped. -func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) { +func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - cs.schedComponents = cc - cs.host = h + // Stop the old components before running new scheduled ones. + cs.stopComponents(ctx, cs.schedComponents...) - select { - case cs.newComponentsCh <- struct{}{}: - // Queued new message. - default: - // A message is already queued for refreshing running components so we - // don't have to do anything here. - } + level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc)) + cs.schedComponents = cs.startComponents(ctx, h, cc...) } -// Run starts the Scheduler. Run will watch for schedule components to appear -// and run them, terminating previously running components if they exist. +// Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { - firstRun := true - var components []otelcomponent.Component - // Make sure we terminate all of our running components on shutdown. defer func() { - if !firstRun { // always handle the callbacks correctly - cs.onPause() - } - cs.stopComponents(context.Background(), components...) - cs.onResume() + cs.schedMut.Lock() + defer cs.schedMut.Unlock() + cs.stopComponents(context.Background(), cs.schedComponents...) }() - // Wait for a write to cs.newComponentsCh. The initial list of components is - // always empty so there's nothing to do until cs.newComponentsCh is written - // to. - for { - select { - case <-ctx.Done(): - return nil - case <-cs.newComponentsCh: - if !firstRun { - cs.onPause() // do not pause on first run - } else { - firstRun = false - } - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, components...) - - cs.schedMut.Lock() - components = cs.schedComponents - host := cs.host - cs.schedMut.Unlock() - - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) - components = cs.startComponents(ctx, host, components...) - cs.onResume() - } - } + <-ctx.Done() + return nil } func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 469d679b7f..5a9a59bd78 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,10 +5,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/runtime/componenttest" @@ -32,7 +30,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -52,68 +50,15 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(context.Background(), h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) - t.Run("Pause callbacks are called", func(t *testing.T) { - var ( - pauseCalls = &atomic.Int32{} - resumeCalls = &atomic.Int32{} - l = util.TestLogger(t) - cs = scheduler.NewWithPauseCallbacks( - l, - func() { pauseCalls.Inc() }, - func() { resumeCalls.Inc() }, - ) - h = scheduler.NewHost(l) - ) - ctx, cancel := context.WithCancel(context.Background()) - - // Run our scheduler in the background. - go func() { - err := cs.Run(ctx) - require.NoError(t, err) - }() - - // Schedule our component, which should notify the started and stopped - // trigger once it starts and stops respectively. - component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) - - toInt := func(a *atomic.Int32) int { return int(a.Load()) } - - require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run") - assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run") - }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") - - // Wait for the component to start, and then unschedule all components, which - // should cause our running component to terminate. - require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run") - assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run") - }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") - - require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") - - // Stop the scheduler - cancel() - - require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") - assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") - }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") - }) - t.Run("Running components get stopped on shutdown", func(t *testing.T) { var ( l = util.TestLogger(t) @@ -133,7 +78,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(ctx, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5072d65233..312369f079 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), + sched: scheduler.New(opts.Logger), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID), @@ -238,8 +238,11 @@ func (p *Processor) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) + p.consumer.Pause() p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + p.sched.Schedule(p.ctx, host, components...) + p.consumer.Resume() + return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 80b82efb06..673552d271 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(host, components...) + r.sched.Schedule(r.ctx, host, components...) return nil } From b133971f49f5870082a68cb8e7556da7ad6ca0c0 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 18 Dec 2024 16:15:17 +0200 Subject: [PATCH 2/5] Don't start components until Run is called (#2296) * Don't start components until Run is called * Update consumers after stopping the component * Minor fixes --- internal/component/otelcol/auth/auth.go | 2 +- .../component/otelcol/connector/connector.go | 11 +-- .../component/otelcol/exporter/exporter.go | 11 +-- .../component/otelcol/extension/extension.go | 2 +- .../otelcol/internal/scheduler/scheduler.go | 82 +++++++++++++++++-- .../internal/scheduler/scheduler_test.go | 8 +- .../component/otelcol/processor/processor.go | 11 +-- .../component/otelcol/receiver/receiver.go | 2 +- 8 files changed, 98 insertions(+), 31 deletions(-) diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index e333e61122..9e9cc5fdf2 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(a.ctx, host, components...) + a.sched.Schedule(a.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index a0eda435f0..643730000f 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, } if err := p.Update(args); err != nil { @@ -214,11 +214,12 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + } + // Schedule the components to run once our component is running. - p.consumer.Pause() - p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) - p.sched.Schedule(p.ctx, host, components...) - p.consumer.Resume() + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index b63d0c3487..a4b9e15dcc 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, supportedSignals: supportedSignals, @@ -242,11 +242,12 @@ func (e *Exporter) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + } + // Schedule the components to run once our component is running. - e.consumer.Pause() - e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) - e.sched.Schedule(e.ctx, host, components...) - e.consumer.Resume() + e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 457e0a9b17..7eca6e7349 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(e.ctx, host, components...) + e.sched.Schedule(e.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 4649bda527..b519db4b32 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,35 +37,99 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host + running bool + + // onPause is called when scheduler is making changes to running components. + onPause func() + // onResume is called when scheduler is done making changes to running components. + onResume func() } // New creates a new unstarted Scheduler. Call Run to start it, and call // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, + log: l, + onPause: func() {}, + onResume: func() {}, } } -// Schedule schedules a new set of OpenTelemetry Components to run. Components -// will only be scheduled when the Scheduler is running. +// NewWithPauseCallbacks is like New, but allows to specify onPause() and onResume() callbacks. +// The callbacks are a useful way of pausing and resuming the ingestion of data by the components: +// * onPause() is called before the scheduler stops the components. +// * onResume() is called after the scheduler starts the components. +// The callbacks are used by the Schedule() and Run() functions. +// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never ran. +func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { + return &Scheduler{ + log: l, + onPause: onPause, + onResume: onResume, + } +} + +// Schedule a new set of OpenTelemetry Components to run. +// Components will only be started when the Scheduler's Run() function has been called. // -// Schedule completely overrides the set of previously running components; -// components which have been removed since the last call to Schedule will be -// stopped. -func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) { +// Schedule() completely overrides the set of previously running components. +// Components which have been removed since the last call to Schedule will be stopped. +func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - // Stop the old components before running new scheduled ones. + // If the scheduler isn't running yet, just update the state. + // That way the Run function is ready to go. + if !cs.running { + cs.schedComponents = cc + cs.host = h + updateConsumers() + return + } + + // The new components must be setup in this order: + // + // 1. Pause consumers + // 2. Stop the old components + // 3. Change the consumers + // 4. Start the new components + // 5. Start the consumer + // + // There could be race conditions if the order above is not followed. + + // 1. Pause consumers + // This prevents them from accepting new data while we're shutting them down. + cs.onPause() + + // 2. Stop the old components cs.stopComponents(ctx, cs.schedComponents...) - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc)) + // 3. Change the consumers + // This is can only be done after stopping the pervious components and before starting the new ones. + updateConsumers() + + // 4. Start the new components + level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) cs.schedComponents = cs.startComponents(ctx, h, cc...) + cs.host = h + //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? + + // 5. Start the consumer + // The new components will now start accepting telemetry data. + cs.onResume() } // Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { + cs.schedMut.Lock() + cs.running = true + + cs.onPause() + cs.startComponents(ctx, cs.host, cs.schedComponents...) + cs.onResume() + + cs.schedMut.Unlock() + // Make sure we terminate all of our running components on shutdown. defer func() { cs.schedMut.Lock() diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 5a9a59bd78..477809c960 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -30,7 +30,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(context.Background(), h, component) + cs.Schedule(context.Background(), func() {}, h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -50,12 +50,12 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(context.Background(), h, component) + cs.Schedule(context.Background(), func() {}, h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(context.Background(), h) + cs.Schedule(context.Background(), func() {}, h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) @@ -78,7 +78,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(ctx, h, component) + cs.Schedule(ctx, func() {}, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 312369f079..08a7a6181c 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID), @@ -237,11 +237,12 @@ func (p *Processor) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + } + // Schedule the components to run once our component is running. - p.consumer.Pause() - p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) - p.sched.Schedule(p.ctx, host, components...) - p.consumer.Resume() + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 673552d271..ff4c3c21d1 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(r.ctx, host, components...) + r.sched.Schedule(r.ctx, func() {}, host, components...) return nil } From f1da2d77d1a8aefb0a2d5e8c2203244ef405b873 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 2 Jan 2025 14:20:02 +0100 Subject: [PATCH 3/5] Update internal/component/otelcol/internal/scheduler/scheduler.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- internal/component/otelcol/internal/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index b519db4b32..079f8e0ab8 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -105,7 +105,7 @@ func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h ote cs.stopComponents(ctx, cs.schedComponents...) // 3. Change the consumers - // This is can only be done after stopping the pervious components and before starting the new ones. + // This can only be done after stopping the pervious components and before starting the new ones. updateConsumers() // 4. Start the new components From f5312dda2ddcf24f185b58390ecc74e66e724f64 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 2 Jan 2025 15:01:11 +0100 Subject: [PATCH 4/5] test and docs --- .../otelcol/internal/scheduler/scheduler.go | 5 +- .../internal/scheduler/scheduler_test.go | 62 +++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 079f8e0ab8..11de9236a9 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -74,6 +74,9 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched // // Schedule() completely overrides the set of previously running components. // Components which have been removed since the last call to Schedule will be stopped. +// +// updateConsumers is called after the components are paused and before starting the new components. +// It is expected that this function will set the new set of consumers to the wrapping consumer that's assigned to the Alloy component. func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() @@ -109,7 +112,7 @@ func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h ote updateConsumers() // 4. Start the new components - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) + level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cs.schedComponents)) cs.schedComponents = cs.startComponents(ctx, h, cc...) cs.host = h //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 477809c960..a7c50a8d24 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + "go.uber.org/atomic" + + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" @@ -59,6 +62,65 @@ func TestScheduler(t *testing.T) { require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) + t.Run("Pause callbacks are called", func(t *testing.T) { + var ( + pauseCalls = &atomic.Int32{} + resumeCalls = &atomic.Int32{} + l = util.TestLogger(t) + cs = scheduler.NewWithPauseCallbacks( + l, + func() { pauseCalls.Inc() }, + func() { resumeCalls.Inc() }, + ) + h = scheduler.NewHost(l) + ) + ctx, cancel := context.WithCancel(context.Background()) + + // Run our scheduler in the background. + go func() { + err := cs.Run(ctx) + require.NoError(t, err) + }() + + toInt := func(a *atomic.Int32) int { return int(a.Load()) } + + // The Run function starts the components. They should be paused and then resumed. + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run") + assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + // Schedule our component, which should notify the started and stopped + // trigger once it starts and stops respectively. + component, started, stopped := newTriggerComponent() + cs.Schedule(ctx, func() {}, h, component) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule") + assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + // Wait for the component to start, and then unschedule all components, which + // should cause our running component to terminate. + require.NoError(t, started.Wait(5*time.Second), "component did not start") + cs.Schedule(ctx, func() {}, h) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") + + // Stop the scheduler + cancel() + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should not be called on shutdown") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + }) + t.Run("Running components get stopped on shutdown", func(t *testing.T) { var ( l = util.TestLogger(t) From aaba017d6838a7d7426f2817e3d09b379af0458e Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 2 Jan 2025 16:13:25 +0100 Subject: [PATCH 5/5] add resume call on exist for robustness --- internal/component/otelcol/internal/scheduler/scheduler.go | 3 +++ .../component/otelcol/internal/scheduler/scheduler_test.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 11de9236a9..fbf70e22c0 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -138,6 +138,9 @@ func (cs *Scheduler) Run(ctx context.Context) error { cs.schedMut.Lock() defer cs.schedMut.Unlock() cs.stopComponents(context.Background(), cs.schedComponents...) + // this Resume call should not be needed but is added for robustness to ensure that + // it does not ever exit in "paused" state. + cs.onResume() }() <-ctx.Done() diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index a7c50a8d24..c034e262f8 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -117,7 +117,7 @@ func TestScheduler(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown") - assert.Equal(t, 3, toInt(resumeCalls), "resume callback should not be called on shutdown") + assert.Equal(t, 4, toInt(resumeCalls), "resume callback should be called on shutdown") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") })