From 04a93b78803f95f5ad13b1a37bc93b6eb1489df0 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 21 Oct 2019 20:22:14 +0200 Subject: [PATCH] promtail: fix handling of JMESPath expression returning nil while parsing JSON (#1179) --- pkg/logentry/stages/json.go | 2 + pkg/logentry/stages/json_test.go | 2 + pkg/logentry/stages/labels.go | 3 +- pkg/logentry/stages/pipeline_test.go | 89 +++++++++++++++++++++------- pkg/logentry/stages/util_test.go | 3 + 5 files changed, 74 insertions(+), 25 deletions(-) diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index c57abc57a9e90..46b453ca6c18b 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -151,6 +151,8 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac extracted[n] = r case bool: extracted[n] = r + case nil: + extracted[n] = nil default: // If the value wasn't a string or a number, marshal it back to json jm, err := json.Marshal(r) diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index c5756f08e536d..64a41b1ffd016 100644 --- a/pkg/logentry/stages/json_test.go +++ b/pkg/logentry/stages/json_test.go @@ -21,6 +21,7 @@ pipeline_stages: app: nested: duration: + unknown: ` var testJSONYamlMultiStageWithSource = ` @@ -63,6 +64,7 @@ func TestPipeline_JSON(t *testing.T) { "app": "loki", "nested": "{\"child\":\"value\"}", "duration": float64(125), + "unknown": nil, }, }, "successfully run a pipeline with 2 json stages with source": { diff --git a/pkg/logentry/stages/labels.go b/pkg/logentry/stages/labels.go index a556cafc4dfb0..f7dd828fbc55a 100644 --- a/pkg/logentry/stages/labels.go +++ b/pkg/logentry/stages/labels.go @@ -64,8 +64,7 @@ type labelStage struct { // Process implements Stage func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { for lName, lSrc := range l.cfgs { - if _, ok := extracted[*lSrc]; ok { - lValue := extracted[*lSrc] + if lValue, ok := extracted[*lSrc]; ok { s, err := getString(lValue) if err != nil { if Debug { diff --git a/pkg/logentry/stages/pipeline_test.go b/pkg/logentry/stages/pipeline_test.go index fc8023b38cb68..6a01fe5fadaed 100644 --- a/pkg/logentry/stages/pipeline_test.go +++ b/pkg/logentry/stages/pipeline_test.go @@ -20,7 +20,7 @@ var ( processedTestLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"` ) -var testYaml = ` +var testMultiStageYaml = ` pipeline_stages: - match: selector: "{match=\"true\"}" @@ -40,6 +40,18 @@ pipeline_stages: status_code: "status" ` +var testLabelsFromJSONYaml = ` +pipeline_stages: +- json: + expressions: + app: + message: +- labels: + app: +- output: + source: message +` + func loadConfig(yml string) PipelineStages { var config map[string]interface{} err := yaml.Unmarshal([]byte(yml), &config) @@ -51,38 +63,32 @@ func loadConfig(yml string) PipelineStages { func TestNewPipeline(t *testing.T) { - p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil, prometheus.DefaultRegisterer) + p, err := NewPipeline(util.Logger, loadConfig(testMultiStageYaml), nil, prometheus.DefaultRegisterer) if err != nil { panic(err) } require.Equal(t, 1, len(p.stages)) } -func TestPipeline_MultiStage(t *testing.T) { +func TestPipeline_Process(t *testing.T) { + t.Parallel() + est, err := time.LoadLocation("America/New_York") if err != nil { t.Fatal("could not parse timestamp", err) } - var config map[string]interface{} - err = yaml.Unmarshal([]byte(testYaml), &config) - if err != nil { - panic(err) - } - p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer) - if err != nil { - panic(err) - } - tests := map[string]struct { + config string entry string expectedEntry string t time.Time expectedT time.Time - labels model.LabelSet + initialLabels model.LabelSet expectedLabels model.LabelSet }{ "happy path": { + testMultiStageYaml, rawTestLine, processedTestLine, time.Now(), @@ -98,6 +104,7 @@ func TestPipeline_MultiStage(t *testing.T) { }, }, "no match": { + testMultiStageYaml, rawTestLine, rawTestLine, ct, @@ -110,6 +117,7 @@ func TestPipeline_MultiStage(t *testing.T) { }, }, "should initialize the extracted map with the initial labels": { + testMultiStageYaml, rawTestLine, processedTestLine, time.Now(), @@ -127,16 +135,53 @@ func TestPipeline_MultiStage(t *testing.T) { "status_code": "200", }, }, + "should set a label from value extracted from JSON": { + testLabelsFromJSONYaml, + `{"message":"hello world","app":"api"}`, + "hello world", + ct, + ct, + map[model.LabelName]model.LabelValue{}, + map[model.LabelName]model.LabelValue{ + "app": "api", + }, + }, + "should not set a label if the field does not exist in the JSON": { + testLabelsFromJSONYaml, + `{"message":"hello world"}`, + "hello world", + ct, + ct, + map[model.LabelName]model.LabelValue{}, + map[model.LabelName]model.LabelValue{}, + }, + "should not set a label if the value extracted from JSON is null": { + testLabelsFromJSONYaml, + `{"message":"hello world","app":null}`, + "hello world", + ct, + ct, + map[model.LabelName]model.LabelValue{}, + map[model.LabelName]model.LabelValue{}, + }, } for tName, tt := range tests { tt := tt + t.Run(tName, func(t *testing.T) { - t.Parallel() + var config map[string]interface{} + + err := yaml.Unmarshal([]byte(tt.config), &config) + require.NoError(t, err) + + p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer) + require.NoError(t, err) + extracted := map[string]interface{}{} - p.Process(tt.labels, extracted, &tt.t, &tt.entry) + p.Process(tt.initialLabels, extracted, &tt.t, &tt.entry) - assert.Equal(t, tt.expectedLabels, tt.labels, "did not get expected labels") + assert.Equal(t, tt.expectedLabels, tt.initialLabels, "did not get expected labels") assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") if tt.t.Unix() != tt.expectedT.Unix() { t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) @@ -146,9 +191,7 @@ func TestPipeline_MultiStage(t *testing.T) { } var ( - l = log.NewNopLogger() - //w = log.NewSyncWriter(os.Stdout) - //l = log.NewLogfmtLogger(w) + l = log.NewNopLogger() infoLogger = level.NewFilter(l, level.AllowInfo()) debugLogger = level.NewFilter(l, level.AllowDebug()) ) @@ -162,13 +205,13 @@ func BenchmarkPipeline(b *testing.B) { }{ { "two stage info level", - loadConfig(testYaml), + loadConfig(testMultiStageYaml), infoLogger, rawTestLine, }, { "two stage debug level", - loadConfig(testYaml), + loadConfig(testMultiStageYaml), debugLogger, rawTestLine, }, @@ -202,7 +245,7 @@ func (s *stubHandler) Handle(labels model.LabelSet, time time.Time, entry string func TestPipeline_Wrap(t *testing.T) { now := time.Now() var config map[string]interface{} - err := yaml.Unmarshal([]byte(testYaml), &config) + err := yaml.Unmarshal([]byte(testMultiStageYaml), &config) if err != nil { panic(err) } diff --git a/pkg/logentry/stages/util_test.go b/pkg/logentry/stages/util_test.go index 94750826ada40..5cf5e6c9cac86 100644 --- a/pkg/logentry/stages/util_test.go +++ b/pkg/logentry/stages/util_test.go @@ -65,6 +65,9 @@ func TestGetString(t *testing.T) { assert.Equal(t, "1", s64) assert.Equal(t, "2.02", s32) assert.Equal(t, "1562723913000", s64_1) + + _, err = getString(nil) + assert.Error(t, err) } var (