From c115bb1a5eeccbe64f61563af2d9e99f8f882700 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Mon, 2 Dec 2019 13:55:34 +0000 Subject: [PATCH] Libbeat: Do not overwrite agent.*, ecs.version, and host.name (#14407) Do not forcibly overwrite agent.*, ecs.version, and host.name in Libbeat when they are already set. --- CHANGELOG.next.asciidoc | 1 + filebeat/channel/connector_test.go | 4 +-- libbeat/common/mapstr.go | 30 ++++++++++++++++---- libbeat/processors/actions/add_fields.go | 18 ++++++++---- libbeat/processors/actions/add_labels.go | 2 +- libbeat/publisher/processing/default.go | 4 +-- libbeat/publisher/processing/default_test.go | 2 +- 7 files changed, 43 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7b14bd27ad8..2ac3079ec84 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -26,6 +26,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Allow Metricbeat's beat module to read monitoring information over a named pipe or unix domain socket. {pull}14558[14558] - Remove version information from default ILM policy for improved upgrade experience on custom policies. {pull}14745[14745] - Running `setup` cmd respects `setup.ilm.overwrite` setting for improved support of custom policies. {pull}14741[14741] +- Libbeat: Do not overwrite agent.*, ecs.version, and host.name. {pull}14407[14407] *Auditbeat* diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go index 4708a7e45a5..f9f5cc0640c 100644 --- a/filebeat/channel/connector_test.go +++ b/filebeat/channel/connector_test.go @@ -84,7 +84,7 @@ func TestProcessorsForConfig(t *testing.T) { Processing: beat.ProcessingConfig{ Processor: makeProcessors(actions.NewAddFields(common.MapStr{ "fields": common.MapStr{"testField": "clientConfig"}, - }, false)), + }, false, true)), }, }, expectedFields: map[string]string{ @@ -97,7 +97,7 @@ func TestProcessorsForConfig(t *testing.T) { Processing: beat.ProcessingConfig{ Processor: makeProcessors(actions.NewAddFields(common.MapStr{ "fields": common.MapStr{"testField": "clientConfig"}, - }, false)), + }, false, true)), }, }, expectedFields: map[string]string{ diff --git a/libbeat/common/mapstr.go b/libbeat/common/mapstr.go index ec184f7645a..ca2a31d50b3 100644 --- a/libbeat/common/mapstr.go +++ b/libbeat/common/mapstr.go @@ -64,33 +64,51 @@ func (m MapStr) Update(d MapStr) { // DeepUpdate recursively copies the key-value pairs from d to this map. // If the key is present and a map as well, the sub-map will be updated recursively // via DeepUpdate. +// DeepUpdateNoOverwrite is a version of this function that does not +// overwrite existing values. func (m MapStr) DeepUpdate(d MapStr) { + m.deepUpdateMap(d, true) +} + +// DeepUpdateNoOverwrite recursively copies the key-value pairs from d to this map. +// If a key is already present it will not be overwritten. +// DeepUpdate is a version of this function that overwrites existing values. +func (m MapStr) DeepUpdateNoOverwrite(d MapStr) { + m.deepUpdateMap(d, false) +} + +func (m MapStr) deepUpdateMap(d MapStr, overwrite bool) { for k, v := range d { switch val := v.(type) { case map[string]interface{}: - m[k] = deepUpdateValue(m[k], MapStr(val)) + m[k] = deepUpdateValue(m[k], MapStr(val), overwrite) case MapStr: - m[k] = deepUpdateValue(m[k], val) + m[k] = deepUpdateValue(m[k], val, overwrite) default: - m[k] = v + if overwrite { + m[k] = v + } else if _, exists := m[k]; !exists { + m[k] = v + } } } } -func deepUpdateValue(old interface{}, val MapStr) interface{} { +func deepUpdateValue(old interface{}, val MapStr, overwrite bool) interface{} { if old == nil { return val } switch sub := old.(type) { case MapStr: - sub.DeepUpdate(val) + sub.deepUpdateMap(val, overwrite) return sub case map[string]interface{}: tmp := MapStr(sub) - tmp.DeepUpdate(val) + tmp.deepUpdateMap(val, overwrite) return tmp default: + // This should never happen return val } } diff --git a/libbeat/processors/actions/add_fields.go b/libbeat/processors/actions/add_fields.go index 39b41b0c905..f370b4c5c43 100644 --- a/libbeat/processors/actions/add_fields.go +++ b/libbeat/processors/actions/add_fields.go @@ -29,8 +29,9 @@ import ( ) type addFields struct { - fields common.MapStr - shared bool + fields common.MapStr + shared bool + overwrite bool } // FieldsKey is the default target key for the add_fields processor. @@ -66,8 +67,8 @@ func CreateAddFields(c *common.Config) (processors.Processor, error) { // NewAddFields creates a new processor adding the given fields to events. // Set `shared` true if there is the chance of labels being changed/modified by // subsequent processors. -func NewAddFields(fields common.MapStr, shared bool) processors.Processor { - return &addFields{fields: fields, shared: shared} +func NewAddFields(fields common.MapStr, shared bool, overwrite bool) processors.Processor { + return &addFields{fields: fields, shared: shared, overwrite: overwrite} } func (af *addFields) Run(event *beat.Event) (*beat.Event, error) { @@ -76,7 +77,12 @@ func (af *addFields) Run(event *beat.Event) (*beat.Event, error) { fields = fields.Clone() } - event.Fields.DeepUpdate(fields) + if af.overwrite { + event.Fields.DeepUpdate(fields) + } else { + event.Fields.DeepUpdateNoOverwrite(fields) + } + return event, nil } @@ -99,5 +105,5 @@ func makeFieldsProcessor(target string, fields common.MapStr, shared bool) proce } } - return NewAddFields(fields, shared) + return NewAddFields(fields, shared, true) } diff --git a/libbeat/processors/actions/add_labels.go b/libbeat/processors/actions/add_labels.go index de6f4cc9011..8d7ddc45d7a 100644 --- a/libbeat/processors/actions/add_labels.go +++ b/libbeat/processors/actions/add_labels.go @@ -56,5 +56,5 @@ func createAddLabels(c *common.Config) (processors.Processor, error) { func NewAddLabels(labels common.MapStr, shared bool) processors.Processor { return NewAddFields(common.MapStr{ LabelsKey: labels.Flatten(), - }, shared) + }, shared, true) } diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 323938e4c18..cd15a0750fd 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -295,7 +295,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, // With dynamic fields potentially changing at any time, we need to copy, // so we do not change shared structures be accident. fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, builtin) - processors.add(actions.NewAddFields(fields, fieldsNeedsCopy)) + processors.add(actions.NewAddFields(fields, fieldsNeedsCopy, true)) } if cfg.DynamicFields != nil { @@ -310,7 +310,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, // setup 6: add beats and host metadata if meta := builtin; len(meta) > 0 { - processors.add(actions.NewAddFields(meta, needsCopy)) + processors.add(actions.NewAddFields(meta, needsCopy, false)) } // setup 8: pipeline processors list diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 5f49b2b3829..c21345d2701 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -165,7 +165,7 @@ func TestProcessorsConfigs(t *testing.T) { local: beat.ProcessingConfig{ Processor: func() beat.ProcessorList { g := newGroup("test", logp.L()) - g.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true)) + g.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true, true)) return g }(), },