From c980f54c9ab5fe3fe273f9d2b3e5b889cc9faa44 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 23 Feb 2021 14:25:04 -0500 Subject: [PATCH] Fix Shutdown behavior for batchprocessor I added a Shutdown() test that does basic verification of the behavior of the Shutdown() function. More verifications can be added later. The test revealed a bug in batchprocessor Shutdown() function which would not wait until all pending data was drained. --- component/componenttest/shutdown_verifier.go | 79 +++++++++++++++++++ processor/batchprocessor/batch_processor.go | 11 ++- .../batchprocessor/batch_processor_test.go | 5 ++ 3 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 component/componenttest/shutdown_verifier.go diff --git a/component/componenttest/shutdown_verifier.go b/component/componenttest/shutdown_verifier.go new file mode 100644 index 00000000000..b48a06e5979 --- /dev/null +++ b/component/componenttest/shutdown_verifier.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configerror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/consumer/pdata" +) + +func createSingleSpanTrace() pdata.Traces { + d := pdata.NewTraces() + d.ResourceSpans().Resize(1) + d.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) + span := d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) + span.SetName("test span") + return d +} + +func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) { + // Create a processor and output its produce to a sink. + nextSink := new(consumertest.TracesSink) + processor, err := factory.CreateTracesProcessor( + context.Background(), + component.ProcessorCreateParams{Logger: zap.NewNop()}, + cfg, + nextSink, + ) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + return + } + require.NoError(t, err) + } + err = processor.Start(context.Background(), NewNopHost()) + assert.NoError(t, err) + + // Send some traces to the processor. + const generatedCount = 10 + for i := 0; i < generatedCount; i++ { + processor.ConsumeTraces(context.Background(), createSingleSpanTrace()) + } + + // Now shutdown the processor. + err = processor.Shutdown(context.Background()) + assert.NoError(t, err) + + // The Shutdown() is done. It means the processor must have sent everything we + // gave it to the next sink. + assert.EqualValues(t, generatedCount, nextSink.SpansCount()) +} + +func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) { + verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg) + // TODO: add metrics and logs verification. + // TODO: add other shutdown verifications. +} diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 66fbdcf53b8..2797270fcce 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -48,7 +48,7 @@ type batchProcessor struct { sendBatchMaxSize uint32 timer *time.Timer - done chan struct{} + doneCh chan struct{} newItem chan interface{} batch batch @@ -87,7 +87,7 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc sendBatchSize: cfg.SendBatchSize, sendBatchMaxSize: cfg.SendBatchMaxSize, timeout: cfg.Timeout, - done: make(chan struct{}, 1), + doneCh: make(chan struct{}, 1), newItem: make(chan interface{}, runtime.NumCPU()), batch: batch, ctx: ctx, @@ -108,7 +108,9 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error { // Shutdown is invoked during service shutdown. func (bp *batchProcessor) Shutdown(context.Context) error { bp.cancel() - <-bp.done + + // Wait until current batch is drained. + <-bp.doneCh return nil } @@ -132,7 +134,8 @@ func (bp *batchProcessor) startProcessingCycle() { // make it cancellable using the context that Shutdown gets as a parameter bp.sendItems(statTimeoutTriggerSend) } - close(bp.done) + // Indicate that we finished draining. + close(bp.doneCh) return case item := <-bp.newItem: if item == nil { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index b21bbe3dff8..9a6b95ec3c7 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord { } return logsReceivedByName } + +func TestShutdown(t *testing.T) { + factory := NewFactory() + componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig()) +}