Skip to content

Commit

Permalink
fix(docker-driver): Propagate promtail's client.Stop properly (#2898)
Browse files Browse the repository at this point in the history
* impr(Makefile): add support to take docker-driver image via ENV

* impr(promtail/client): Add cancelling mechanism for batchSend

stores `context` and its `cancel` in client and uses it to
make upstream calls that need `ctx`.

Also properly cancel's all the upstream calls that using `ctx` when
client's Stop is called.

* Add `StopNow()` method to stop the client without retries.

Use `StopNow()` from the docker-driver

* Add test for StopNow

* reverting back the client.Timeout
  • Loading branch information
kavirajk authored Dec 1, 2020
1 parent f6fd6ae commit 410b9d9
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 18 deletions.
17 changes: 9 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ helm-clean:
#################

# optionally set the tag or the arch suffix (-arm64)
LOKI_DOCKER_DRIVER ?= "grafana/loki-docker-driver"
PLUGIN_TAG ?= $(IMAGE_TAG)
PLUGIN_ARCH ?=

Expand All @@ -393,24 +394,24 @@ docker-driver: docker-driver-clean
(docker export $$ID | tar -x -C cmd/docker-driver/rootfs) && \
docker rm -vf $$ID
docker rmi rootfsimage -f
docker plugin create grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH) cmd/docker-driver
docker plugin create grafana/loki-docker-driver:latest$(PLUGIN_ARCH) cmd/docker-driver
docker plugin create $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH) cmd/docker-driver
docker plugin create $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH) cmd/docker-driver

cmd/docker-driver/docker-driver: $(APP_GO_FILES)
CGO_ENABLED=0 go build $(GO_FLAGS) -o $@ ./$(@D)
$(NETGO_CHECK)

docker-driver-push: docker-driver
docker plugin push grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
docker plugin push grafana/loki-docker-driver:latest$(PLUGIN_ARCH)
docker plugin push $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)
docker plugin push $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH)

docker-driver-enable:
docker plugin enable grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
docker plugin enable $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)

docker-driver-clean:
-docker plugin disable grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm grafana/loki-docker-driver:latest$(PLUGIN_ARCH)
-docker plugin disable $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH)
rm -rf cmd/docker-driver/rootfs

#####################
Expand Down
2 changes: 1 addition & 1 deletion cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ func (l *loki) Name() string {

// Log implements `logger.Logger`
func (l *loki) Close() error {
l.client.Stop()
l.client.StopNow()
return nil
}
4 changes: 3 additions & 1 deletion cmd/docker-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ func main() {
}
logger := newLogger(logLevel)
level.Info(util.Logger).Log("msg", "Starting docker-plugin", "version", version.Info())

h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)

handlers(&h, newDriver(logger))

if err := h.ServeUnix(socketAddress, 0); err != nil {
panic(err)
}

}

func newLogger(lvl logging.Level) log.Logger {
Expand Down
6 changes: 6 additions & 0 deletions cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (c *dqueClient) Stop() {
c.loki.Stop()
}

// Stop the client
func (c *dqueClient) StopNow() {
c.once.Do(func() { c.queue.Close() })
c.loki.StopNow()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error {
if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/fluent-bit/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (r *recorder) toEntry() *entry { return r.entry }

func (r *recorder) Stop() {}

func (r *recorder) StopNow() {}

var now = time.Now()

func Test_loki_sendRecord(t *testing.T) {
Expand Down
43 changes: 35 additions & 8 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,29 @@ type Client interface {
api.EntryHandler
// Stop goroutine sending batch of entries.
Stop()

// Stop goroutine sending batch of entries without retries.
StopNow()
}

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
logger log.Logger
cfg Config
client *http.Client
quit chan struct{}
logger log.Logger
cfg Config
client *http.Client

// quit chan is depricated. Will be removed. Use `client.ctx` and `client.cancel` instead.

This comment has been minimized.

Copy link
@james-callahan

james-callahan Dec 2, 2020

Contributor

typo: deprecated

quit chan struct{}

once sync.Once
entries chan entry
wg sync.WaitGroup

externalLabels model.LabelSet

// ctx is used in any upstream calls from the `client`.
ctx context.Context
cancel context.CancelFunc
}

type entry struct {
Expand All @@ -139,13 +149,17 @@ func New(cfg Config, logger log.Logger) (Client, error) {
return nil, errors.New("client needs target URL")
}

ctx, cancel := context.WithCancel(context.Background())

c := &client{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),

externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
cancel: cancel,
}

err := cfg.Client.Validate()
Expand Down Expand Up @@ -246,12 +260,13 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
bufBytes := float64(len(buf))
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

ctx := context.Background()
backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
for {
start := time.Now()
status, err = c.send(ctx, tenantID, buf)
// send uses `timeout` internally, so `context.Background` is good enough.
status, err = c.send(context.Background(), tenantID, buf)

requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
Expand Down Expand Up @@ -288,6 +303,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
backoff.Wait()

// Make sure it sends at least once before checking for retry.
if !backoff.Ongoing() {
break
}
}

if err != nil {
Expand Down Expand Up @@ -353,6 +373,13 @@ func (c *client) Stop() {
c.wg.Wait()
}

// StopNow stops the client without retries
func (c *client) StopNow() {
// cancel any upstream calls made using client's `ctx`.
c.cancel()
c.Stop()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
if len(c.externalLabels) > 0 {
Expand Down
141 changes: 141 additions & 0 deletions pkg/promtail/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,147 @@ func TestClient_Handle(t *testing.T) {
}
}

func TestClient_StopNow(t *testing.T) {
cases := []struct {
name string
clientBatchSize int
clientBatchWait time.Duration
clientMaxRetries int
clientTenantID string
serverResponseStatus int
inputEntries []entry
inputDelay time.Duration
expectedReqs []receivedReq
expectedMetrics string
}{
{
name: "send requests shouldn't be cancelled after StopNow()",
clientBatchSize: 10,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 200,
inputEntries: []entry{logEntries[0], logEntries[1], logEntries[2]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 3.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 0
`,
},
{
name: "shouldn't retry after StopNow()",
clientBatchSize: 10,
clientBatchWait: 10 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 429,
inputEntries: []entry{logEntries[0]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
`,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan receivedReq, 10)

// Start a local HTTP server
server := httptest.NewServer(createServerHandler(receivedReqsChan, c.serverResponseStatus))
require.NotNil(t, server)
defer server.Close()

// Get the URL at which the local test server is listening to
serverURL := flagext.URLValue{}
err := serverURL.Set(server.URL)
require.NoError(t, err)

// Instance the client
cfg := Config{
URL: serverURL,
BatchWait: c.clientBatchWait,
BatchSize: c.clientBatchSize,
Client: config.HTTPClientConfig{},
BackoffConfig: util.BackoffConfig{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: c.clientMaxRetries},
ExternalLabels: lokiflag.LabelSet{},
Timeout: 1 * time.Second,
TenantID: c.clientTenantID,
}

cl, err := New(cfg, log.NewNopLogger())
require.NoError(t, err)

// Send all the input log entries
for i, logEntry := range c.inputEntries {
err = cl.Handle(logEntry.labels, logEntry.Timestamp, logEntry.Line)
require.NoError(t, err)

if c.inputDelay > 0 && i < len(c.inputEntries)-1 {
time.Sleep(c.inputDelay)
}
}

// Wait until the expected push requests are received (with a timeout)
deadline := time.Now().Add(1 * time.Second)
for len(receivedReqsChan) < len(c.expectedReqs) && time.Now().Before(deadline) {
time.Sleep(5 * time.Millisecond)
}

// StopNow should have cancelled client's ctx
cc := cl.(*client)
require.NoError(t, cc.ctx.Err())

// Stop the client: it waits until the current batch is sent
cl.StopNow()
close(receivedReqsChan)

require.Error(t, cc.ctx.Err()) // non-nil error if its cancelled.

// Get all push requests received on the server side
receivedReqs := make([]receivedReq, 0)
for req := range receivedReqsChan {
receivedReqs = append(receivedReqs, req)
}

// Due to implementation details (maps iteration ordering is random) we just check
// that the expected requests are equal to the received requests, without checking
// the exact order which is not guaranteed in case of multi-tenant
require.ElementsMatch(t, c.expectedReqs, receivedReqs)

expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
}

func createServerHandler(receivedReqsChan chan receivedReq, status int) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Parse the request
Expand Down
5 changes: 5 additions & 0 deletions pkg/promtail/client/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ func (c *Client) Stop() {
c.OnStop()
}

// StopNow implements client.Client
func (c *Client) StopNow() {
c.OnStop()
}

// Handle implements client.Client
func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error {
return c.OnHandleEntry.Handle(labels, time, entry)
Expand Down
2 changes: 2 additions & 0 deletions pkg/promtail/client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config)

func (*logger) Stop() {}

func (*logger) StopNow() {}

func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error {
l.Lock()
defer l.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/promtail/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ func (m MultiClient) Stop() {
c.Stop()
}
}

// StopNow implements Client
func (m MultiClient) StopNow() {
for _, c := range m {
c.StopNow()
}
}

0 comments on commit 410b9d9

Please sign in to comment.