Skip to content

Commit

Permalink
fluent-bit: multi-instance support (#1294)
Browse files Browse the repository at this point in the history
To run multiple plugin instances at the same time, the plugin instance
must be registered and later retrieved.

Additionally, a list of registered plugin instances must be stored for
proper disposal during fluent-bit shutdown.

Based on the [out_multiinstance example] in the upstream repository.

[out_multiinstance example]: https://github.com/fluent/fluent-bit-go/blob/fc386d263885e50387dd0081a77adf4072e8e4b6/examples/out_multiinstance/out.go

Signed-off-by: Jens Erat <email@jenserat.de>
Co-authored-by: Hiroshi Hatake <cosmo0920.oucc@gmail.com>
  • Loading branch information
2 people authored and cyriltovena committed Jan 6, 2020
1 parent c9016f4 commit 557f9cc
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
4 changes: 4 additions & 0 deletions cmd/fluent-bit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ To configure the Loki output plugin add this section to fluent-bit.conf
```
A full [example configuration file](fluent-bit.conf) is also available in this repository.

### Running multiple plugin instances

You can run multiple plugin instances in the same fluent-bit process, for example if you want to push to different Loki servers or route logs into different Loki tenant IDs. To do so, add additional `[Output]` sections.

## Building

## Prerequisites
Expand Down
68 changes: 44 additions & 24 deletions cmd/fluent-bit/out_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"github.com/weaveworks/common/logging"
)

var plugin *loki
var logger log.Logger
var (
// registered loki plugin instances, required for disposal during shutdown
plugins []*loki
logger log.Logger
)

func init() {
var logLevel logging.Level
Expand All @@ -40,38 +43,53 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

conf, err := parseConfig(&pluginConfig{ctx: ctx})
if err != nil {
level.Error(logger).Log("[flb-go]", "failed to launch", "error", err)
return output.FLB_ERROR
}
logger = newLogger(conf.logLevel)

// numeric plugin ID, only used for user-facing purpose (logging, ...)
id := len(plugins)
logger := log.With(newLogger(conf.logLevel), "id", id)

level.Info(logger).Log("[flb-go]", "Starting fluent-bit-go-loki", "version", version.Info())
level.Info(logger).Log("[flb-go]", "provided parameter", "URL", conf.clientConfig.URL)
level.Info(logger).Log("[flb-go]", "provided parameter", "TenantID", conf.clientConfig.TenantID)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchWait", conf.clientConfig.BatchWait)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize)
level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel)
level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)
level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey)
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err = newPlugin(conf, logger)
paramLogger := log.With(logger, "[flb-go]", "provided parameter")
level.Info(paramLogger).Log("URL", conf.clientConfig.URL)
level.Info(paramLogger).Log("TenantID", conf.clientConfig.TenantID)
level.Info(paramLogger).Log("BatchWait", conf.clientConfig.BatchWait)
level.Info(paramLogger).Log("BatchSize", conf.clientConfig.BatchSize)
level.Info(paramLogger).Log("Labels", conf.clientConfig.ExternalLabels)
level.Info(paramLogger).Log("LogLevel", conf.logLevel.String())
level.Info(paramLogger).Log("AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(paramLogger).Log("RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(paramLogger).Log("LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(paramLogger).Log("LineFormat", conf.lineFormat)
level.Info(paramLogger).Log("DropSingleKey", conf.dropSingleKey)
level.Info(paramLogger).Log("LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err := newPlugin(conf, logger)
if err != nil {
level.Error(logger).Log("newPlugin", err)
return output.FLB_ERROR
}

// register plugin instance, to be retrievable when sending logs
output.FLBPluginSetContext(ctx, plugin)
// remember plugin instance, required to cleanly dispose when fluent-bit is shutting down
plugins = append(plugins, plugin)

return output.FLB_OK
}

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, _ *C.char) int {
plugin := output.FLBPluginGetContext(ctx).(*loki)
if plugin == nil {
level.Error(logger).Log("[flb-go]", "plugin not initialized")
return output.FLB_ERROR
}

var ret int
var ts interface{}
var record map[interface{}]interface{}
Expand All @@ -92,13 +110,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
level.Warn(logger).Log("msg", "timestamp isn't known format. Use current time.")
level.Warn(plugin.logger).Log("msg", "timestamp isn't known format. Use current time.")
timestamp = time.Now()
}

err := plugin.sendRecord(record, timestamp)
if err != nil {
level.Error(logger).Log("msg", "error sending record to Loki", "error", err)
level.Error(plugin.logger).Log("msg", "error sending record to Loki", "error", err)
return output.FLB_ERROR
}
}
Expand All @@ -113,8 +131,10 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {

//export FLBPluginExit
func FLBPluginExit() int {
if plugin.client != nil {
plugin.client.Stop()
for _, plugin := range plugins {
if plugin.client != nil {
plugin.client.Stop()
}
}
return output.FLB_OK
}
Expand Down

0 comments on commit 557f9cc

Please sign in to comment.