diff --git a/cmd/fluent-bit/loki.go b/cmd/fluent-bit/loki.go index 5a3fec1a34745..8c7564cea7a05 100644 --- a/cmd/fluent-bit/loki.go +++ b/cmd/fluent-bit/loki.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "errors" "fmt" "os" "sort" @@ -41,7 +42,10 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error { level.Debug(l.logger).Log("msg", "processing records", "records", fmt.Sprintf("%+v", records)) lbs := model.LabelSet{} if l.cfg.autoKubernetesLabels { - lbs = autoLabels(records) + err := autoLabels(records, lbs) + if err != nil { + level.Error(l.logger).Log("msg", err.Error(), "records", fmt.Sprintf("%+v", records)) + } } else if l.cfg.labelMap != nil { mapLabels(records, l.cfg.labelMap, lbs) } else { @@ -65,7 +69,6 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error { func toStringMap(record map[interface{}]interface{}) map[string]interface{} { m := make(map[string]interface{}) - for k, v := range record { key, ok := k.(string) if !ok { @@ -81,36 +84,32 @@ func toStringMap(record map[interface{}]interface{}) map[string]interface{} { m[key] = v } } + return m } -func autoLabels(records map[string]interface{}) model.LabelSet { - kuberneteslbs := model.LabelSet{} +func autoLabels(records map[string]interface{}, kuberneteslbs model.LabelSet) error { + kube, ok := records["kubernetes"] + if !ok { + return errors.New("kubernetes labels not found, no labels will be added") + } + replacer := strings.NewReplacer("/", "_", ".", "_", "-", "_") - for k, v := range records["kubernetes"].(map[interface{}]interface{}) { - switch key := k.(string); key { + for k, v := range kube.(map[string]interface{}) { + switch k { case "labels": - for m, n := range v.(map[interface{}]interface{}) { - switch t := n.(type) { - case []byte: - kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(string(t)) - default: - kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(fmt.Sprintf("%v", n)) - } + for m, n := range v.(map[string]interface{}) { + kuberneteslbs[model.LabelName(replacer.Replace(m))] = model.LabelValue(fmt.Sprintf("%v", n)) } case "docker_id", "pod_id", "annotations": // do nothing continue default: - switch t := v.(type) { - case []byte: - kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(string(t)) - default: - kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(fmt.Sprintf("%v", v)) - } + kuberneteslbs[model.LabelName(k)] = model.LabelValue(fmt.Sprintf("%v", v)) } } - return kuberneteslbs + + return nil } func extractLabels(records map[string]interface{}, keys []string) model.LabelSet { diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index b4af527c26f19..630c969890ac7 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -1,6 +1,7 @@ package main import ( + "errors" "reflect" "testing" "time" @@ -299,3 +300,64 @@ func Test_labelMapping(t *testing.T) { }) } } + +func Test_AutoKubernetesLabels(t *testing.T) { + tests := []struct { + name string + records map[interface{}]interface{} + want model.LabelSet + err error + }{ + { + "records without labels", + map[interface{}]interface{}{ + "kubernetes": map[interface{}]interface{}{ + "foo": []byte("buzz"), + }, + }, + model.LabelSet{ + "foo": "buzz", + }, + nil, + }, + { + "records with labels", + map[interface{}]interface{}{ + "kubernetes": map[string]interface{}{ + "labels": map[string]interface{}{ + "foo": "bar", + "buzz": "value", + }, + }, + }, + model.LabelSet{ + "foo": "bar", + "buzz": "value", + }, + nil, + }, + { + "records without kubernetes labels", + map[interface{}]interface{}{ + "foo": "bar", + "label": "value", + }, + model.LabelSet{}, + errors.New("kubernetes labels not found, no labels will be added"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := toStringMap(tt.records) + lbs := model.LabelSet{} + err := autoLabels(m, lbs) + if err != nil && err.Error() != tt.err.Error() { + t.Errorf("error in autolabels, error = %v", err) + return + } + if !reflect.DeepEqual(lbs, tt.want) { + t.Errorf("mapLabels() = %v, want %v", lbs, tt.want) + } + }) + } +}