diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 117734bbd277..7afbbf46d609 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -310,6 +310,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add module for ingesting Cisco FTD logs over syslog. {pull}13286[13286] - Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505] - Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701] +- Add support to set the document id in the json reader. {pull}5844[5844] *Heartbeat* diff --git a/filebeat/docs/inputs/input-common-harvester-options.asciidoc b/filebeat/docs/inputs/input-common-harvester-options.asciidoc index 8f161006416e..c7f053dfdc01 100644 --- a/filebeat/docs/inputs/input-common-harvester-options.asciidoc +++ b/filebeat/docs/inputs/input-common-harvester-options.asciidoc @@ -191,6 +191,10 @@ must be at the top level in the JSON object and the value associated with the key must be a string, otherwise no filtering or multiline aggregation will occur. +*`document_id`*:: Option configuration setting that specifies the JSON key to +set the document id. If configured, the field will be removed from the original +json document and stored in `@metadata.id` + *`ignore_decoding_error`*:: An optional configuration setting that specifies if JSON decoding errors should be logged or not. If set to true, errors will not be logged. The default is false. diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 8abe9fcf559c..f2bed44c6deb 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -360,15 +360,21 @@ func (h *Harvester) onMessage( jsonFields = f.(common.MapStr) } + var meta common.MapStr timestamp := message.Ts - if h.config.JSON != nil && len(jsonFields) > 0 { - ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) + id, ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) if !ts.IsZero() { // there was a `@timestamp` key in the event, so overwrite // the resulting timestamp timestamp = ts } + + if id != nil { + meta = common.MapStr{ + "id": id, + } + } } else if &text != nil { if fields == nil { fields = common.MapStr{} @@ -379,6 +385,7 @@ func (h *Harvester) onMessage( err := forwarder.Send(beat.Event{ Timestamp: timestamp, Fields: fields, + Meta: meta, Private: state, }) return err == nil diff --git a/filebeat/tests/files/logs/json_id.log b/filebeat/tests/files/logs/json_id.log new file mode 100644 index 000000000000..897436026fa4 --- /dev/null +++ b/filebeat/tests/files/logs/json_id.log @@ -0,0 +1,3 @@ +{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": "test", "id": 0} +{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": 5, "id": 1} +{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": {"hello": "shouldn't work"}, "id": 2} diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 938e9e6a9c63..0c48b34ebc24 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -76,6 +76,7 @@ filebeat.{{input_config | default("inputs")}}: {% if json.overwrite_keys %}overwrite_keys: true{% endif %} {% if json.add_error_key %}add_error_key: true{% endif %} {% if json.ignore_decoding_error %}ignore_decoding_error: true{% endif %} + {% if json.document_id %}document_id: {{json.document_id}}{% endif %} {% endif %} {% if multiline %} diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index 0fec0c680b51..1f877e61e896 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -230,6 +230,35 @@ def test_type_in_message(self): assert output[2]["error.message"] == \ "type not overwritten (not string)" + def test_id_in_message(self): + """ + Extract document ID from json contents. + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + json=dict( + message_key="msg", + document_id="id", + ), + ) + os.mkdir(self.working_dir + "/log/") + self.copy_files(["logs/json_id.log"], + source_dir="../files", + target_dir="log") + proc = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + proc.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 3 + for i in xrange(len(output)): + assert("@metadata.id" in output[i]) + assert(output[i]["@metadata.id"] == i) + assert("json.id" not in output[i]) + def test_with_generic_filtering(self): """ It should work fine to combine JSON decoding with diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go index 72bea9cea359..e6e055417e8b 100644 --- a/libbeat/reader/readjson/json.go +++ b/libbeat/reader/readjson/json.go @@ -114,7 +114,7 @@ func createJSONError(message string) common.MapStr { // respecting the KeysUnderRoot and OverwriteKeys configuration options. // If MessageKey is defined, the Text value from the event always // takes precedence. -func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) time.Time { +func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) (interface{}, time.Time) { // The message key might have been modified by multiline if len(config.MessageKey) > 0 && text != nil { jsonFields[config.MessageKey] = *text @@ -127,6 +127,14 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, data["message"] = *text } + var id interface{} + if key := config.DocumentID; key != "" { + if tmp, err := jsonFields.GetValue(key); err == nil { + id = tmp + } + jsonFields.Delete(key) + } + if config.KeysUnderRoot { // Delete existing json key delete(data, "json") @@ -147,7 +155,7 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, } jsontransform.WriteJSONKeys(event, jsonFields, config.OverwriteKeys, config.AddErrorKey) - return event.Timestamp + return id, event.Timestamp } - return time.Time{} + return id, time.Time{} } diff --git a/libbeat/reader/readjson/json_config.go b/libbeat/reader/readjson/json_config.go index a95b9db08447..5469f00a3c6a 100644 --- a/libbeat/reader/readjson/json_config.go +++ b/libbeat/reader/readjson/json_config.go @@ -20,6 +20,7 @@ package readjson // Config holds the options a JSON reader. type Config struct { MessageKey string `config:"message_key"` + DocumentID string `config:"document_id"` KeysUnderRoot bool `config:"keys_under_root"` OverwriteKeys bool `config:"overwrite_keys"` AddErrorKey bool `config:"add_error_key"` diff --git a/libbeat/reader/readjson/json_test.go b/libbeat/reader/readjson/json_test.go index 5f4deebd2bee..0d39fdaa122e 100644 --- a/libbeat/reader/readjson/json_test.go +++ b/libbeat/reader/readjson/json_test.go @@ -206,6 +206,7 @@ func TestAddJSONFields(t *testing.T) { JSONConfig Config ExpectedItems common.MapStr ExpectedTimestamp time.Time + ExpectedID interface{} }{ { // by default, don't overwrite keys @@ -340,6 +341,13 @@ func TestAddJSONFields(t *testing.T) { }, ExpectedTimestamp: time.Time{}, }, + { + // if document_id is set, extract the ID from the event + Name: "extract event id", + Data: common.MapStr{"@timestamp": common.Time(now), "json": common.MapStr{"id": "test_id"}}, + JSONConfig: Config{DocumentID: "id"}, + ExpectedID: "test_id", + }, } for _, test := range tests { @@ -349,13 +357,14 @@ func TestAddJSONFields(t *testing.T) { jsonFields = fields.(common.MapStr) } - ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig) + id, ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig) t.Log("Executing test:", test) for k, v := range test.ExpectedItems { assert.Equal(t, v, test.Data[k]) } assert.Equal(t, test.ExpectedTimestamp, ts) + assert.Equal(t, test.ExpectedID, id) }) } }