Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fluent-bit: Fix fluent-bit exit callback when buffering is enabled #2365

Merged
merged 1 commit into from
Jul 17, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ type dqueClient struct {
logger log.Logger
queue *dque.DQue
loki client.Client
quit chan struct{}
once sync.Once
wg sync.WaitGroup
}

// New makes a new dque loki client
Expand All @@ -53,7 +51,6 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) {

q := &dqueClient{
logger: log.With(logger, "component", "queue", "name", cfg.bufferConfig.dqueConfig.queueName),
quit: make(chan struct{}),
}

err = os.MkdirAll(cfg.bufferConfig.dqueConfig.queueDir, 0644)
Expand All @@ -75,31 +72,22 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) {
return nil, err
}

q.wg.Add(1)
go q.dequeuer()
return q, nil
}

func (c *dqueClient) dequeuer() {
defer func() {
if err := c.queue.Close(); err != nil {
level.Error(c.logger).Log("msg", "error closing queue", "err", err)
}
c.wg.Done()
}()

for {
select {
case <-c.quit:
return
default:
}

// Dequeue the next item in the queue
entry, err := c.queue.DequeueBlock()
if err != nil {
level.Error(c.logger).Log("msg", "error dequeuing record", "error", err)
continue
switch err {
case dque.ErrQueueClosed:
return
default:
level.Error(c.logger).Log("msg", "error dequeuing record", "error", err)
continue
}
}

// Assert type of the response to an Item pointer so we can work with it
Expand All @@ -117,9 +105,8 @@ func (c *dqueClient) dequeuer() {

// Stop the client
func (c *dqueClient) Stop() {
c.once.Do(func() { close(c.quit) })
c.once.Do(func() { c.queue.Close() })
c.loki.Stop()
c.wg.Wait()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
Expand Down