Skip to content

Commit

Permalink
fluent-bit: sorted JSON and properly convert []byte to string (#1310)
Browse files Browse the repository at this point in the history
* fluent-bit: fix variable spelling mistake

Signed-off-by: Jens Erat <email@jenserat.de>

* fluent-bit: sort JSON map

While developing another fix, I stumbled upon #1309 as a newly written
unit test (with multiple key-value pairs in a map) was flaky.

JSON does not [strictly define an order on records in a map][RFC8259],
but practical operations with a logging tool pretty much require one
(although of course the grep for JSON is jq, not grep...). Also, the
key-value output format is already sorted.

Switching to sorted output in jsoniter is pretty easy. As of today, it
still has a [bug] though, for which I already provided a [fix]. I
propose accepting the rare case where invalid types can occur from
msgpack output (can this even happen?) and re-enabling the failing test
case as soon as the upstream PR is merged.

[RFC8259]: https://tools.ietf.org/html/rfc8259#section-4
[bug]:     json-iterator/go#388
[fix]:     json-iterator/go#422

Signed-off-by: Jens Erat <email@jenserat.de>

* fluent-bit: properly convert []byte to string

Recently, a regression was introduced: the deep conversion of `[]byte`
to `string` was no longer run unless a label map was supplied. This
commit fixes this by running the string conversion recursively, which
also removes the need to apply the conversion function again during the
label map stage.

This change has two minor side effects:

- Some test cases had to be moved, as string conversion happens much
  earlier now.
- Invalid characters do not result in the whole label being ignored any
  more, but are replaced by the Unicode replacement character (U+FFFD)
  now. I'd consider this an improvement, both making debugging much
  easier ("why is that value suddenly missing?") and preserving as much
  information as possible.

Signed-off-by: Jens Erat <email@jenserat.de>
  • Loading branch information
JensErat authored and cyriltovena committed Nov 26, 2019
1 parent 9956717 commit 492a17d
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 106 deletions.
4 changes: 2 additions & 2 deletions cmd/fluent-bit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type config struct {
labelKeys []string
lineFormat format
dropSingleKey bool
labeMap map[string]interface{}
labelMap map[string]interface{}
}

func parseConfig(cfg ConfigGetter) (*config, error) {
Expand Down Expand Up @@ -139,7 +139,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if err != nil {
return nil, fmt.Errorf("failed to open LabelMap file: %s", err)
}
if err := json.Unmarshal(content, &res.labeMap); err != nil {
if err := json.Unmarshal(content, &res.labelMap); err != nil {
return nil, fmt.Errorf("failed to Unmarshal LabelMap file: %s", err)
}
res.labelKeys = nil
Expand Down
6 changes: 3 additions & 3 deletions cmd/fluent-bit/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func Test_parseConfig(t *testing.T) {
labelKeys: nil,
removeKeys: []string{"buzz", "fuzz"},
dropSingleKey: false,
labeMap: map[string]interface{}{
labelMap: map[string]interface{}{
"kubernetes": map[string]interface{}{
"container_name": "container",
"host": "host",
Expand Down Expand Up @@ -160,8 +160,8 @@ func assertConfig(t *testing.T, expected, actual *config) {
if expected.logLevel.String() != actual.logLevel.String() {
t.Errorf("incorrect logLevel want:%v got:%v", expected.logLevel.String(), actual.logLevel.String())
}
if !reflect.DeepEqual(expected.labeMap, actual.labeMap) {
t.Errorf("incorrect labeMap want:%v got:%v", expected.labeMap, actual.labeMap)
if !reflect.DeepEqual(expected.labelMap, actual.labelMap) {
t.Errorf("incorrect labelMap want:%v got:%v", expected.labelMap, actual.labelMap)
}
}

Expand Down
13 changes: 7 additions & 6 deletions cmd/fluent-bit/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {
records := toStringMap(r)
level.Debug(l.logger).Log("msg", "processing records", "records", fmt.Sprintf("%+v", records))
lbs := model.LabelSet{}
if l.cfg.labeMap != nil {
mapLabels(records, l.cfg.labeMap, lbs)
if l.cfg.labelMap != nil {
mapLabels(records, l.cfg.labelMap, lbs)
} else {
lbs = extractLabels(records, l.cfg.labelKeys)
}
Expand Down Expand Up @@ -72,6 +72,8 @@ func toStringMap(record map[interface{}]interface{}) map[string]interface{} {
case []byte:
// prevent encoding to base64
m[key] = string(t)
case map[interface{}]interface{}:
m[key] = toStringMap(t)
default:
m[key] = v
}
Expand Down Expand Up @@ -106,10 +108,9 @@ func mapLabels(records map[string]interface{}, mapping map[string]interface{}, r
switch nextKey := v.(type) {
// if the next level is a map we are expecting we need to move deeper in the tree
case map[string]interface{}:
if nextValue, ok := records[k].(map[interface{}]interface{}); ok {
recordsMap := toStringMap(nextValue)
if nextValue, ok := records[k].(map[string]interface{}); ok {
// recursively search through the next level map.
mapLabels(recordsMap, nextKey, res)
mapLabels(nextValue, nextKey, res)
}
// we found a value in the mapping meaning we need to save the corresponding record value for the given key.
case string:
Expand Down Expand Up @@ -147,7 +148,7 @@ func removeKeys(records map[string]interface{}, keys []string) {
func createLine(records map[string]interface{}, f format) (string, error) {
switch f {
case jsonFormat:
js, err := jsoniter.Marshal(records)
js, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(records)
if err != nil {
return "", err
}
Expand Down
160 changes: 66 additions & 94 deletions cmd/fluent-bit/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,61 @@ func (r *recorder) Stop() {}
var now = time.Now()

func Test_loki_sendRecord(t *testing.T) {
var recordFixture = map[interface{}]interface{}{
var simpleRecordFixture = map[interface{}]interface{}{
"foo": "bar",
"bar": 500,
"error": make(chan struct{}),
}
var mapRecordFixture = map[interface{}]interface{}{
// lots of key/value pairs in map to increase chances of test hitting in case of unsorted map marshalling
"A": "A",
"B": "B",
"C": "C",
"D": "D",
"E": "E",
"F": "F",
"G": "G",
"H": "H",
}
var byteArrayRecordFixture = map[interface{}]interface{}{
"label": "label",
"outer": []byte("foo"),
"map": map[interface{}]interface{}{
"inner": []byte("bar"),
},
}
var mixedTypesRecordFixture = map[interface{}]interface{}{
"label": "label",
"int": 42,
"float": 42.42,
"array": []interface{}{42, 42.42, "foo"},
"map": map[interface{}]interface{}{
"nested": map[interface{}]interface{}{
"foo": "bar",
"invalid": []byte("a\xc5z"),
},
},
}

tests := []struct {
name string
cfg *config
want *entry
name string
cfg *config
record map[interface{}]interface{}
want *entry
}{
{"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, nil},
{"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, &entry{model.LabelSet{"bar": "500"}, `{"foo":"bar"}`, now}},
{"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `{"bar":500}`, now}},
{"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, nil},
{"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `bar=500`, now}},
{"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `500`, now}},
{"labelmap", &config{labeMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}},
{"map to JSON", &config{labelKeys: []string{"A"}, lineFormat: jsonFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `{"B":"B","C":"C","D":"D","E":"E","F":"F","G":"G","H":"H"}`, now}},
{"map to kvPairFormat", &config{labelKeys: []string{"A"}, lineFormat: kvPairFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `B=B C=C D=D E=E F=F G=G H=H`, now}},
{"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, nil},
{"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"bar": "500"}, `{"foo":"bar"}`, now}},
{"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500}`, now}},
// jsoniter.ConfigCompatibleWithStandardLibrary has an issue passing errors from inside a map:
// https://github.com/json-iterator/go/issues/388 -- fix already proposed upstream (#422)
//{"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500,"error":}`, now}},
{"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `bar=500`, now}},
{"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `500`, now}},
{"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}},
{"byte array", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, byteArrayRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"map":{"inner":"bar"},"outer":"foo"}`, now}},
{"mixed types", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, mixedTypesRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"array":[42,42.42,"foo"],"float":42.42,"int":42,"map":{"nested":{"foo":"bar","invalid":"a\ufffdz"}}}`, now}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -62,7 +99,7 @@ func Test_loki_sendRecord(t *testing.T) {
client: rec,
logger: logger,
}
_ = l.sendRecord(recordFixture, now)
_ = l.sendRecord(tt.record, now)
got := rec.toEntry()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("sendRecord() want:%v got:%v", tt.want, got)
Expand All @@ -82,7 +119,9 @@ func Test_createLine(t *testing.T) {

{"json", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": "bazz"}}, jsonFormat, `{"foo":"bar","bar":{"bizz":"bazz"}}`, false},
{"json with number", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}}, jsonFormat, `{"foo":"bar","bar":{"bizz":20}}`, false},
{"bad json", map[string]interface{}{"foo": make(chan interface{})}, jsonFormat, "", true},
// jsoniter.ConfigCompatibleWithStandardLibrary has an issue passing errors from inside a map:
// https://github.com/json-iterator/go/issues/388 -- fix already proposed upstream (#422)
// {"bad json", map[string]interface{}{"foo": make(chan interface{})}, jsonFormat, "", true},
{"kv", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": "bazz"}}, kvPairFormat, `bar=map[bizz:bazz] foo=bar`, false},
{"kv with number", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}, "decimal": 12.2}, kvPairFormat, `bar=map[bizz:20] decimal=12.2 foo=bar`, false},
{"kv with nil", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}, "null": nil}, kvPairFormat, `bar=map[bizz:20] foo=bar null=<nil>`, false},
Expand Down Expand Up @@ -214,69 +253,24 @@ func Test_labelMapping(t *testing.T) {
map[string]interface{}{},
model.LabelSet{},
},
{
"bytes string",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"foo": []byte("buzz"),
},
"stream": "stderr",
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"foo": "test",
"nonexisting": "",
},
"stream": "output",
"nope": "nope",
},
model.LabelSet{"test": "buzz", "output": "stderr"},
},
{
"numeric label",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"integer": 42,
"floating_point": 42.42,
},
"stream": "stderr",
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"integer": "integer",
"floating_point": "floating_point",
},
"stream": "output",
"nope": "nope",
},
model.LabelSet{"integer": "42", "floating_point": "42.42", "output": "stderr"},
},
{
"list label",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"integers": []int{42, 43},
},
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"integers": "integers",
},
},
model.LabelSet{"integers": "[42 43]"},
},
{
"deep string",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"label": map[interface{}]interface{}{
"component": map[interface{}]interface{}{
"int": "42",
"float": "42.42",
"array": `[42,42.42,"foo"]`,
"kubernetes": map[string]interface{}{
"label": map[string]interface{}{
"component": map[string]interface{}{
"buzz": "value",
},
},
},
},
map[string]interface{}{
"int": "int",
"float": "float",
"array": "array",
"kubernetes": map[string]interface{}{
"label": map[string]interface{}{
"component": map[string]interface{}{
Expand All @@ -287,34 +281,12 @@ func Test_labelMapping(t *testing.T) {
"stream": "output",
"nope": "nope",
},
model.LabelSet{"label": "value"},
},
{
"skip invalid values",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"annotations": map[interface{}]interface{}{
"kubernetes.io/config.source": "cfg",
"kubernetes.io/config.hash": []byte("a\xc5z"),
},
"container_name": "loki",
"namespace_name": "dev",
"pod_name": "loki-asdwe",
},
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"annotations": map[string]interface{}{
"kubernetes.io/config.source": "source",
"kubernetes.io/config.hash": "hash",
},
"container_name": "container",
"namespace_name": "namespace",
"pod_name": "instance",
},
"stream": "output",
model.LabelSet{
"int": "42",
"float": "42.42",
"array": `[42,42.42,"foo"]`,
"label": "value",
},
model.LabelSet{"container": "loki", "instance": "loki-asdwe", "namespace": "dev", "source": "cfg"},
},
}
for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion cmd/fluent-bit/out_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)
level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey)
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labeMap))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err = newPlugin(conf, logger)
if err != nil {
Expand Down

0 comments on commit 492a17d

Please sign in to comment.