Skip to content

Commit

Permalink
promtail: fix handling of JMESPath expression returning nil while par…
Browse files Browse the repository at this point in the history
…sing JSON (#1179)
  • Loading branch information
pracucci authored and cyriltovena committed Oct 21, 2019
1 parent fa79c24 commit 04a93b7
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 25 deletions.
2 changes: 2 additions & 0 deletions pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
extracted[n] = r
case bool:
extracted[n] = r
case nil:
extracted[n] = nil
default:
// If the value wasn't a string or a number, marshal it back to json
jm, err := json.Marshal(r)
Expand Down
2 changes: 2 additions & 0 deletions pkg/logentry/stages/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pipeline_stages:
app:
nested:
duration:
unknown:
`

var testJSONYamlMultiStageWithSource = `
Expand Down Expand Up @@ -63,6 +64,7 @@ func TestPipeline_JSON(t *testing.T) {
"app": "loki",
"nested": "{\"child\":\"value\"}",
"duration": float64(125),
"unknown": nil,
},
},
"successfully run a pipeline with 2 json stages with source": {
Expand Down
3 changes: 1 addition & 2 deletions pkg/logentry/stages/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ type labelStage struct {
// Process implements Stage
func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for lName, lSrc := range l.cfgs {
if _, ok := extracted[*lSrc]; ok {
lValue := extracted[*lSrc]
if lValue, ok := extracted[*lSrc]; ok {
s, err := getString(lValue)
if err != nil {
if Debug {
Expand Down
89 changes: 66 additions & 23 deletions pkg/logentry/stages/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
processedTestLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
)

var testYaml = `
var testMultiStageYaml = `
pipeline_stages:
- match:
selector: "{match=\"true\"}"
Expand All @@ -40,6 +40,18 @@ pipeline_stages:
status_code: "status"
`

var testLabelsFromJSONYaml = `
pipeline_stages:
- json:
expressions:
app:
message:
- labels:
app:
- output:
source: message
`

func loadConfig(yml string) PipelineStages {
var config map[string]interface{}
err := yaml.Unmarshal([]byte(yml), &config)
Expand All @@ -51,38 +63,32 @@ func loadConfig(yml string) PipelineStages {

func TestNewPipeline(t *testing.T) {

p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil, prometheus.DefaultRegisterer)
p, err := NewPipeline(util.Logger, loadConfig(testMultiStageYaml), nil, prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}
require.Equal(t, 1, len(p.stages))
}

func TestPipeline_MultiStage(t *testing.T) {
func TestPipeline_Process(t *testing.T) {
t.Parallel()

est, err := time.LoadLocation("America/New_York")
if err != nil {
t.Fatal("could not parse timestamp", err)
}

var config map[string]interface{}
err = yaml.Unmarshal([]byte(testYaml), &config)
if err != nil {
panic(err)
}
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}

tests := map[string]struct {
config string
entry string
expectedEntry string
t time.Time
expectedT time.Time
labels model.LabelSet
initialLabels model.LabelSet
expectedLabels model.LabelSet
}{
"happy path": {
testMultiStageYaml,
rawTestLine,
processedTestLine,
time.Now(),
Expand All @@ -98,6 +104,7 @@ func TestPipeline_MultiStage(t *testing.T) {
},
},
"no match": {
testMultiStageYaml,
rawTestLine,
rawTestLine,
ct,
Expand All @@ -110,6 +117,7 @@ func TestPipeline_MultiStage(t *testing.T) {
},
},
"should initialize the extracted map with the initial labels": {
testMultiStageYaml,
rawTestLine,
processedTestLine,
time.Now(),
Expand All @@ -127,16 +135,53 @@ func TestPipeline_MultiStage(t *testing.T) {
"status_code": "200",
},
},
"should set a label from value extracted from JSON": {
testLabelsFromJSONYaml,
`{"message":"hello world","app":"api"}`,
"hello world",
ct,
ct,
map[model.LabelName]model.LabelValue{},
map[model.LabelName]model.LabelValue{
"app": "api",
},
},
"should not set a label if the field does not exist in the JSON": {
testLabelsFromJSONYaml,
`{"message":"hello world"}`,
"hello world",
ct,
ct,
map[model.LabelName]model.LabelValue{},
map[model.LabelName]model.LabelValue{},
},
"should not set a label if the value extracted from JSON is null": {
testLabelsFromJSONYaml,
`{"message":"hello world","app":null}`,
"hello world",
ct,
ct,
map[model.LabelName]model.LabelValue{},
map[model.LabelName]model.LabelValue{},
},
}

for tName, tt := range tests {
tt := tt

t.Run(tName, func(t *testing.T) {
t.Parallel()
var config map[string]interface{}

err := yaml.Unmarshal([]byte(tt.config), &config)
require.NoError(t, err)

p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer)
require.NoError(t, err)

extracted := map[string]interface{}{}
p.Process(tt.labels, extracted, &tt.t, &tt.entry)
p.Process(tt.initialLabels, extracted, &tt.t, &tt.entry)

assert.Equal(t, tt.expectedLabels, tt.labels, "did not get expected labels")
assert.Equal(t, tt.expectedLabels, tt.initialLabels, "did not get expected labels")
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
Expand All @@ -146,9 +191,7 @@ func TestPipeline_MultiStage(t *testing.T) {
}

var (
l = log.NewNopLogger()
//w = log.NewSyncWriter(os.Stdout)
//l = log.NewLogfmtLogger(w)
l = log.NewNopLogger()
infoLogger = level.NewFilter(l, level.AllowInfo())
debugLogger = level.NewFilter(l, level.AllowDebug())
)
Expand All @@ -162,13 +205,13 @@ func BenchmarkPipeline(b *testing.B) {
}{
{
"two stage info level",
loadConfig(testYaml),
loadConfig(testMultiStageYaml),
infoLogger,
rawTestLine,
},
{
"two stage debug level",
loadConfig(testYaml),
loadConfig(testMultiStageYaml),
debugLogger,
rawTestLine,
},
Expand Down Expand Up @@ -202,7 +245,7 @@ func (s *stubHandler) Handle(labels model.LabelSet, time time.Time, entry string
func TestPipeline_Wrap(t *testing.T) {
now := time.Now()
var config map[string]interface{}
err := yaml.Unmarshal([]byte(testYaml), &config)
err := yaml.Unmarshal([]byte(testMultiStageYaml), &config)
if err != nil {
panic(err)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/logentry/stages/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func TestGetString(t *testing.T) {
assert.Equal(t, "1", s64)
assert.Equal(t, "2.02", s32)
assert.Equal(t, "1562723913000", s64_1)

_, err = getString(nil)
assert.Error(t, err)
}

var (
Expand Down

0 comments on commit 04a93b7

Please sign in to comment.