Skip to content

Commit

Permalink
Add support to set the document ID in the filebeat json reader (#5844)
Browse files Browse the repository at this point in the history
* Add support to set the document ID in the filebeat json reader

Add support to configure a key for setting the document ID in
the harvester JSON settings. The ID will be store in the events
Meta["id"] for the output to pick up. With #5811 will the elasticsearch
output is the Meta["id"] field to set the document its ID (uses
op_type="create" to count duplicate inserts of same ID). For other
output type, the document ID will be forwarded via `@metadata.id`.
  • Loading branch information
Steffen Siering authored Sep 19, 2019
1 parent e598d6c commit 7e25933
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add module for ingesting Cisco FTD logs over syslog. {pull}13286[13286]
- Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505]
- Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701]
- Add support to set the document id in the json reader. {pull}5844[5844]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/docs/inputs/input-common-harvester-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ must be at the top level in the JSON object and the value associated with the
key must be a string, otherwise no filtering or multiline aggregation will
occur.

*`document_id`*:: Option configuration setting that specifies the JSON key to
set the document id. If configured, the field will be removed from the original
json document and stored in `@metadata.id`

*`ignore_decoding_error`*:: An optional configuration setting that specifies if
JSON decoding errors should be logged or not. If set to true, errors will not
be logged. The default is false.
Expand Down
11 changes: 9 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,21 @@ func (h *Harvester) onMessage(
jsonFields = f.(common.MapStr)
}

var meta common.MapStr
timestamp := message.Ts

if h.config.JSON != nil && len(jsonFields) > 0 {
ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
id, ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
if !ts.IsZero() {
// there was a `@timestamp` key in the event, so overwrite
// the resulting timestamp
timestamp = ts
}

if id != "" {
meta = common.MapStr{
"id": id,
}
}
} else if &text != nil {
if fields == nil {
fields = common.MapStr{}
Expand All @@ -379,6 +385,7 @@ func (h *Harvester) onMessage(
err := forwarder.Send(beat.Event{
Timestamp: timestamp,
Fields: fields,
Meta: meta,
Private: state,
})
return err == nil
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/files/logs/json_id.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": "test", "id": "0"}
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": 5, "id": "1"}
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": {"hello": "shouldn't work"}, "id": "2"}
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ filebeat.{{input_config | default("inputs")}}:
{% if json.overwrite_keys %}overwrite_keys: true{% endif %}
{% if json.add_error_key %}add_error_key: true{% endif %}
{% if json.ignore_decoding_error %}ignore_decoding_error: true{% endif %}
{% if json.document_id %}document_id: {{json.document_id}}{% endif %}
{% endif %}

{% if multiline %}
Expand Down
28 changes: 28 additions & 0 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,34 @@ def test_type_in_message(self):
assert output[2]["error.message"] == \
"type not overwritten (not string)"

def test_id_in_message(self):
"""
Extract document ID from json contents.
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(
message_key="msg",
document_id="id",
),
)
os.mkdir(self.working_dir + "/log/")
self.copy_files(["logs/json_id.log"],
target_dir="log")
proc = self.start_beat()
self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)
proc.check_kill_and_wait()

output = self.read_output()

assert len(output) == 3
for i in xrange(len(output)):
assert("@metadata.id" in output[i])
assert(output[i]["@metadata.id"] == str(i))
assert("json.id" not in output[i])

def test_with_generic_filtering(self):
"""
It should work fine to combine JSON decoding with
Expand Down
16 changes: 13 additions & 3 deletions libbeat/reader/readjson/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func createJSONError(message string) common.MapStr {
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
// takes precedence.
func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) time.Time {
func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) (string, time.Time) {
// The message key might have been modified by multiline
if len(config.MessageKey) > 0 && text != nil {
jsonFields[config.MessageKey] = *text
Expand All @@ -127,6 +127,16 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string,
data["message"] = *text
}

var id string
if key := config.DocumentID; key != "" {
if tmp, err := jsonFields.GetValue(key); err == nil {
if v, ok := tmp.(string); ok {
id = v
jsonFields.Delete(key)
}
}
}

if config.KeysUnderRoot {
// Delete existing json key
delete(data, "json")
Expand All @@ -147,7 +157,7 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string,
}
jsontransform.WriteJSONKeys(event, jsonFields, config.OverwriteKeys, config.AddErrorKey)

return event.Timestamp
return id, event.Timestamp
}
return time.Time{}
return id, time.Time{}
}
1 change: 1 addition & 0 deletions libbeat/reader/readjson/json_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package readjson
// Config holds the options a JSON reader.
type Config struct {
MessageKey string `config:"message_key"`
DocumentID string `config:"document_id"`
KeysUnderRoot bool `config:"keys_under_root"`
OverwriteKeys bool `config:"overwrite_keys"`
AddErrorKey bool `config:"add_error_key"`
Expand Down
62 changes: 32 additions & 30 deletions libbeat/reader/readjson/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,15 @@ func TestAddJSONFields(t *testing.T) {

now := time.Now().UTC()

tests := []struct {
Name string
tests := map[string]struct {
Data common.MapStr
Text *string
JSONConfig Config
ExpectedItems common.MapStr
ExpectedTimestamp time.Time
ExpectedID string
}{
{
// by default, don't overwrite keys
Name: "default: do not overwrite",
"default: do not overwrite": {
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true},
Expand All @@ -219,9 +217,7 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{
// overwrite keys if asked
Name: "overwrite keys if configured",
"overwrite keys if configured": {
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true},
Expand All @@ -231,9 +227,8 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{
"use json namespace w/o keys_under_root": {
// without keys_under_root, put everything in a json key
Name: "use json namespace w/o keys_under_root",
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}},
Text: &text,
JSONConfig: Config{},
Expand All @@ -242,9 +237,9 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{

"write result to message_key field": {
// when MessageKey is defined, the Text overwrites the value of that key
Name: "write result to message_key field",
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hi"}},
Text: &text,
JSONConfig: Config{MessageKey: "text"},
Expand All @@ -254,10 +249,9 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{
"parse @timestamp": {
// when @timestamp is in JSON and overwrite_keys is true, parse it
// in a common.Time
Name: "parse @timestamp",
Data: common.MapStr{"@timestamp": now, "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true},
Expand All @@ -266,10 +260,9 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time(common.MustParseTime("2016-04-05T18:47:18.444Z")),
},
{
"fail to parse @timestamp": {
// when the parsing on @timestamp fails, leave the existing value and add an error key
// in a common.Time
Name: "fail to parse @timestamp",
Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true},
Expand All @@ -279,10 +272,10 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{

"wrong @timestamp format": {
// when the @timestamp has the wrong type, leave the existing value and add an error key
// in a common.Time
Name: "wrong @timestamp format",
Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": 42}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true},
Expand All @@ -292,9 +285,8 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{
"ignore non-string type field": {
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Name: "ignore non-string type field",
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": 42}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true},
Expand All @@ -304,9 +296,9 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{

"ignore empty type field": {
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Name: "ignore empty type field",
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": ""}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true},
Expand All @@ -316,9 +308,8 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{
"ignore type names starting with underscore": {
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Name: "ignore type names starting with underscore",
Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "_type"}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true},
Expand All @@ -328,9 +319,7 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{
// if AddErrorKey is false, err should not be set.
Name: "ignore type names starting with underscore",
"do not set error if AddErrorKey is false": {
Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "_type"}},
Text: &text,
JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: false},
Expand All @@ -340,22 +329,35 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
"extract event id": {
// if document_id is set, extract the ID from the event
Data: common.MapStr{"@timestamp": common.Time(now), "json": common.MapStr{"id": "test_id"}},
JSONConfig: Config{DocumentID: "id"},
ExpectedID: "test_id",
},
"extract event id with wrong type": {
// if document_id is set, extract the ID from the event
Data: common.MapStr{"@timestamp": common.Time(now), "json": common.MapStr{"id": 42}},
JSONConfig: Config{DocumentID: "id"},
ExpectedID: "",
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
var jsonFields common.MapStr
if fields, ok := test.Data["json"]; ok {
jsonFields = fields.(common.MapStr)
}

ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig)
id, ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig)

t.Log("Executing test:", test)
for k, v := range test.ExpectedItems {
assert.Equal(t, v, test.Data[k])
}
assert.Equal(t, test.ExpectedTimestamp, ts)
assert.Equal(t, test.ExpectedID, id)
})
}
}

0 comments on commit 7e25933

Please sign in to comment.