Add FIFO queue persistent buffering for fluent bit output plugin #2142

Merged
merged 3 commits into from
Jun 1, 2020
2 changes: 1 addition & 1 deletion cmd/fluent-bit/Dockerfile
@@ -3,7 +3,7 @@ COPY . /src/loki
WORKDIR /src/loki
RUN make clean && make BUILD_IN_CONTAINER=false fluent-bit-plugin

-FROM fluent/fluent-bit:1.2
+FROM fluent/fluent-bit:1.4
COPY --from=build /src/loki/cmd/fluent-bit/out_loki.so /fluent-bit/bin
COPY cmd/fluent-bit/fluent-bit.conf /fluent-bit/etc/fluent-bit.conf
EXPOSE 2020
28 changes: 27 additions & 1 deletion cmd/fluent-bit/README.md
@@ -1,6 +1,6 @@
# Fluent Bit output plugin

-[Fluent Bit](https://fluentbit.io/) is a Fast and Lightweight Data Forwarder, it can be configured with the [Loki output plugin](https://fluentbit.io/documentation/0.12/output/) to ship logs to Loki. You can define which log files you want to collect using the [`Tail`](https://fluentbit.io/documentation/0.12/input/tail.html) [input plugin](https://fluentbit.io/documentation/0.12/getting_started/input.html). Additionally Fluent Bit supports multiple `Filter` and `Parser` plugins (`Kubernetes`, `JSON`, etc..) to structure and alter log lines.
+[Fluent Bit](https://fluentbit.io/) is a fast and lightweight data forwarder. It can be configured with the [Loki output plugin](https://fluentbit.io/documentation/0.12/output/) to ship logs to Loki. You can define which log files you want to collect using the [`Tail`](https://fluentbit.io/documentation/0.12/input/tail.html) or [`Stdin`](https://docs.fluentbit.io/manual/pipeline/inputs/standard-input) [input plugin](https://fluentbit.io/documentation/0.12/getting_started/input.html). Additionally, Fluent Bit supports multiple `Filter` and `Parser` plugins (`Kubernetes`, `JSON`, etc.) to structure and alter log lines.

This plugin is implemented with [Fluent Bit's Go plugin](https://github.com/fluent/fluent-bit-go) interface. It pushes logs to Loki using a GRPC connection.

@@ -22,6 +22,12 @@ This plugin is implemented with [Fluent Bit's Go plugin](https://github.com/flue
| LineFormat | Format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format <key>=<value>. | json |
| DropSingleKey | If set to true and after extracting label_keys a record only has a single key remaining, the log line sent to Loki will just be the value of the record key.| true |
| LabelMapPath | Path to a json file defining how to transform nested records. | none |
| Buffer | Enable the buffering mechanism. | false |
| BufferType | Specify the buffering mechanism to use (currently only dque is implemented). | dque |
| DqueDir | Path to the directory for queued logs. | /tmp/flb-storage/loki |
| DqueSegmentSize | Segment size in terms of number of records per segment. | 500 |
| DqueSync | Whether to fsync each queue change. Valid values are "normal" (no fsync on each change) or "full" (fsync each change). | normal |
| DqueName | Queue name, must be unique per output. | dque |

### Labels

@@ -75,6 +81,26 @@ The labels extracted will be `{team="x-men", container="promtail", pod="promtail

If you don't want the `kubernetes` and `HOSTNAME` fields to appear in the log line you can use the `RemoveKeys` configuration field. (e.g. `RemoveKeys kubernetes,HOSTNAME`).

### Buffering
Buffering refers to the ability to store records somewhere and, while they are being processed and delivered, still be able to accept more. By design, the Loki client can block the Loki output plugin in certain situations:
* `BatchSize` is over its limit: the output plugin pauses receiving new records until the pending batch is successfully sent to the server.
* The Loki server is unreachable (the client retries 429s, 500s and connection-level errors): the output plugin blocks new records until the Loki server is available again and the pending batch is successfully sent, or until the maximum number of attempts allowed by the configured backoff mechanism has been reached.

A blocking state is not acceptable with some input plugins because it can have undesirable side effects on the component that generates the logs. Fluent Bit implements its own buffering mechanism, but it is based on parallel processing and therefore cannot send logs in order, which Loki requires (Loki logs must be in increasing time order per stream).

The Loki output plugin has a buffering mechanism based on [`dque`](https://github.com/joncrlsn/dque) that is compatible with the Loki server's strict time ordering and can be enabled through configuration:
```properties
[Output]
Name loki
Match *
Url http://localhost:3100/loki/api/v1/push
Buffer true
DqueSegmentSize 8096
DqueDir /tmp/flb-storage/buffer
DqueName loki.0
```


### Configuration examples

To configure the Loki output plugin add this section to fluent-bit.conf
30 changes: 30 additions & 0 deletions cmd/fluent-bit/buffer.go
@@ -0,0 +1,30 @@
package main

import (
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/grafana/loki/pkg/promtail/client"
)

type bufferConfig struct {
	buffer     bool
	bufferType string
	dqueConfig dqueConfig
}

var defaultBufferConfig = bufferConfig{
	buffer:     false,
	bufferType: "dque",
	dqueConfig: defaultDqueConfig,
}

// NewBuffer makes a new buffered Client.
func NewBuffer(cfg *config, logger log.Logger) (client.Client, error) {
	switch cfg.bufferConfig.bufferType {
	case "dque":
		return newDque(cfg, logger)
	default:
		return nil, fmt.Errorf("failed to parse bufferType: %s", cfg.bufferConfig.bufferType)
	}
}
14 changes: 14 additions & 0 deletions cmd/fluent-bit/client.go
@@ -0,0 +1,14 @@
package main

import (
	"github.com/go-kit/kit/log"
	"github.com/grafana/loki/pkg/promtail/client"
)

// NewClient creates a new client based on the fluentbit configuration.
func NewClient(cfg *config, logger log.Logger) (client.Client, error) {
	if cfg.bufferConfig.buffer {
		return NewBuffer(cfg, logger)
	}
	return client.New(cfg.clientConfig, logger)
}
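
The selection logic in `NewClient` and `NewBuffer` can be sketched without the Loki dependencies. This is only an illustration under stated assumptions: `sender`, `direct`, `queued` and `newSender` are hypothetical stand-ins, not types from the plugin, and `"ring"` is a made-up value used only to exercise the error path.

```go
package main

import "fmt"

// sender is a hypothetical, simplified stand-in for client.Client.
type sender interface {
	Handle(line string) error
}

// direct pretends to push each line straight to the server.
type direct struct{ sent []string }

func (d *direct) Handle(line string) error {
	d.sent = append(d.sent, line)
	return nil
}

// queued pretends to park each line in a queue in front of another sender.
type queued struct {
	next    sender
	pending []string
}

func (q *queued) Handle(line string) error {
	q.pending = append(q.pending, line)
	return nil
}

// newSender mirrors the shape of NewClient and NewBuffer: unbuffered by
// default, otherwise dispatch on the configured buffer type.
func newSender(buffer bool, bufferType string) (sender, error) {
	d := &direct{}
	if !buffer {
		return d, nil
	}
	switch bufferType {
	case "dque":
		return &queued{next: d}, nil
	default:
		return nil, fmt.Errorf("failed to parse bufferType: %s", bufferType)
	}
}

func main() {
	// "ring" is a made-up, unsupported buffer type.
	for _, bt := range []string{"dque", "ring"} {
		s, err := newSender(true, bt)
		fmt.Printf("%T %v\n", s, err)
	}
}
```

Because both variants satisfy the same interface, the caller (here `newPlugin` in `loki.go`) does not need to know whether buffering is enabled.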
52 changes: 52 additions & 0 deletions cmd/fluent-bit/config.go
@@ -38,6 +38,7 @@ const (

type config struct {
clientConfig client.Config
bufferConfig bufferConfig
logLevel logging.Level
autoKubernetesLabels bool
removeKeys []string
@@ -51,6 +52,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
res := &config{}

res.clientConfig = defaultClientCfg
res.bufferConfig = defaultBufferConfig

url := cfg.Get("URL")
var clientURL flagext.URLValue
@@ -159,5 +161,55 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
}
res.labelKeys = nil
}

	// enable loki plugin buffering
	buffer := cfg.Get("Buffer")
	switch buffer {
	case "false", "":
		res.bufferConfig.buffer = false
	case "true":
		res.bufferConfig.buffer = true
	default:
		return nil, fmt.Errorf("invalid boolean Buffer: %v", buffer)
	}

	// buffering type
	bufferType := cfg.Get("BufferType")
	if bufferType != "" {
		res.bufferConfig.bufferType = bufferType
	}

	// dque directory
	queueDir := cfg.Get("DqueDir")
	if queueDir != "" {
		res.bufferConfig.dqueConfig.queueDir = queueDir
	}

	// dque segment size (number of queue entries per segment)
	queueSegmentSize := cfg.Get("DqueSegmentSize")
	if queueSegmentSize != "" {
		res.bufferConfig.dqueConfig.queueSegmentSize, err = strconv.Atoi(queueSegmentSize)
		if err != nil {
			return nil, fmt.Errorf("impossible to convert string to integer DqueSegmentSize: %v", queueSegmentSize)
		}
	}

	// dque sync mode: "full" syncs control file changes to disk as they happen;
	// "normal" (the default) skips that and enables dque turbo mode
	queueSync := cfg.Get("DqueSync")
	switch queueSync {
	case "normal", "":
		res.bufferConfig.dqueConfig.queueSync = false
	case "full":
		res.bufferConfig.dqueConfig.queueSync = true
	default:
		return nil, fmt.Errorf("invalid string DqueSync: %v", queueSync)
	}

	// dque name
	queueName := cfg.Get("DqueName")
	if queueName != "" {
		res.bufferConfig.dqueConfig.queueName = queueName
	}

	return res, nil
}
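
The option handling above can be exercised in isolation. The sketch below is an assumption-laden illustration, not code from the PR: `mapGetter`, `parseSegmentSize` and `parseSync` are hypothetical names, and the map lookup is an exact-match stand-in for whatever key matching the real `ConfigGetter` performs.

```go
package main

import (
	"fmt"
	"strconv"
)

// mapGetter is a hypothetical config source backed by a map.
// Missing keys yield the empty string, which the parsers treat as "unset".
type mapGetter map[string]string

func (m mapGetter) Get(key string) string { return m[key] }

// parseSegmentSize follows the DqueSegmentSize handling: an empty value keeps
// the default, anything else must be an integer.
func parseSegmentSize(g mapGetter, def int) (int, error) {
	raw := g.Get("DqueSegmentSize")
	if raw == "" {
		return def, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("impossible to convert string to integer DqueSegmentSize: %v", raw)
	}
	return n, nil
}

// parseSync follows the DqueSync handling: "normal" or empty means no fsync
// per change, "full" means fsync per change, anything else is an error.
func parseSync(g mapGetter) (bool, error) {
	switch v := g.Get("DqueSync"); v {
	case "normal", "":
		return false, nil
	case "full":
		return true, nil
	default:
		return false, fmt.Errorf("invalid string DqueSync: %v", v)
	}
}

func main() {
	g := mapGetter{"DqueSegmentSize": "8096", "DqueSync": "full"}
	size, err := parseSegmentSize(g, 500)
	fmt.Println(size, err)
	sync, err := parseSync(g)
	fmt.Println(sync, err)
}
```

Note that `DqueSync` takes `normal` or `full`, so a value such as `true` is rejected rather than interpreted as a boolean.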
132 changes: 132 additions & 0 deletions cmd/fluent-bit/dque.go
@@ -0,0 +1,132 @@
package main

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"

	"github.com/grafana/loki/pkg/promtail/client"
	"github.com/joncrlsn/dque"
	"github.com/prometheus/common/model"
)

type dqueConfig struct {
	queueDir         string
	queueSegmentSize int
	queueSync        bool
	queueName        string
}

var defaultDqueConfig = dqueConfig{
	queueDir:         "/tmp/flb-storage/loki",
	queueSegmentSize: 500,
	queueSync:        false,
	queueName:        "dque",
}

type dqueEntry struct {
	Lbs  model.LabelSet
	Ts   time.Time
	Line string
}

func dqueEntryBuilder() interface{} {
	return &dqueEntry{}
}

type dqueClient struct {
	logger log.Logger
	queue  *dque.DQue
	loki   client.Client
	quit   chan struct{}
	once   sync.Once
	wg     sync.WaitGroup
}

// newDque makes a new dque-backed Loki client.
func newDque(cfg *config, logger log.Logger) (client.Client, error) {
	var err error

	q := &dqueClient{
		logger: log.With(logger, "component", "queue", "name", cfg.bufferConfig.dqueConfig.queueName),
		quit:   make(chan struct{}),
	}

	// Directories need the execute bit to be traversable, hence 0755 rather than 0644.
	err = os.MkdirAll(cfg.bufferConfig.dqueConfig.queueDir, 0755)
	if err != nil {
		return nil, fmt.Errorf("cannot create queue directory: %s", err)
	}

	q.queue, err = dque.NewOrOpen(cfg.bufferConfig.dqueConfig.queueName, cfg.bufferConfig.dqueConfig.queueDir, cfg.bufferConfig.dqueConfig.queueSegmentSize, dqueEntryBuilder)
	if err != nil {
		return nil, err
	}

	// Without DqueSync, run the queue in turbo mode (no sync on every change).
	if !cfg.bufferConfig.dqueConfig.queueSync {
		q.queue.TurboOn()
	}

	q.loki, err = client.New(cfg.clientConfig, logger)
	if err != nil {
		return nil, err
	}

	q.wg.Add(1)
	go q.dequeuer()
	return q, nil
}

// dequeuer forwards queued records to Loki, one at a time and in FIFO order.
func (c *dqueClient) dequeuer() {
	defer func() {
		if err := c.queue.Close(); err != nil {
			level.Error(c.logger).Log("msg", "error closing queue", "err", err)
		}
		c.wg.Done()
	}()

	for {
		select {
		case <-c.quit:
			return
		default:
		}

		// Dequeue the next item in the queue
		entry, err := c.queue.DequeueBlock()
		if err != nil {
			level.Error(c.logger).Log("msg", "error dequeuing record", "error", err)
			continue
		}

		// Assert the dequeued item to a *dqueEntry so we can work with it
		record, ok := entry.(*dqueEntry)
		if !ok {
			level.Error(c.logger).Log("msg", "error dequeued record is not a valid type")
			continue
		}

		if err := c.loki.Handle(record.Lbs, record.Ts, record.Line); err != nil {
			level.Error(c.logger).Log("msg", "error sending record to Loki", "error", err)
		}
	}
}

// Stop the client
func (c *dqueClient) Stop() {
	c.once.Do(func() { close(c.quit) })
	c.loki.Stop()
	c.wg.Wait()
}

// Handle implements EntryHandler; it enqueues the record on the persistent queue, and sending happens asynchronously.
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error {
	if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil {
		return fmt.Errorf("cannot enqueue record %s: %s", s, err)
	}

	return nil
}
3 changes: 1 addition & 2 deletions cmd/fluent-bit/loki.go
@@ -26,7 +26,7 @@ type loki struct {
}

func newPlugin(cfg *config, logger log.Logger) (*loki, error) {
-client, err := client.New(cfg.clientConfig, logger)
+client, err := NewClient(cfg, logger)
if err != nil {
return nil, err
}
@@ -70,7 +70,6 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {

// prevent base64-encoding []byte values (default json.Encoder rule) by
// converting them to strings

func toStringSlice(slice []interface{}) []interface{} {
var s []interface{}
for _, v := range slice {
6 changes: 5 additions & 1 deletion cmd/fluent-bit/out_loki.go
@@ -68,7 +68,11 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(paramLogger).Log("LineFormat", conf.lineFormat)
level.Info(paramLogger).Log("DropSingleKey", conf.dropSingleKey)
level.Info(paramLogger).Log("LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

level.Info(paramLogger).Log("Buffer", conf.bufferConfig.buffer)
level.Info(paramLogger).Log("BufferType", conf.bufferConfig.bufferType)
level.Info(paramLogger).Log("DqueDir", conf.bufferConfig.dqueConfig.queueDir)
level.Info(paramLogger).Log("DqueSegmentSize", conf.bufferConfig.dqueConfig.queueSegmentSize)
level.Info(paramLogger).Log("DqueSync", conf.bufferConfig.dqueConfig.queueSync)
plugin, err := newPlugin(conf, logger)
if err != nil {
level.Error(logger).Log("newPlugin", err)
2 changes: 2 additions & 0 deletions go.mod
@@ -22,6 +22,7 @@ require (
github.com/frankban/quicktest v1.7.2 // indirect
github.com/go-kit/kit v0.10.0
github.com/go-logfmt/logfmt v0.5.0
github.com/gofrs/flock v0.7.1 // indirect
github.com/gogo/protobuf v1.3.1 // remember to update loki-build-image/Dockerfile too
github.com/golang/snappy v0.0.1
github.com/gorilla/mux v1.7.3
@@ -33,6 +34,7 @@ require (
github.com/hpcloud/tail v1.0.0
github.com/influxdata/go-syslog/v2 v2.0.1
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible
github.com/json-iterator/go v1.1.9
github.com/klauspost/compress v1.9.4
github.com/mitchellh/mapstructure v1.1.2
4 changes: 4 additions & 0 deletions go.sum
@@ -352,6 +352,8 @@ github.com/gocql/gocql v0.0.0-20190301043612-f6df8288f9b4/go.mod h1:4Fw1eo5iaEhD
github.com/gocql/gocql v0.0.0-20200121121104-95d072f1b5bb h1:gAHg4RMULuLo7Y3jhY5CHh/QuSwjeTZt4qVdJ9ytcVI=
github.com/gocql/gocql v0.0.0-20200121121104-95d072f1b5bb/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/googleapis v1.1.0 h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -542,6 +544,8 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible h1:f4ZGkY12AQ+YvzWDDWMLMGejA4ceg7nIPlqJ9fQ9T4c=
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible/go.mod h1:hDZb8oMj3Kp8MxtbNLg9vrtAUDHjgI1yZvqivT4O8Iw=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
24 changes: 24 additions & 0 deletions vendor/github.com/gofrs/flock/.gitignore