Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fluent-bit: sorted JSON and properly convert []byte to string #1310

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/fluent-bit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type config struct {
labelKeys []string
lineFormat format
dropSingleKey bool
labeMap map[string]interface{}
labelMap map[string]interface{}
}

func parseConfig(cfg ConfigGetter) (*config, error) {
Expand Down Expand Up @@ -139,7 +139,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if err != nil {
return nil, fmt.Errorf("failed to open LabelMap file: %s", err)
}
if err := json.Unmarshal(content, &res.labeMap); err != nil {
if err := json.Unmarshal(content, &res.labelMap); err != nil {
return nil, fmt.Errorf("failed to Unmarshal LabelMap file: %s", err)
}
res.labelKeys = nil
Expand Down
6 changes: 3 additions & 3 deletions cmd/fluent-bit/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func Test_parseConfig(t *testing.T) {
labelKeys: nil,
removeKeys: []string{"buzz", "fuzz"},
dropSingleKey: false,
labeMap: map[string]interface{}{
labelMap: map[string]interface{}{
"kubernetes": map[string]interface{}{
"container_name": "container",
"host": "host",
Expand Down Expand Up @@ -160,8 +160,8 @@ func assertConfig(t *testing.T, expected, actual *config) {
if expected.logLevel.String() != actual.logLevel.String() {
t.Errorf("incorrect logLevel want:%v got:%v", expected.logLevel.String(), actual.logLevel.String())
}
if !reflect.DeepEqual(expected.labeMap, actual.labeMap) {
t.Errorf("incorrect labeMap want:%v got:%v", expected.labeMap, actual.labeMap)
if !reflect.DeepEqual(expected.labelMap, actual.labelMap) {
t.Errorf("incorrect labelMap want:%v got:%v", expected.labelMap, actual.labelMap)
}
}

Expand Down
13 changes: 7 additions & 6 deletions cmd/fluent-bit/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {
records := toStringMap(r)
level.Debug(l.logger).Log("msg", "processing records", "records", fmt.Sprintf("%+v", records))
lbs := model.LabelSet{}
if l.cfg.labeMap != nil {
mapLabels(records, l.cfg.labeMap, lbs)
if l.cfg.labelMap != nil {
mapLabels(records, l.cfg.labelMap, lbs)
} else {
lbs = extractLabels(records, l.cfg.labelKeys)
}
Expand Down Expand Up @@ -72,6 +72,8 @@ func toStringMap(record map[interface{}]interface{}) map[string]interface{} {
case []byte:
// prevent encoding to base64
m[key] = string(t)
case map[interface{}]interface{}:
m[key] = toStringMap(t)
default:
m[key] = v
}
Expand Down Expand Up @@ -106,10 +108,9 @@ func mapLabels(records map[string]interface{}, mapping map[string]interface{}, r
switch nextKey := v.(type) {
// if the next level is a map we are expecting we need to move deeper in the tree
case map[string]interface{}:
if nextValue, ok := records[k].(map[interface{}]interface{}); ok {
recordsMap := toStringMap(nextValue)
if nextValue, ok := records[k].(map[string]interface{}); ok {
// recursively search through the next level map.
mapLabels(recordsMap, nextKey, res)
mapLabels(nextValue, nextKey, res)
}
// we found a value in the mapping meaning we need to save the corresponding record value for the given key.
case string:
Expand Down Expand Up @@ -147,7 +148,7 @@ func removeKeys(records map[string]interface{}, keys []string) {
func createLine(records map[string]interface{}, f format) (string, error) {
switch f {
case jsonFormat:
js, err := jsoniter.Marshal(records)
js, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(records)
if err != nil {
return "", err
}
Expand Down
160 changes: 66 additions & 94 deletions cmd/fluent-bit/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,61 @@ func (r *recorder) Stop() {}
var now = time.Now()

func Test_loki_sendRecord(t *testing.T) {
var recordFixture = map[interface{}]interface{}{
var simpleRecordFixture = map[interface{}]interface{}{
"foo": "bar",
"bar": 500,
"error": make(chan struct{}),
}
var mapRecordFixture = map[interface{}]interface{}{
// lots of key/value pairs in map to increase chances of test hitting in case of unsorted map marshalling
"A": "A",
"B": "B",
"C": "C",
"D": "D",
"E": "E",
"F": "F",
"G": "G",
"H": "H",
}
var byteArrayRecordFixture = map[interface{}]interface{}{
"label": "label",
"outer": []byte("foo"),
"map": map[interface{}]interface{}{
"inner": []byte("bar"),
},
}
var mixedTypesRecordFixture = map[interface{}]interface{}{
"label": "label",
"int": 42,
"float": 42.42,
"array": []interface{}{42, 42.42, "foo"},
"map": map[interface{}]interface{}{
"nested": map[interface{}]interface{}{
"foo": "bar",
"invalid": []byte("a\xc5z"),
},
},
}

tests := []struct {
name string
cfg *config
want *entry
name string
cfg *config
record map[interface{}]interface{}
want *entry
}{
{"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, nil},
{"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, &entry{model.LabelSet{"bar": "500"}, `{"foo":"bar"}`, now}},
{"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `{"bar":500}`, now}},
{"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, nil},
{"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `bar=500`, now}},
{"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, &entry{model.LabelSet{}, `500`, now}},
{"labelmap", &config{labeMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}},
{"map to JSON", &config{labelKeys: []string{"A"}, lineFormat: jsonFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `{"B":"B","C":"C","D":"D","E":"E","F":"F","G":"G","H":"H"}`, now}},
{"map to kvPairFormat", &config{labelKeys: []string{"A"}, lineFormat: kvPairFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `B=B C=C D=D E=E F=F G=G H=H`, now}},
{"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, nil},
{"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"bar": "500"}, `{"foo":"bar"}`, now}},
{"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500}`, now}},
// jsoniter.ConfigCompatibleWithStandardLibrary has an issue passing errors from inside a map:
// https://github.com/json-iterator/go/issues/388 -- fix already proposed upstream (#422)
//{"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500,"error":}`, now}},
{"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `bar=500`, now}},
{"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `500`, now}},
{"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}},
{"byte array", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, byteArrayRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"map":{"inner":"bar"},"outer":"foo"}`, now}},
{"mixed types", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, mixedTypesRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"array":[42,42.42,"foo"],"float":42.42,"int":42,"map":{"nested":{"foo":"bar","invalid":"a\ufffdz"}}}`, now}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -62,7 +99,7 @@ func Test_loki_sendRecord(t *testing.T) {
client: rec,
logger: logger,
}
_ = l.sendRecord(recordFixture, now)
_ = l.sendRecord(tt.record, now)
got := rec.toEntry()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("sendRecord() want:%v got:%v", tt.want, got)
Expand All @@ -82,7 +119,9 @@ func Test_createLine(t *testing.T) {

{"json", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": "bazz"}}, jsonFormat, `{"foo":"bar","bar":{"bizz":"bazz"}}`, false},
{"json with number", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}}, jsonFormat, `{"foo":"bar","bar":{"bizz":20}}`, false},
{"bad json", map[string]interface{}{"foo": make(chan interface{})}, jsonFormat, "", true},
// jsoniter.ConfigCompatibleWithStandardLibrary has an issue passing errors from inside a map:
// https://github.com/json-iterator/go/issues/388 -- fix already proposed upstream (#422)
// {"bad json", map[string]interface{}{"foo": make(chan interface{})}, jsonFormat, "", true},
{"kv", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": "bazz"}}, kvPairFormat, `bar=map[bizz:bazz] foo=bar`, false},
{"kv with number", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}, "decimal": 12.2}, kvPairFormat, `bar=map[bizz:20] decimal=12.2 foo=bar`, false},
{"kv with nil", map[string]interface{}{"foo": "bar", "bar": map[string]interface{}{"bizz": 20}, "null": nil}, kvPairFormat, `bar=map[bizz:20] foo=bar null=<nil>`, false},
Expand Down Expand Up @@ -214,69 +253,24 @@ func Test_labelMapping(t *testing.T) {
map[string]interface{}{},
model.LabelSet{},
},
{
"bytes string",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"foo": []byte("buzz"),
},
"stream": "stderr",
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"foo": "test",
"nonexisting": "",
},
"stream": "output",
"nope": "nope",
},
model.LabelSet{"test": "buzz", "output": "stderr"},
},
{
"numeric label",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"integer": 42,
"floating_point": 42.42,
},
"stream": "stderr",
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"integer": "integer",
"floating_point": "floating_point",
},
"stream": "output",
"nope": "nope",
},
model.LabelSet{"integer": "42", "floating_point": "42.42", "output": "stderr"},
},
{
"list label",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"integers": []int{42, 43},
},
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"integers": "integers",
},
},
model.LabelSet{"integers": "[42 43]"},
},
{
"deep string",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"label": map[interface{}]interface{}{
"component": map[interface{}]interface{}{
"int": "42",
"float": "42.42",
"array": `[42,42.42,"foo"]`,
"kubernetes": map[string]interface{}{
"label": map[string]interface{}{
"component": map[string]interface{}{
"buzz": "value",
},
},
},
},
map[string]interface{}{
"int": "int",
"float": "float",
"array": "array",
"kubernetes": map[string]interface{}{
"label": map[string]interface{}{
"component": map[string]interface{}{
Expand All @@ -287,34 +281,12 @@ func Test_labelMapping(t *testing.T) {
"stream": "output",
"nope": "nope",
},
model.LabelSet{"label": "value"},
},
{
"skip invalid values",
map[string]interface{}{
"kubernetes": map[interface{}]interface{}{
"annotations": map[interface{}]interface{}{
"kubernetes.io/config.source": "cfg",
"kubernetes.io/config.hash": []byte("a\xc5z"),
},
"container_name": "loki",
"namespace_name": "dev",
"pod_name": "loki-asdwe",
},
},
map[string]interface{}{
"kubernetes": map[string]interface{}{
"annotations": map[string]interface{}{
"kubernetes.io/config.source": "source",
"kubernetes.io/config.hash": "hash",
},
"container_name": "container",
"namespace_name": "namespace",
"pod_name": "instance",
},
"stream": "output",
model.LabelSet{
"int": "42",
"float": "42.42",
"array": `[42,42.42,"foo"]`,
"label": "value",
},
model.LabelSet{"container": "loki", "instance": "loki-asdwe", "namespace": "dev", "source": "cfg"},
},
}
for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion cmd/fluent-bit/out_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)
level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey)
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labeMap))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))

plugin, err = newPlugin(conf, logger)
if err != nil {
Expand Down