diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8a3b511aa9f..614708b7870 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -79,6 +79,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Do not rotate log files on startup when interval is configured and rotateonstartup is disabled. {pull}17613[17613] - Fix `setup.dashboards.index` setting not working. {pull}17749[17749] - Fix Elasticsearch license endpoint URL referenced in error message. {issue}17880[17880] {pull}18030[18030] +- Change `decode_json_fields` processor, to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] *Auditbeat* @@ -192,6 +193,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `providers` setting to `add_cloud_metadata` processor. {pull}13812[13812] - Ensure that init containers are no longer tailed after they stop {pull}14394[14394] - Fingerprint processor adds a new xxhash hashing algorithm {pull}15418[15418] +- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958] *Filebeat* @@ -250,6 +252,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve AWS cloudtrail field mappings {issue}16086[16086] {issue}16110[16110] {pull}17155[17155] - Release Google Cloud module as GA. {pull}17511[17511] - Update filebeat httpjson input to support pagination via Header and Okta module. {pull}16354[16354] +- Change the `json.*` input settings implementation to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] *Heartbeat* diff --git a/libbeat/common/jsontransform/jsonhelper.go b/libbeat/common/jsontransform/jsonhelper.go index 1490bcff170..164e1e9e1f4 100644 --- a/libbeat/common/jsontransform/jsonhelper.go +++ b/libbeat/common/jsontransform/jsonhelper.go @@ -29,11 +29,12 @@ import ( // WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) { if !overwriteKeys { - for k, v := range keys { - if _, exists := event.Fields[k]; !exists && k != "@timestamp" && k != "@metadata" { - event.Fields[k] = v - } - } + // @timestamp and @metadata fields are root-level fields. We remove them so they + // don't become part of event.Fields. + removeKeys(keys, "@timestamp", "@metadata") + + // Then, perform deep update without overwriting + event.Fields.DeepUpdateNoOverwrite(keys) return } @@ -64,7 +65,7 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys } case map[string]interface{}: - event.Meta.Update(common.MapStr(m)) + event.Meta.DeepUpdate(common.MapStr(m)) default: event.SetErrorWithOption(createJSONError("failed to update @metadata"), addErrKey) @@ -83,13 +84,21 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys continue } event.Fields[k] = vstr - - default: - event.Fields[k] = v } } + + // We have accounted for @timestamp, @metadata, type above. So let's remove these keys and + // deep update the event with the rest of the keys. + removeKeys(keys, "@timestamp", "@metadata", "type") + event.Fields.DeepUpdate(keys) } func createJSONError(message string) common.MapStr { return common.MapStr{"message": message, "type": "json"} } + +func removeKeys(keys map[string]interface{}, names ...string) { + for _, name := range names { + delete(keys, name) + } +} diff --git a/libbeat/common/jsontransform/jsonhelper_test.go b/libbeat/common/jsontransform/jsonhelper_test.go new file mode 100644 index 00000000000..d7679579be1 --- /dev/null +++ b/libbeat/common/jsontransform/jsonhelper_test.go @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package jsontransform + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestWriteJSONKeys(t *testing.T) { + now := time.Now() + now = now.Round(time.Second) + + eventTimestamp := time.Date(2020, 01, 01, 01, 01, 00, 0, time.UTC) + eventMetadata := common.MapStr{ + "foo": "bar", + "baz": common.MapStr{ + "qux": 17, + }, + } + eventFields := common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": "dee", + }, + } + + tests := map[string]struct { + keys map[string]interface{} + overwriteKeys bool + expectedMetadata common.MapStr + expectedTimestamp time.Time + expectedFields common.MapStr + }{ + "overwrite_true": { + overwriteKeys: true, + keys: map[string]interface{}{ + "@metadata": map[string]interface{}{ + "foo": "NEW_bar", + "baz": map[string]interface{}{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + "@timestamp": now.Format(time.RFC3339), + "top_b": map[string]interface{}{ + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + expectedMetadata: common.MapStr{ + "foo": "NEW_bar", + "baz": common.MapStr{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + expectedTimestamp: now, + expectedFields: common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + }, + "overwrite_false": { + overwriteKeys: false, + keys: map[string]interface{}{ + "@metadata": map[string]interface{}{ + "foo": "NEW_bar", + "baz": map[string]interface{}{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + "@timestamp": now.Format(time.RFC3339), + "top_b": map[string]interface{}{ + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + expectedMetadata: eventMetadata.Clone(), + expectedTimestamp: eventTimestamp, + expectedFields: common.MapStr{ + "top_a": 23, + "top_b": common.MapStr{ + "inner_c": "see", + "inner_d": "dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + event := &beat.Event{ + Timestamp: eventTimestamp, + Meta: eventMetadata.Clone(), + Fields: eventFields.Clone(), + } + + WriteJSONKeys(event, test.keys, test.overwriteKeys, false) + require.Equal(t, test.expectedMetadata, event.Meta) + require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano()) + require.Equal(t, test.expectedFields, event.Fields) + }) + } +}