Skip to content

Commit

Permalink
[chore] [exporterheper] Fix not-started queue sender shutdown (#8995)
Browse files Browse the repository at this point in the history
Do not panic on the shutdown of a not-started queue sender
  • Loading branch information
dmitryax authored Nov 27, 2023
1 parent 0ae738f commit 575c5f5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
7 changes: 3 additions & 4 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type queueSender struct {
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
numConsumers int
consumers *internal.QueueConsumers[Request]
stopWG sync.WaitGroup
requeuingEnabled bool
Expand All @@ -101,18 +100,19 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
} else {
queue = internal.NewBoundedMemoryQueue[Request](config.QueueSize)
}
return &queueSender{
qs := &queueSender{
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
numConsumers: config.NumConsumers,
stopWG: sync.WaitGroup{},
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
return qs
}

// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
Expand Down Expand Up @@ -149,7 +149,6 @@ func (qs *queueSender) consume(ctx context.Context, req Request) {

// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume)
if err := qs.consumers.Start(ctx, host); err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
}, time.Second, 1*time.Millisecond)
}

func TestQueueSenderNoStartShutdown(t *testing.T) {
qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil)
assert.NoError(t, qs.Shutdown(context.Background()))
}

type mockHost struct {
component.Host
ext map[component.ID]component.Component
Expand Down

0 comments on commit 575c5f5

Please sign in to comment.