From 5810719ee318452542d4af071a9e3f7c4f41da19 Mon Sep 17 00:00:00 2001 From: Jens Erat Date: Thu, 21 Nov 2019 15:53:18 +0100 Subject: [PATCH 1/3] fluent-bit: fix variable spelling mistake Signed-off-by: Jens Erat --- cmd/fluent-bit/config.go | 4 ++-- cmd/fluent-bit/config_test.go | 6 +++--- cmd/fluent-bit/loki.go | 4 ++-- cmd/fluent-bit/loki_test.go | 2 +- cmd/fluent-bit/out_loki.go | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/fluent-bit/config.go b/cmd/fluent-bit/config.go index 6dbaee128b276..610eabdd89fb9 100644 --- a/cmd/fluent-bit/config.go +++ b/cmd/fluent-bit/config.go @@ -42,7 +42,7 @@ type config struct { labelKeys []string lineFormat format dropSingleKey bool - labeMap map[string]interface{} + labelMap map[string]interface{} } func parseConfig(cfg ConfigGetter) (*config, error) { @@ -139,7 +139,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { if err != nil { return nil, fmt.Errorf("failed to open LabelMap file: %s", err) } - if err := json.Unmarshal(content, &res.labeMap); err != nil { + if err := json.Unmarshal(content, &res.labelMap); err != nil { return nil, fmt.Errorf("failed to Unmarshal LabelMap file: %s", err) } res.labelKeys = nil diff --git a/cmd/fluent-bit/config_test.go b/cmd/fluent-bit/config_test.go index 90b903cfd985c..cec3b57ef3cb0 100644 --- a/cmd/fluent-bit/config_test.go +++ b/cmd/fluent-bit/config_test.go @@ -97,7 +97,7 @@ func Test_parseConfig(t *testing.T) { labelKeys: nil, removeKeys: []string{"buzz", "fuzz"}, dropSingleKey: false, - labeMap: map[string]interface{}{ + labelMap: map[string]interface{}{ "kubernetes": map[string]interface{}{ "container_name": "container", "host": "host", @@ -160,8 +160,8 @@ func assertConfig(t *testing.T, expected, actual *config) { if expected.logLevel.String() != actual.logLevel.String() { t.Errorf("incorrect logLevel want:%v got:%v", expected.logLevel.String(), actual.logLevel.String()) } - if !reflect.DeepEqual(expected.labeMap, actual.labeMap) { - 
t.Errorf("incorrect labeMap want:%v got:%v", expected.labeMap, actual.labeMap) + if !reflect.DeepEqual(expected.labelMap, actual.labelMap) { + t.Errorf("incorrect labelMap want:%v got:%v", expected.labelMap, actual.labelMap) } } diff --git a/cmd/fluent-bit/loki.go b/cmd/fluent-bit/loki.go index 7f6bcf3dd0b16..28fb27b7877f9 100644 --- a/cmd/fluent-bit/loki.go +++ b/cmd/fluent-bit/loki.go @@ -38,8 +38,8 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error { records := toStringMap(r) level.Debug(l.logger).Log("msg", "processing records", "records", fmt.Sprintf("%+v", records)) lbs := model.LabelSet{} - if l.cfg.labeMap != nil { - mapLabels(records, l.cfg.labeMap, lbs) + if l.cfg.labelMap != nil { + mapLabels(records, l.cfg.labelMap, lbs) } else { lbs = extractLabels(records, l.cfg.labelKeys) } diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index 31f6ea30a4453..856793419e2bd 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -52,7 +52,7 @@ func Test_loki_sendRecord(t *testing.T) { {"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, nil}, {"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `bar=500`, now}}, {"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `500`, now}}, - {"labelmap", &config{labeMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}}, + {"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git 
a/cmd/fluent-bit/out_loki.go b/cmd/fluent-bit/out_loki.go index dd0533f2cf372..3ebf983e45ec0 100644 --- a/cmd/fluent-bit/out_loki.go +++ b/cmd/fluent-bit/out_loki.go @@ -56,7 +56,7 @@ func FLBPluginInit(ctx unsafe.Pointer) int { level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys)) level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat) level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey) - level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labeMap)) + level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap)) plugin, err = newPlugin(conf, logger) if err != nil { From 96fe3d2ed71bbac1ee7fca7788a680a08d25657b Mon Sep 17 00:00:00 2001 From: Jens Erat Date: Fri, 22 Nov 2019 17:21:30 +0100 Subject: [PATCH 2/3] fluent-bit: sort JSON map While developing another fix, I stumbled upon #1309 as a newly written unit test (with multiple key-value pairs in a map) was flaky. While JSON does not [strictly define an order on records in a map][RFC8259], practical operations with a logging tool pretty much require it (although of course grep for JSON is jq, not grep...). Also, the key value output format is already sorted. Switching to sorted output in jsoniter is pretty easy. As of today, it still has a [bug] though, for which I already provided a [fix]. I propose accepting that rare case where invalid types can occur from msgpack output (can this even happen?) and re-enabling the failing test case as soon as the upstream PR is merged.
[RFC8259]: https://tools.ietf.org/html/rfc8259#section-4 [bug]: https://github.com/json-iterator/go/issues/388 [fix]: https://github.com/json-iterator/go/issues/422 Signed-off-by: Jens Erat --- cmd/fluent-bit/loki.go | 2 +- cmd/fluent-bit/loki_test.go | 44 ++++++++++++++++++++++++++----------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/cmd/fluent-bit/loki.go b/cmd/fluent-bit/loki.go index 28fb27b7877f9..3f3ef7a17bec7 100644 --- a/cmd/fluent-bit/loki.go +++ b/cmd/fluent-bit/loki.go @@ -147,7 +147,7 @@ func removeKeys(records map[string]interface{}, keys []string) { func createLine(records map[string]interface{}, f format) (string, error) { switch f { case jsonFormat: - js, err := jsoniter.Marshal(records) + js, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(records) if err != nil { return "", err } diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index 856793419e2bd..df7d3c1f2c184 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -35,24 +35,40 @@ func (r *recorder) Stop() {} var now = time.Now() func Test_loki_sendRecord(t *testing.T) { - var recordFixture = map[interface{}]interface{}{ + var simpleRecordFixture = map[interface{}]interface{}{ "foo": "bar", "bar": 500, "error": make(chan struct{}), } + var mapRecordFixture = map[interface{}]interface{}{ + // lots of key/value pairs in map to increase chances of test hitting in case of unsorted map marshalling + "A": "A", + "B": "B", + "C": "C", + "D": "D", + "E": "E", + "F": "F", + "G": "G", + "H": "H", + } tests := []struct { - name string - cfg *config - want *entry + name string + cfg *config + record map[interface{}]interface{} + want *entry }{ - {"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, nil}, - {"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, &entry{model.LabelSet{"bar": "500"}, 
`{"foo":"bar"}`, now}}, - {"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `{"bar":500}`, now}}, - {"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, nil}, - {"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `bar=500`, now}}, - {"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `500`, now}}, - {"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}}, + {"map to JSON", &config{labelKeys: []string{"A"}, lineFormat: jsonFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `{"B":"B","C":"C","D":"D","E":"E","F":"F","G":"G","H":"H"}`, now}}, + {"map to kvPairFormat", &config{labelKeys: []string{"A"}, lineFormat: kvPairFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `B=B C=C D=D E=E F=F G=G H=H`, now}}, + {"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, nil}, + {"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"bar": "500"}, `{"foo":"bar"}`, now}}, + {"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500}`, now}}, + // jsoniter.ConfigCompatibleWithStandardLibrary has an issue passing errors from inside a map: + // https://github.com/json-iterator/go/issues/388 -- fix already proposed upstream (#422) + //{"error", &config{labelKeys: 
[]string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500,"error":}`, now}}, + {"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `bar=500`, now}}, + {"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `500`, now}}, + {"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -62,7 +78,7 @@ func Test_loki_sendRecord(t *testing.T) { client: rec, logger: logger, } - _ = l.sendRecord(recordFixture, now) + _ = l.sendRecord(tt.record, now) got := rec.toEntry() if !reflect.DeepEqual(got, tt.want) { t.Errorf("sendRecord() want:%v got:%v", tt.want, got) @@ -82,7 +98,9 @@ func Test_createLine(t *testing.T) { {"json", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": "bazz"}}, jsonFormat, `{"foo":"bar","bar":{"bizz":"bazz"}}`, false}, {"json with number", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}}, jsonFormat, `{"foo":"bar","bar":{"bizz":20}}`, false}, - {"bad json", map[string]interface{}{"foo": make(chan interface{})}, jsonFormat, "", true}, + // jsoniter.ConfigCompatibleWithStandardLibrary has an issue passing errors from inside a map: + // https://github.com/json-iterator/go/issues/388 -- fix already proposed upstream (#422) + // {"bad json", map[string]interface{}{"foo": make(chan interface{})}, jsonFormat, "", true}, {"kv", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": "bazz"}}, kvPairFormat, `bar=map[bizz:bazz] foo=bar`, false}, {"kv 
with number", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}, "decimal": 12.2}, kvPairFormat, `bar=map[bizz:20] decimal=12.2 foo=bar`, false}, {"kv with nil", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}, "null": nil}, kvPairFormat, `bar=map[bizz:20] foo=bar null=`, false}, From bede393466378d445fd99b3b043a53e8b7671bd0 Mon Sep 17 00:00:00 2001 From: Jens Erat Date: Fri, 22 Nov 2019 18:53:36 +0100 Subject: [PATCH 3/3] fluent-bit: properly convert []byte to string Recently, a regression was introduced: a deep conversion of `[]byte` to `string` was no longer run unless a label map was supplied. This commit fixes this by running the string conversion recursively, also removing the need to apply the conversion function again during the label map stage. This change has two minor side effects: - Some test cases had to be moved, as string conversion happens much earlier now. - Invalid characters do not result in the whole label being ignored any more, but are replaced by the Unicode replacement character (U+FFFD) now. I'd consider this an improvement, both making debugging much easier ("why is that value suddenly missing?") and preserving as much information as possible.
Signed-off-by: Jens Erat --- cmd/fluent-bit/loki.go | 7 ++- cmd/fluent-bit/loki_test.go | 116 +++++++++++------------------------- 2 files changed, 39 insertions(+), 84 deletions(-) diff --git a/cmd/fluent-bit/loki.go b/cmd/fluent-bit/loki.go index 3f3ef7a17bec7..254b6985662d4 100644 --- a/cmd/fluent-bit/loki.go +++ b/cmd/fluent-bit/loki.go @@ -72,6 +72,8 @@ func toStringMap(record map[interface{}]interface{}) map[string]interface{} { case []byte: // prevent encoding to base64 m[key] = string(t) + case map[interface{}]interface{}: + m[key] = toStringMap(t) default: m[key] = v } @@ -106,10 +108,9 @@ func mapLabels(records map[string]interface{}, mapping map[string]interface{}, r switch nextKey := v.(type) { // if the next level is a map we are expecting we need to move deeper in the tree case map[string]interface{}: - if nextValue, ok := records[k].(map[interface{}]interface{}); ok { - recordsMap := toStringMap(nextValue) + if nextValue, ok := records[k].(map[string]interface{}); ok { // recursively search through the next level map. - mapLabels(recordsMap, nextKey, res) + mapLabels(nextValue, nextKey, res) } // we found a value in the mapping meaning we need to save the corresponding record value for the given key. 
case string: diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index df7d3c1f2c184..0797fcb43e936 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -51,6 +51,25 @@ func Test_loki_sendRecord(t *testing.T) { "G": "G", "H": "H", } + var byteArrayRecordFixture = map[interface{}]interface{}{ + "label": "label", + "outer": []byte("foo"), + "map": map[interface{}]interface{}{ + "inner": []byte("bar"), + }, + } + var mixedTypesRecordFixture = map[interface{}]interface{}{ + "label": "label", + "int": 42, + "float": 42.42, + "array": []interface{}{42, 42.42, "foo"}, + "map": map[interface{}]interface{}{ + "nested": map[interface{}]interface{}{ + "foo": "bar", + "invalid": []byte("a\xc5z"), + }, + }, + } tests := []struct { name string @@ -69,6 +88,8 @@ func Test_loki_sendRecord(t *testing.T) { {"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `bar=500`, now}}, {"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `500`, now}}, {"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}}, + {"byte array", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, byteArrayRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"map":{"inner":"bar"},"outer":"foo"}`, now}}, + {"mixed types", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, mixedTypesRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"array":[42,42.42,"foo"],"float":42.42,"int":42,"map":{"nested":{"foo":"bar","invalid":"a\ufffdz"}}}`, now}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -232,69 +253,24 @@ func 
Test_labelMapping(t *testing.T) { map[string]interface{}{}, model.LabelSet{}, }, - { - "bytes string", - map[string]interface{}{ - "kubernetes": map[interface{}]interface{}{ - "foo": []byte("buzz"), - }, - "stream": "stderr", - }, - map[string]interface{}{ - "kubernetes": map[string]interface{}{ - "foo": "test", - "nonexisting": "", - }, - "stream": "output", - "nope": "nope", - }, - model.LabelSet{"test": "buzz", "output": "stderr"}, - }, - { - "numeric label", - map[string]interface{}{ - "kubernetes": map[interface{}]interface{}{ - "integer": 42, - "floating_point": 42.42, - }, - "stream": "stderr", - }, - map[string]interface{}{ - "kubernetes": map[string]interface{}{ - "integer": "integer", - "floating_point": "floating_point", - }, - "stream": "output", - "nope": "nope", - }, - model.LabelSet{"integer": "42", "floating_point": "42.42", "output": "stderr"}, - }, - { - "list label", - map[string]interface{}{ - "kubernetes": map[interface{}]interface{}{ - "integers": []int{42, 43}, - }, - }, - map[string]interface{}{ - "kubernetes": map[string]interface{}{ - "integers": "integers", - }, - }, - model.LabelSet{"integers": "[42 43]"}, - }, { "deep string", map[string]interface{}{ - "kubernetes": map[interface{}]interface{}{ - "label": map[interface{}]interface{}{ - "component": map[interface{}]interface{}{ + "int": "42", + "float": "42.42", + "array": `[42,42.42,"foo"]`, + "kubernetes": map[string]interface{}{ + "label": map[string]interface{}{ + "component": map[string]interface{}{ "buzz": "value", }, }, }, }, map[string]interface{}{ + "int": "int", + "float": "float", + "array": "array", "kubernetes": map[string]interface{}{ "label": map[string]interface{}{ "component": map[string]interface{}{ @@ -305,34 +281,12 @@ func Test_labelMapping(t *testing.T) { "stream": "output", "nope": "nope", }, - model.LabelSet{"label": "value"}, - }, - { - "skip invalid values", - map[string]interface{}{ - "kubernetes": map[interface{}]interface{}{ - "annotations": 
map[interface{}]interface{}{ - "kubernetes.io/config.source": "cfg", - "kubernetes.io/config.hash": []byte("a\xc5z"), - }, - "container_name": "loki", - "namespace_name": "dev", - "pod_name": "loki-asdwe", - }, - }, - map[string]interface{}{ - "kubernetes": map[string]interface{}{ - "annotations": map[string]interface{}{ - "kubernetes.io/config.source": "source", - "kubernetes.io/config.hash": "hash", - }, - "container_name": "container", - "namespace_name": "namespace", - "pod_name": "instance", - }, - "stream": "output", + model.LabelSet{ + "int": "42", + "float": "42.42", + "array": `[42,42.42,"foo"]`, + "label": "value", }, - model.LabelSet{"container": "loki", "instance": "loki-asdwe", "namespace": "dev", "source": "cfg"}, }, } for _, tt := range tests {