fluent-bit: Fix fluent-bit exit callback when buffering is enabled #2365
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What this PR does / why we need it:
Fluent Bit loki output plugin cannot handle exit callback properly while dque buffering (#2142) is in use. Fluent Bit will remain in the blocking state forever and never stops normally because of loki output plugin.
Which issue(s) this PR fixes:
When Fluent Bit will stop using the instance of the output plugin (on shutdown), it will trigger the exit callback FLBPluginExit and this in turn will call Stop() func with shutdown procedure:
loki/cmd/fluent-bit/dque.go
Lines 119 to 122 in b27f92e
Dequeuer goroutine listen for a stop signal (non-blocking) in endless loop where next func DequeueBlock() is blocking call until new log is available:
loki/cmd/fluent-bit/dque.go
Lines 91 to 99 in b27f92e
In case where Dequeuer waits for a next log and Fluent Bit triggers the exit callback at the same time, the next log never arrives to unblock Dequeuer to be able to check quit channel in the next loop iteration. Fluent-bit will wait forever for exit callback to return but this will never happen because Dequeuer is blocked.
This PR fixes unwanted Dequeuer gorutine stop behavior by replacing quit channel and synchronization with Close() which is part of dque package. Close() func unblocks waiting Dequeuer goroutine and return ErrQueueClosed in DequeueBlock().