Skip to content

Commit

Permalink
Add support to set the document ID in the filebeat json reader
Browse files Browse the repository at this point in the history
Add support to configure a key for setting the document ID in
the harvester JSON settings. The ID will be stored in the event's
Meta["id"] for the output to pick up. With elastic#5811, the elasticsearch
output uses the Meta["id"] field to set the document's ID (using
op_type="create" to count duplicate inserts of the same ID). For other
output types, the document ID will be forwarded via `@metadata.id`.
  • Loading branch information
urso committed Sep 18, 2019
1 parent 717771f commit 0b9b43c
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add module for ingesting Cisco FTD logs over syslog. {pull}13286[13286]
- Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505]
- Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701]
- Add support to set the document id in the json reader. {pull}5844[5844]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/docs/inputs/input-common-harvester-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ must be at the top level in the JSON object and the value associated with the
key must be a string, otherwise no filtering or multiline aggregation will
occur.

*`document_id`*:: An optional configuration setting that specifies the JSON key
to use as the document ID. If configured, the field will be removed from the
original JSON document and stored in `@metadata.id`.

*`ignore_decoding_error`*:: An optional configuration setting that specifies if
JSON decoding errors should be logged or not. If set to true, errors will not
be logged. The default is false.
Expand Down
11 changes: 9 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,21 @@ func (h *Harvester) onMessage(
jsonFields = f.(common.MapStr)
}

var meta common.MapStr
timestamp := message.Ts

if h.config.JSON != nil && len(jsonFields) > 0 {
ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
id, ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
if !ts.IsZero() {
// there was a `@timestamp` key in the event, so overwrite
// the resulting timestamp
timestamp = ts
}

if id != nil {
meta = common.MapStr{
"id": id,
}
}
} else if &text != nil {
if fields == nil {
fields = common.MapStr{}
Expand All @@ -379,6 +385,7 @@ func (h *Harvester) onMessage(
err := forwarder.Send(beat.Event{
Timestamp: timestamp,
Fields: fields,
Meta: meta,
Private: state,
})
return err == nil
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/files/logs/json_id.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": "test", "id": 0}
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": 5, "id": 1}
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": {"hello": "shouldn't work"}, "id": 2}
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ filebeat.{{input_config | default("inputs")}}:
{% if json.overwrite_keys %}overwrite_keys: true{% endif %}
{% if json.add_error_key %}add_error_key: true{% endif %}
{% if json.ignore_decoding_error %}ignore_decoding_error: true{% endif %}
{% if json.document_id %}document_id: {{json.document_id}}{% endif %}
{% endif %}

{% if multiline %}
Expand Down
29 changes: 29 additions & 0 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,35 @@ def test_type_in_message(self):
assert output[2]["error.message"] == \
"type not overwritten (not string)"

def test_id_in_message(self):
"""
Extract the document ID from the JSON contents.

The input is configured with `document_id: id`. Every published event
is expected to expose the value of the "id" key as `@metadata.id`,
and `json.id` must no longer be present in the event.
"""
# Render a config that reads the message from "msg" and the document ID from "id".
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(
message_key="msg",
document_id="id",
),
)
# Stage the fixture in the directory matched by the configured input path.
os.mkdir(self.working_dir + "/log/")
self.copy_files(["logs/json_id.log"],
source_dir="../files",
target_dir="log")
# Run the beat until three output lines are available, one per fixture line.
proc = self.start_beat()
self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)
proc.check_kill_and_wait()

output = self.read_output()

assert len(output) == 3
# The fixture lines carry the ids 0, 1 and 2 in that order, so the ID of
# each event must equal its position in the output.
for i in xrange(len(output)):
assert("@metadata.id" in output[i])
assert(output[i]["@metadata.id"] == i)
assert("json.id" not in output[i])

def test_with_generic_filtering(self):
"""
It should work fine to combine JSON decoding with
Expand Down
14 changes: 11 additions & 3 deletions libbeat/reader/readjson/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func createJSONError(message string) common.MapStr {
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
// takes precedence.
func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) time.Time {
func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config Config) (interface{}, time.Time) {
// The message key might have been modified by multiline
if len(config.MessageKey) > 0 && text != nil {
jsonFields[config.MessageKey] = *text
Expand All @@ -127,6 +127,14 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string,
data["message"] = *text
}

var id interface{}
if key := config.DocumentID; key != "" {
if tmp, err := jsonFields.GetValue(key); err == nil {
id = tmp
}
jsonFields.Delete(key)
}

if config.KeysUnderRoot {
// Delete existing json key
delete(data, "json")
Expand All @@ -147,7 +155,7 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string,
}
jsontransform.WriteJSONKeys(event, jsonFields, config.OverwriteKeys, config.AddErrorKey)

return event.Timestamp
return id, event.Timestamp
}
return time.Time{}
return id, time.Time{}
}
1 change: 1 addition & 0 deletions libbeat/reader/readjson/json_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package readjson
// Config holds the options a JSON reader.
type Config struct {
MessageKey string `config:"message_key"`
DocumentID string `config:"document_id"`
KeysUnderRoot bool `config:"keys_under_root"`
OverwriteKeys bool `config:"overwrite_keys"`
AddErrorKey bool `config:"add_error_key"`
Expand Down
11 changes: 10 additions & 1 deletion libbeat/reader/readjson/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func TestAddJSONFields(t *testing.T) {
JSONConfig Config
ExpectedItems common.MapStr
ExpectedTimestamp time.Time
ExpectedID interface{}
}{
{
// by default, don't overwrite keys
Expand Down Expand Up @@ -340,6 +341,13 @@ func TestAddJSONFields(t *testing.T) {
},
ExpectedTimestamp: time.Time{},
},
{
// if document_id is set, extract the ID from the event
Name: "extract event id",
Data: common.MapStr{"@timestamp": common.Time(now), "json": common.MapStr{"id": "test_id"}},
JSONConfig: Config{DocumentID: "id"},
ExpectedID: "test_id",
},
}

for _, test := range tests {
Expand All @@ -349,13 +357,14 @@ func TestAddJSONFields(t *testing.T) {
jsonFields = fields.(common.MapStr)
}

ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig)
id, ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig)

t.Log("Executing test:", test)
for k, v := range test.ExpectedItems {
assert.Equal(t, v, test.Data[k])
}
assert.Equal(t, test.ExpectedTimestamp, ts)
assert.Equal(t, test.ExpectedID, id)
})
}
}

0 comments on commit 0b9b43c

Please sign in to comment.