diff --git a/cmd/docker-driver/config.go b/cmd/docker-driver/config.go index 9676d5defe284..f34a281689432 100644 --- a/cmd/docker-driver/config.go +++ b/cmd/docker-driver/config.go @@ -74,7 +74,9 @@ var ( MaxBackoff: client.MaxBackoff, MaxRetries: client.MaxRetries, }, - Timeout: client.Timeout, + // Avoid blocking the docker-driver on the worst case + // https://github.com/grafana/loki/pull/2898#issuecomment-730218963 + Timeout: 5 * time.Second, } ) diff --git a/cmd/docker-driver/loki.go b/cmd/docker-driver/loki.go index 11a5a7b8c618e..6496bf380e3f9 100644 --- a/cmd/docker-driver/loki.go +++ b/cmd/docker-driver/loki.go @@ -70,6 +70,6 @@ func (l *loki) Name() string { // Log implements `logger.Logger` func (l *loki) Close() error { - l.client.Stop() + l.client.StopNow() return nil } diff --git a/cmd/docker-driver/main.go b/cmd/docker-driver/main.go index 6dd8d1f87b4fc..3f76ed7fc6876 100644 --- a/cmd/docker-driver/main.go +++ b/cmd/docker-driver/main.go @@ -30,12 +30,14 @@ func main() { } logger := newLogger(logLevel) level.Info(util.Logger).Log("msg", "Starting docker-plugin", "version", version.Info()) + h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`) + handlers(&h, newDriver(logger)) + if err := h.ServeUnix(socketAddress, 0); err != nil { panic(err) } - } func newLogger(lvl logging.Level) log.Logger { diff --git a/cmd/fluent-bit/dque.go b/cmd/fluent-bit/dque.go index 66d2cb55ed9b5..8636eb2ef4e53 100644 --- a/cmd/fluent-bit/dque.go +++ b/cmd/fluent-bit/dque.go @@ -109,6 +109,12 @@ func (c *dqueClient) Stop() { c.loki.Stop() } +// Stop the client +func (c *dqueClient) StopNow() { + c.once.Do(func() { c.queue.Close() }) + c.loki.StopNow() +} + // Handle implement EntryHandler; adds a new line to the next batch; send is async. func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error { if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil { diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index d687ee12df3f8..3c3f437a75b13 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -33,6 +33,8 @@ func (r *recorder) toEntry() *entry { return r.entry } func (r *recorder) Stop() {} +func (r *recorder) StopNow() {} + var now = time.Now() func Test_loki_sendRecord(t *testing.T) { diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index a9de17e2e2816..fdadb6a69e732 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -112,14 +112,20 @@ type Client interface { api.EntryHandler // Stop goroutine sending batch of entries. Stop() + + // Stop goroutine sending batch of entries without retries. + StopNow() } // Client for pushing logs in snappy-compressed protos over HTTP. type client struct { - logger log.Logger - cfg Config - client *http.Client - quit chan struct{} + logger log.Logger + cfg Config + client *http.Client + + // quit chan is depricated. Will be removed. Use `client.ctx` and `client.cancel` instead. + quit chan struct{} + once sync.Once entries chan entry wg sync.WaitGroup @@ -256,9 +262,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) { backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig) var status int - for backoff.Ongoing() { + for { start := time.Now() - status, err = c.send(c.ctx, tenantID, buf) + // send uses `timeout` internally, so `context.Background` is good enough. + status, err = c.send(context.Background(), tenantID, buf) + requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) if err == nil { @@ -295,6 +303,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) { level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() backoff.Wait() + + // Make sure it sends at least once before checking for retry. + if !backoff.Ongoing() { + break + } } if err != nil { @@ -356,13 +369,17 @@ func (c *client) getTenantID(labels model.LabelSet) string { // Stop the client. func (c *client) Stop() { - // cancel any upstream calls made using client's `ctx`. - c.cancel() - c.once.Do(func() { close(c.quit) }) c.wg.Wait() } +// StopNow stops the client without retries +func (c *client) StopNow() { + // cancel any upstream calls made using client's `ctx`. + c.cancel() + c.Stop() +} + // Handle implement EntryHandler; adds a new line to the next batch; send is async. func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { if len(c.externalLabels) > 0 { diff --git a/pkg/promtail/client/fake/client.go b/pkg/promtail/client/fake/client.go index b104388912c27..ab7bd1686c00a 100644 --- a/pkg/promtail/client/fake/client.go +++ b/pkg/promtail/client/fake/client.go @@ -19,6 +19,11 @@ func (c *Client) Stop() { c.OnStop() } +// StopNow implements client.Client +func (c *Client) StopNow() { + c.OnStop() +} + // Handle implements client.Client func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error { return c.OnHandleEntry.Handle(labels, time, entry) diff --git a/pkg/promtail/client/logger.go b/pkg/promtail/client/logger.go index b2ec8cac62061..dc164fa1141b6 100644 --- a/pkg/promtail/client/logger.go +++ b/pkg/promtail/client/logger.go @@ -58,6 +58,8 @@ func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) func (*logger) Stop() {} +func (*logger) StopNow() {} + func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error { l.Lock() defer l.Unlock() diff --git a/pkg/promtail/client/multi.go b/pkg/promtail/client/multi.go index a5f2c63e47b07..0e8720ca5da3f 100644 --- a/pkg/promtail/client/multi.go +++ b/pkg/promtail/client/multi.go @@ -55,3 +55,10 @@ func (m MultiClient) Stop() { c.Stop() } } + +// StopNow implements Client +func (m MultiClient) StopNow() { + for _, c := range m { + c.StopNow() + } +}