diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2eaf3d282e23..fcced9f93ad2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -311,6 +311,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add module for ingesting Cisco FTD logs over syslog. {pull}13286[13286] - Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505] - Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701] +- Add support to set the document id in the json reader. {pull}5844[5844] *Heartbeat* diff --git a/filebeat/docs/inputs/input-common-harvester-options.asciidoc b/filebeat/docs/inputs/input-common-harvester-options.asciidoc index 8f161006416e..c7f053dfdc01 100644 --- a/filebeat/docs/inputs/input-common-harvester-options.asciidoc +++ b/filebeat/docs/inputs/input-common-harvester-options.asciidoc @@ -191,6 +191,10 @@ must be at the top level in the JSON object and the value associated with the key must be a string, otherwise no filtering or multiline aggregation will occur. +*`document_id`*:: An optional configuration setting that specifies the JSON key +whose value is used as the document ID. If configured and the value is a string, the field is removed from the original +JSON document and stored in `@metadata.id`. + *`ignore_decoding_error`*:: An optional configuration setting that specifies if JSON decoding errors should be logged or not. If set to true, errors will not be logged. The default is false. 
diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 8abe9fcf559c..fd5c8c8dd6d0 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -360,15 +360,21 @@ func (h *Harvester) onMessage( jsonFields = f.(common.MapStr) } + var meta common.MapStr timestamp := message.Ts - if h.config.JSON != nil && len(jsonFields) > 0 { - ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) + id, ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) if !ts.IsZero() { // there was a `@timestamp` key in the event, so overwrite // the resulting timestamp timestamp = ts } + + if id != "" { + meta = common.MapStr{ + "id": id, + } + } } else if &text != nil { if fields == nil { fields = common.MapStr{} @@ -379,6 +385,7 @@ func (h *Harvester) onMessage( err := forwarder.Send(beat.Event{ Timestamp: timestamp, Fields: fields, + Meta: meta, Private: state, }) return err == nil diff --git a/filebeat/tests/files/logs/json_id.log b/filebeat/tests/files/logs/json_id.log new file mode 100644 index 000000000000..283db704b1dd --- /dev/null +++ b/filebeat/tests/files/logs/json_id.log @@ -0,0 +1,3 @@ +{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": "test", "id": "0"} 
+{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": 5, "id": "1"} +{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": {"hello": "shouldn't work"}, "id": "2"} diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 938e9e6a9c63..0c48b34ebc24 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -76,6 +76,7 @@ filebeat.{{input_config | default("inputs")}}: {% if json.overwrite_keys %}overwrite_keys: true{% endif %} {% if json.add_error_key %}add_error_key: true{% endif %} {% if json.ignore_decoding_error %}ignore_decoding_error: true{% endif %} + {% if json.document_id %}document_id: {{json.document_id}}{% endif %} {% endif %} {% if multiline %} diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index 0fec0c680b51..04794e5ccee2 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -230,6 +230,34 @@ def test_type_in_message(self): assert output[2]["error.message"] == \ "type not overwritten (not string)" + def test_id_in_message(self): + """ + Extract document ID from json contents. 
+ """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + json=dict( + message_key="msg", + document_id="id", + ), + ) + os.mkdir(self.working_dir + "/log/") + self.copy_files(["logs/json_id.log"], + target_dir="log") + proc = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + proc.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 3 + for i in xrange(len(output)): + assert("@metadata.id" in output[i]) + assert(output[i]["@metadata.id"] == str(i)) + assert("json.id" not in output[i]) + def test_with_generic_filtering(self): """ It should work fine to combine JSON decoding with diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go index 72bea9cea359..a669b3f49ae0 100644 --- a/libbeat/reader/readjson/json.go +++ b/libbeat/reader/readjson/json.go @@ -114,7 +114,7 @@ func createJSONError(message string) common.MapStr { // respecting the KeysUnderRoot and OverwriteKeys configuration options. // If MessageKey is defined, the Text value from the event always // takes precedence. 
-func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) time.Time { +func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) (string, time.Time) { // The message key might have been modified by multiline if len(config.MessageKey) > 0 && text != nil { jsonFields[config.MessageKey] = *text @@ -127,6 +127,16 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, data["message"] = *text } + var id string + if key := config.DocumentID; key != "" { + if tmp, err := jsonFields.GetValue(key); err == nil { + if v, ok := tmp.(string); ok { + id = v + jsonFields.Delete(key) + } + } + } + if config.KeysUnderRoot { // Delete existing json key delete(data, "json") @@ -147,7 +157,7 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, } jsontransform.WriteJSONKeys(event, jsonFields, config.OverwriteKeys, config.AddErrorKey) - return event.Timestamp + return id, event.Timestamp } - return time.Time{} + return id, time.Time{} } diff --git a/libbeat/reader/readjson/json_config.go b/libbeat/reader/readjson/json_config.go index a95b9db08447..5469f00a3c6a 100644 --- a/libbeat/reader/readjson/json_config.go +++ b/libbeat/reader/readjson/json_config.go @@ -20,6 +20,7 @@ package readjson // Config holds the options a JSON reader. 
type Config struct { MessageKey string `config:"message_key"` + DocumentID string `config:"document_id"` KeysUnderRoot bool `config:"keys_under_root"` OverwriteKeys bool `config:"overwrite_keys"` AddErrorKey bool `config:"add_error_key"` diff --git a/libbeat/reader/readjson/json_test.go b/libbeat/reader/readjson/json_test.go index 5f4deebd2bee..d6f82585e492 100644 --- a/libbeat/reader/readjson/json_test.go +++ b/libbeat/reader/readjson/json_test.go @@ -199,17 +199,15 @@ func TestAddJSONFields(t *testing.T) { now := time.Now().UTC() - tests := []struct { - Name string + tests := map[string]struct { Data common.MapStr Text *string JSONConfig Config ExpectedItems common.MapStr ExpectedTimestamp time.Time + ExpectedID string }{ - { - // by default, don't overwrite keys - Name: "default: do not overwrite", + "default: do not overwrite": { Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true}, @@ -219,9 +217,7 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { - // overwrite keys if asked - Name: "overwrite keys if configured", + "overwrite keys if configured": { Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true}, @@ -231,9 +227,8 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { + "use json namespace w/o keys_under_root": { // without keys_under_root, put everything in a json key - Name: "use json namespace w/o keys_under_root", Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}}, Text: &text, JSONConfig: Config{}, @@ -242,9 +237,9 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { + + "write result to message_key field": { // when MessageKey is defined, the Text overwrites the value of that key - Name: "write 
result to message_key field", Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hi"}}, Text: &text, JSONConfig: Config{MessageKey: "text"}, @@ -254,10 +249,9 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { + "parse @timestamp": { // when @timestamp is in JSON and overwrite_keys is true, parse it // in a common.Time - Name: "parse @timestamp", Data: common.MapStr{"@timestamp": now, "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true}, @@ -266,10 +260,9 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time(common.MustParseTime("2016-04-05T18:47:18.444Z")), }, - { + "fail to parse @timestamp": { // when the parsing on @timestamp fails, leave the existing value and add an error key // in a common.Time - Name: "fail to parse @timestamp", Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true}, @@ -279,10 +272,10 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { + + "wrong @timestamp format": { // when the @timestamp has the wrong type, leave the existing value and add an error key // in a common.Time - Name: "wrong @timestamp format", Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": 42}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true}, @@ -292,9 +285,8 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { + "ignore non-string type field": { // if overwrite_keys is true, but the `type` key in json is not a string, ignore it - Name: "ignore non-string type field", Data: 
common.MapStr{"type": "test_type", "json": common.MapStr{"type": 42}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true}, @@ -304,9 +296,9 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { + + "ignore empty type field": { // if overwrite_keys is true, but the `type` key in json is empty, ignore it - Name: "ignore empty type field", Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": ""}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true}, @@ -316,9 +308,8 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { + "ignore type names starting with underscore": { // if overwrite_keys is true, but the `type` key in json starts with _, ignore it - Name: "ignore type names starting with underscore", Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "_type"}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: true}, @@ -328,9 +319,7 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, - { - // if AddErrorKey is false, err should not be set. 
- Name: "ignore type names starting with underscore", + "do not set error if AddErrorKey is false": { Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "_type"}}, Text: &text, JSONConfig: Config{KeysUnderRoot: true, OverwriteKeys: true, AddErrorKey: false}, @@ -340,22 +329,35 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, + "extract event id": { + // if document_id is set, extract the ID from the event + Data: common.MapStr{"@timestamp": common.Time(now), "json": common.MapStr{"id": "test_id"}}, + JSONConfig: Config{DocumentID: "id"}, + ExpectedID: "test_id", + }, + "extract event id with wrong type": { + // if document_id is set, extract the ID from the event + Data: common.MapStr{"@timestamp": common.Time(now), "json": common.MapStr{"id": 42}}, + JSONConfig: Config{DocumentID: "id"}, + ExpectedID: "", + }, } - for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { + for name, test := range tests { + t.Run(name, func(t *testing.T) { var jsonFields common.MapStr if fields, ok := test.Data["json"]; ok { jsonFields = fields.(common.MapStr) } - ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig) + id, ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig) t.Log("Executing test:", test) for k, v := range test.ExpectedItems { assert.Equal(t, v, test.Data[k]) } assert.Equal(t, test.ExpectedTimestamp, ts) + assert.Equal(t, test.ExpectedID, id) }) } }