diff --git a/cmd/fluent-bit/dque.go b/cmd/fluent-bit/dque.go index 7d2f546793558..66d2cb55ed9b5 100644 --- a/cmd/fluent-bit/dque.go +++ b/cmd/fluent-bit/dque.go @@ -42,9 +42,7 @@ type dqueClient struct { logger log.Logger queue *dque.DQue loki client.Client - quit chan struct{} once sync.Once - wg sync.WaitGroup } // New makes a new dque loki client @@ -53,7 +51,6 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) { q := &dqueClient{ logger: log.With(logger, "component", "queue", "name", cfg.bufferConfig.dqueConfig.queueName), - quit: make(chan struct{}), } err = os.MkdirAll(cfg.bufferConfig.dqueConfig.queueDir, 0644) @@ -75,31 +72,22 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) { return nil, err } - q.wg.Add(1) go q.dequeuer() return q, nil } func (c *dqueClient) dequeuer() { - defer func() { - if err := c.queue.Close(); err != nil { - level.Error(c.logger).Log("msg", "error closing queue", "err", err) - } - c.wg.Done() - }() - for { - select { - case <-c.quit: - return - default: - } - // Dequeue the next item in the queue entry, err := c.queue.DequeueBlock() if err != nil { - level.Error(c.logger).Log("msg", "error dequeuing record", "error", err) - continue + switch err { + case dque.ErrQueueClosed: + return + default: + level.Error(c.logger).Log("msg", "error dequeuing record", "error", err) + continue + } } // Assert type of the response to an Item pointer so we can work with it @@ -117,9 +105,8 @@ func (c *dqueClient) dequeuer() { // Stop the client func (c *dqueClient) Stop() { - c.once.Do(func() { close(c.quit) }) + c.once.Do(func() { c.queue.Close() }) c.loki.Stop() - c.wg.Wait() } // Handle implement EntryHandler; adds a new line to the next batch; send is async.