Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add json_lines webhook input format #100

Merged
merged 1 commit into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ input:
# POST body envelope must be a json array "[ <entry>, <entry> ]". Log
# entry text is selected from the value of a json key determined by
# webhook_json_selector.
# json_lines: Webhook POST body contains multiple json log entries, with
# newline-separated log lines holding an individual json object. JSON
# object itself may not contain newlines. For example:
# example:
# { app="foo", stage="prod", log="example log message" }
# { app="bar", stage="dev", log="another line" }
# Log entry text is selected from the value of a json key determined
# by webhook_json_selector.
# Default is `text_single`
webhook_format: json_bulk

Expand Down Expand Up @@ -361,7 +369,7 @@ labels:
If you don't want the full path but only the file name, you can use the `base` template function, see next section.

#### extra
The `extra` variable is always present for input type `webhook` with format being either `json_single` or `json_bulk`.
The `extra` variable is always present for input type `webhook` with format being either `json_single`, `json_lines` or `json_bulk`.
It contains the entire JSON object that was parsed.
You can use it like this:

Expand Down
4 changes: 2 additions & 2 deletions config/v3/configV3.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ func (c *InputConfig) validate() error {
} else if c.WebhookPath[0] != '/' {
return fmt.Errorf("invalid input configuration: 'input.webhook_path' must start with \"/\"")
}
if c.WebhookFormat != "text_single" && c.WebhookFormat != "text_bulk" && c.WebhookFormat != "json_single" && c.WebhookFormat != "json_bulk" {
return fmt.Errorf("invalid input configuration: 'input.webhook_format' must be \"text_single|text_bulk|json_single|json_bulk\"")
if c.WebhookFormat != "text_single" && c.WebhookFormat != "text_bulk" && c.WebhookFormat != "json_single" && c.WebhookFormat != "json_bulk" && c.WebhookFormat != "json_lines" {
return fmt.Errorf("invalid input configuration: 'input.webhook_format' must be \"text_single|text_bulk|json_single|json_bulk|json_lines\"")
}
if c.WebhookJsonSelector == "" {
return fmt.Errorf("invalid input configuration: 'input.webhook_json_selector' is required for input type \"webhook\"")
Expand Down
8 changes: 8 additions & 0 deletions example/config_logstash_http_input_ipv6.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ input:
# json_single: Webhook POST body is a single json log entry. Log entry
# text is selected from the value of a json key determined by
# webhook_json_selector.
# json_lines: Webhook POST body contains multiple json log entries, with
# newline-separated log lines holding an individual json object. JSON
# object itself may not contain newlines. For example:
# example:
# { app="foo", stage="prod", log="example log message" }
# { app="bar", stage="dev", log="another line" }
# Log entry text is selected from the value of a json key determined
# by webhook_json_selector.
# json_bulk: Webhook POST body contains multiple json log entries. The
# POST body envelope must be a json array "[ <entry>, <entry> ]". Log
# entry text is selected from the value of a json key determined by
Expand Down
28 changes: 28 additions & 0 deletions tailer/webhookTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tailer

import (
"bytes"
"errors"
"fmt"
json "github.com/bitly/go-simplejson"
Expand Down Expand Up @@ -142,6 +143,33 @@ func WebhookProcessBody(c *configuration.InputConfig, b []byte) []context_string
break
}
strs = append(strs, context_string{line: s, extra: j.MustMap()})
case "json_lines":
if len(c.WebhookJsonSelector) == 0 || c.WebhookJsonSelector[0] != '.' {
logrus.Errorf("%v: invalid webhook json selector", c.WebhookJsonSelector)
break
}

for _, split := range bytes.Split(b, []byte("\n")) {
if len(split) == 0 {
continue
}
j, err := json.NewJson(split)
if err != nil {
logrus.WithFields(logrus.Fields{
"post_body": string(b),
}).Warn("Unable to Parse JSON")
break
}
s, err := processPath(j, c.WebhookJsonSelector)
if err != nil {
logrus.WithFields(logrus.Fields{
"post_body": string(b),
"webhook_json_selector": c.WebhookJsonSelector,
}).Warn("Unable to find selector path")
break
}
strs = append(strs, context_string{line: s, extra: j.MustMap()})
}
case "json_bulk":
if len(c.WebhookJsonSelector) == 0 || c.WebhookJsonSelector[0] != '.' {
logrus.Errorf("%v: invalid webhook json selector", c.WebhookJsonSelector)
Expand Down
36 changes: 36 additions & 0 deletions tailer/webhookTailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,42 @@ func TestWebhookJsonBulk(t *testing.T) {
}
}

func TestWebhookJsonLines(t *testing.T) {
// This test follows the format of Logstash HTTP Non-Bulk Output
// https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html
// format="json_batch"

c := &configuration.InputConfig{
Type: "webhook",
WebhookPath: "/webhook",
WebhookFormat: "json_lines",
WebhookJsonSelector: ".message",
WebhookTextBulkSeparator: "",
}

messages := []string{
"2016-04-18 09:33:27 H=(85.214.241.101) [114.37.190.56] F=<z2007tw@yahoo.com.tw> rejected RCPT <alan.a168@msa.hinet.net>: relay not permitted",
"2016-04-18 12:28:04 H=(85.214.241.101) [118.161.243.219] F=<z2007tw@yahoo.com.tw> rejected RCPT <alan.a168@msa.hinet.net>: relay not permitted",
"2016-04-18 19:16:30 H=(85.214.241.101) [114.24.5.12] F=<z2007tw@yahoo.com.tw> rejected RCPT <alan.a168@msa.hinet.net>: relay not permitted",
}

blobs := []string{}
for _, message := range messages {
blobs = append(blobs, strings.Replace(createJsonBlob(message), "\n", " ", -1))
}
s := strings.Join(blobs, "\n")
fmt.Printf("Sending Payload: %v", s)
lines := WebhookProcessBody(c, []byte(s))
if len(lines) != len(messages) {
t.Fatal("Expected number of lines to equal number of messages")
}
for i := range messages {
if messages[i] != lines[i].line {
t.Fatal("Expected line to match")
}
}
}

func TestWebhookJsonBulkNegativeMalformedJson(t *testing.T) {
// This test follows the format of Logstash HTTP Non-Bulk Output
// https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html
Expand Down