diff --git a/Makefile b/Makefile index e7810aa5c1f7..c19d637333aa 100644 --- a/Makefile +++ b/Makefile @@ -383,6 +383,7 @@ helm-clean: ################# # optionally set the tag or the arch suffix (-arm64) +LOKI_DOCKER_DRIVER ?= "grafana/loki-docker-driver" PLUGIN_TAG ?= $(IMAGE_TAG) PLUGIN_ARCH ?= @@ -393,24 +394,24 @@ docker-driver: docker-driver-clean (docker export $$ID | tar -x -C cmd/docker-driver/rootfs) && \ docker rm -vf $$ID docker rmi rootfsimage -f - docker plugin create grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH) cmd/docker-driver - docker plugin create grafana/loki-docker-driver:latest$(PLUGIN_ARCH) cmd/docker-driver + docker plugin create $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH) cmd/docker-driver + docker plugin create $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH) cmd/docker-driver cmd/docker-driver/docker-driver: $(APP_GO_FILES) CGO_ENABLED=0 go build $(GO_FLAGS) -o $@ ./$(@D) $(NETGO_CHECK) docker-driver-push: docker-driver - docker plugin push grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH) - docker plugin push grafana/loki-docker-driver:latest$(PLUGIN_ARCH) + docker plugin push $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH) + docker plugin push $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH) docker-driver-enable: - docker plugin enable grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH) + docker plugin enable $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH) docker-driver-clean: - -docker plugin disable grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH) - -docker plugin rm grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH) - -docker plugin rm grafana/loki-docker-driver:latest$(PLUGIN_ARCH) + -docker plugin disable $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH) + -docker plugin rm $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH) + -docker plugin rm $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH) rm -rf cmd/docker-driver/rootfs ##################### diff --git a/cmd/docker-driver/loki.go b/cmd/docker-driver/loki.go 
index 11a5a7b8c618..6496bf380e3f 100644 --- a/cmd/docker-driver/loki.go +++ b/cmd/docker-driver/loki.go @@ -70,6 +70,6 @@ func (l *loki) Name() string { // Log implements `logger.Logger` func (l *loki) Close() error { - l.client.Stop() + l.client.StopNow() return nil } diff --git a/cmd/docker-driver/main.go b/cmd/docker-driver/main.go index 6dd8d1f87b4f..3f76ed7fc687 100644 --- a/cmd/docker-driver/main.go +++ b/cmd/docker-driver/main.go @@ -30,12 +30,14 @@ func main() { } logger := newLogger(logLevel) level.Info(util.Logger).Log("msg", "Starting docker-plugin", "version", version.Info()) + h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`) + handlers(&h, newDriver(logger)) + if err := h.ServeUnix(socketAddress, 0); err != nil { panic(err) } - } func newLogger(lvl logging.Level) log.Logger { diff --git a/cmd/fluent-bit/dque.go b/cmd/fluent-bit/dque.go index 66d2cb55ed9b..8636eb2ef4e5 100644 --- a/cmd/fluent-bit/dque.go +++ b/cmd/fluent-bit/dque.go @@ -109,6 +109,12 @@ func (c *dqueClient) Stop() { c.loki.Stop() } +// StopNow stops the client without retries +func (c *dqueClient) StopNow() { + c.once.Do(func() { c.queue.Close() }) + c.loki.StopNow() +} + // Handle implement EntryHandler; adds a new line to the next batch; send is async. 
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error { if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil { diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index d687ee12df3f..3c3f437a75b1 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -33,6 +33,8 @@ func (r *recorder) toEntry() *entry { return r.entry } func (r *recorder) Stop() {} +func (r *recorder) StopNow() {} + var now = time.Now() func Test_loki_sendRecord(t *testing.T) { diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index 543268b68fb2..fdadb6a69e73 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -112,19 +112,29 @@ type Client interface { api.EntryHandler // Stop goroutine sending batch of entries. Stop() + + // StopNow stops the goroutine sending batch of entries without retries. + StopNow() } // Client for pushing logs in snappy-compressed protos over HTTP. type client struct { - logger log.Logger - cfg Config - client *http.Client - quit chan struct{} + logger log.Logger + cfg Config + client *http.Client + + // quit chan is deprecated. Will be removed. Use `client.ctx` and `client.cancel` instead. + quit chan struct{} + once sync.Once entries chan entry wg sync.WaitGroup externalLabels model.LabelSet + + // ctx is used in any upstream calls from the `client`. 
+ ctx context.Context + cancel context.CancelFunc } type entry struct { @@ -139,6 +149,8 @@ func New(cfg Config, logger log.Logger) (Client, error) { return nil, errors.New("client needs target URL") } + ctx, cancel := context.WithCancel(context.Background()) + c := &client{ logger: log.With(logger, "component", "client", "host", cfg.URL.Host), cfg: cfg, @@ -146,6 +158,8 @@ func New(cfg Config, logger log.Logger) (Client, error) { entries: make(chan entry), externalLabels: cfg.ExternalLabels.LabelSet, + ctx: ctx, + cancel: cancel, } err := cfg.Client.Validate() @@ -246,12 +260,13 @@ func (c *client) sendBatch(tenantID string, batch *batch) { bufBytes := float64(len(buf)) encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - ctx := context.Background() - backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig) + backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig) var status int - for backoff.Ongoing() { + for { start := time.Now() - status, err = c.send(ctx, tenantID, buf) + // send uses `timeout` internally, so `context.Background` is good enough. + status, err = c.send(context.Background(), tenantID, buf) + requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) if err == nil { @@ -288,6 +303,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) { level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() backoff.Wait() + + // Make sure it sends at least once before checking for retry. + if !backoff.Ongoing() { + break + } } if err != nil { @@ -353,6 +373,13 @@ func (c *client) Stop() { c.wg.Wait() } +// StopNow stops the client without retries +func (c *client) StopNow() { + // cancel any upstream calls made using client's `ctx`. + c.cancel() + c.Stop() +} + // Handle implement EntryHandler; adds a new line to the next batch; send is async. 
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { if len(c.externalLabels) > 0 { diff --git a/pkg/promtail/client/client_test.go b/pkg/promtail/client/client_test.go index 62e2a42ad4a1..9384997cdfa4 100644 --- a/pkg/promtail/client/client_test.go +++ b/pkg/promtail/client/client_test.go @@ -307,6 +307,147 @@ func TestClient_Handle(t *testing.T) { } } +func TestClient_StopNow(t *testing.T) { + cases := []struct { + name string + clientBatchSize int + clientBatchWait time.Duration + clientMaxRetries int + clientTenantID string + serverResponseStatus int + inputEntries []entry + inputDelay time.Duration + expectedReqs []receivedReq + expectedMetrics string + }{ + { + name: "send requests shouldn't be cancelled after StopNow()", + clientBatchSize: 10, + clientBatchWait: 100 * time.Millisecond, + clientMaxRetries: 3, + serverResponseStatus: 200, + inputEntries: []entry{logEntries[0], logEntries[1], logEntries[2]}, + expectedReqs: []receivedReq{ + { + tenantID: "", + pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}}, + }, + { + tenantID: "", + pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP promtail_sent_entries_total Number of log entries sent to the ingester. + # TYPE promtail_sent_entries_total counter + promtail_sent_entries_total{host="__HOST__"} 3.0 + # HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. 
+ # TYPE promtail_dropped_entries_total counter + promtail_dropped_entries_total{host="__HOST__"} 0 + `, + }, + { + name: "shouldn't retry after StopNow()", + clientBatchSize: 10, + clientBatchWait: 10 * time.Millisecond, + clientMaxRetries: 3, + serverResponseStatus: 429, + inputEntries: []entry{logEntries[0]}, + expectedReqs: []receivedReq{ + { + tenantID: "", + pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}}, + }, + }, + expectedMetrics: ` + # HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries. + # TYPE promtail_dropped_entries_total counter + promtail_dropped_entries_total{host="__HOST__"} 1.0 + # HELP promtail_sent_entries_total Number of log entries sent to the ingester. + # TYPE promtail_sent_entries_total counter + promtail_sent_entries_total{host="__HOST__"} 0 + `, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Reset metrics + sentEntries.Reset() + droppedEntries.Reset() + + // Create a buffer channel where we do enqueue received requests + receivedReqsChan := make(chan receivedReq, 10) + + // Start a local HTTP server + server := httptest.NewServer(createServerHandler(receivedReqsChan, c.serverResponseStatus)) + require.NotNil(t, server) + defer server.Close() + + // Get the URL at which the local test server is listening to + serverURL := flagext.URLValue{} + err := serverURL.Set(server.URL) + require.NoError(t, err) + + // Instance the client + cfg := Config{ + URL: serverURL, + BatchWait: c.clientBatchWait, + BatchSize: c.clientBatchSize, + Client: config.HTTPClientConfig{}, + BackoffConfig: util.BackoffConfig{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: c.clientMaxRetries}, + ExternalLabels: lokiflag.LabelSet{}, + Timeout: 1 * time.Second, + TenantID: c.clientTenantID, + } + + cl, err := New(cfg, log.NewNopLogger()) + require.NoError(t, err) + + 
// Send all the input log entries + for i, logEntry := range c.inputEntries { + err = cl.Handle(logEntry.labels, logEntry.Timestamp, logEntry.Line) + require.NoError(t, err) + + if c.inputDelay > 0 && i < len(c.inputEntries)-1 { + time.Sleep(c.inputDelay) + } + } + + // Wait until the expected push requests are received (with a timeout) + deadline := time.Now().Add(1 * time.Second) + for len(receivedReqsChan) < len(c.expectedReqs) && time.Now().Before(deadline) { + time.Sleep(5 * time.Millisecond) + } + + // The client's ctx must not be cancelled before StopNow is called + cc := cl.(*client) + require.NoError(t, cc.ctx.Err()) + + // StopNow cancels the client's ctx and then stops the client + cl.StopNow() + close(receivedReqsChan) + + require.Error(t, cc.ctx.Err()) // non-nil error if it's cancelled. + + // Get all push requests received on the server side + receivedReqs := make([]receivedReq, 0) + for req := range receivedReqsChan { + receivedReqs = append(receivedReqs, req) + } + + // Due to implementation details (maps iteration ordering is random) we just check + // that the expected requests are equal to the received requests, without checking + // the exact order which is not guaranteed in case of multi-tenant + require.ElementsMatch(t, c.expectedReqs, receivedReqs) + + expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1) + err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total") + assert.NoError(t, err) + }) + } +} + func createServerHandler(receivedReqsChan chan receivedReq, status int) http.HandlerFunc { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { // Parse the request diff --git a/pkg/promtail/client/fake/client.go b/pkg/promtail/client/fake/client.go index b104388912c2..ab7bd1686c00 100644 --- a/pkg/promtail/client/fake/client.go +++ b/pkg/promtail/client/fake/client.go @@ -19,6 +19,11 @@ func (c *Client) Stop() { 
c.OnStop() } +// StopNow implements client.Client +func (c *Client) StopNow() { + c.OnStop() +} + // Handle implements client.Client func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error { return c.OnHandleEntry.Handle(labels, time, entry) diff --git a/pkg/promtail/client/logger.go b/pkg/promtail/client/logger.go index b2ec8cac6206..dc164fa1141b 100644 --- a/pkg/promtail/client/logger.go +++ b/pkg/promtail/client/logger.go @@ -58,6 +58,8 @@ func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) func (*logger) Stop() {} +func (*logger) StopNow() {} + func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error { l.Lock() defer l.Unlock() diff --git a/pkg/promtail/client/multi.go b/pkg/promtail/client/multi.go index a5f2c63e47b0..0e8720ca5da3 100644 --- a/pkg/promtail/client/multi.go +++ b/pkg/promtail/client/multi.go @@ -55,3 +55,10 @@ func (m MultiClient) Stop() { c.Stop() } } + +// StopNow implements Client +func (m MultiClient) StopNow() { + for _, c := range m { + c.StopNow() + } +}