Skip to content

Commit

Permalink
Libbeat: Do not overwrite agent.*, ecs.version, and host.name (elasti…
Browse files Browse the repository at this point in the history
…c#14407)

Do not forcibly overwrite agent.*, ecs.version, and host.name in Libbeat when they are already set.

(cherry picked from commit c115bb1)
  • Loading branch information
Christoph Wurm committed Dec 2, 2019
1 parent bf912a1 commit 05fcdb7
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow Metricbeat's beat module to read monitoring information over a named pipe or unix domain socket. {pull}14558[14558]
- Remove version information from default ILM policy for improved upgrade experience on custom policies. {pull}14745[14745]
- Running `setup` cmd respects `setup.ilm.overwrite` setting for improved support of custom policies. {pull}14741[14741]
- Libbeat: Do not overwrite agent.*, ecs.version, and host.name. {pull}14407[14407]

*Auditbeat*

Expand Down
30 changes: 24 additions & 6 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,51 @@ func (m MapStr) Update(d MapStr) {
// DeepUpdate recursively copies the key-value pairs from d to this map.
// If the key is present and a map as well, the sub-map will be updated recursively
// via DeepUpdate.
// DeepUpdateNoOverwrite is a version of this function that does not
// overwrite existing values.
func (m MapStr) DeepUpdate(d MapStr) {
m.deepUpdateMap(d, true)
}

// DeepUpdateNoOverwrite recursively copies the key-value pairs from d to this map.
// If a key is already present it will not be overwritten.
// DeepUpdate is a version of this function that overwrites existing values.
func (m MapStr) DeepUpdateNoOverwrite(d MapStr) {
m.deepUpdateMap(d, false)
}

func (m MapStr) deepUpdateMap(d MapStr, overwrite bool) {
for k, v := range d {
switch val := v.(type) {
case map[string]interface{}:
m[k] = deepUpdateValue(m[k], MapStr(val))
m[k] = deepUpdateValue(m[k], MapStr(val), overwrite)
case MapStr:
m[k] = deepUpdateValue(m[k], val)
m[k] = deepUpdateValue(m[k], val, overwrite)
default:
m[k] = v
if overwrite {
m[k] = v
} else if _, exists := m[k]; !exists {
m[k] = v
}
}
}
}

func deepUpdateValue(old interface{}, val MapStr) interface{} {
func deepUpdateValue(old interface{}, val MapStr, overwrite bool) interface{} {
if old == nil {
return val
}

switch sub := old.(type) {
case MapStr:
sub.DeepUpdate(val)
sub.deepUpdateMap(val, overwrite)
return sub
case map[string]interface{}:
tmp := MapStr(sub)
tmp.DeepUpdate(val)
tmp.deepUpdateMap(val, overwrite)
return tmp
default:
// This should never happen
return val
}
}
Expand Down
18 changes: 12 additions & 6 deletions libbeat/processors/actions/add_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import (
)

type addFields struct {
fields common.MapStr
shared bool
fields common.MapStr
shared bool
overwrite bool
}

// FieldsKey is the default target key for the add_fields processor.
Expand Down Expand Up @@ -66,8 +67,8 @@ func CreateAddFields(c *common.Config) (processors.Processor, error) {
// NewAddFields creates a new processor adding the given fields to events.
// Set `shared` true if there is the chance of labels being changed/modified by
// subsequent processors.
func NewAddFields(fields common.MapStr, shared bool) processors.Processor {
return &addFields{fields: fields, shared: shared}
func NewAddFields(fields common.MapStr, shared bool, overwrite bool) processors.Processor {
return &addFields{fields: fields, shared: shared, overwrite: overwrite}
}

func (af *addFields) Run(event *beat.Event) (*beat.Event, error) {
Expand All @@ -76,7 +77,12 @@ func (af *addFields) Run(event *beat.Event) (*beat.Event, error) {
fields = fields.Clone()
}

event.Fields.DeepUpdate(fields)
if af.overwrite {
event.Fields.DeepUpdate(fields)
} else {
event.Fields.DeepUpdateNoOverwrite(fields)
}

return event, nil
}

Expand All @@ -99,5 +105,5 @@ func makeFieldsProcessor(target string, fields common.MapStr, shared bool) proce
}
}

return NewAddFields(fields, shared)
return NewAddFields(fields, shared, true)
}
2 changes: 1 addition & 1 deletion libbeat/processors/actions/add_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ func createAddLabels(c *common.Config) (processors.Processor, error) {
func NewAddLabels(labels common.MapStr, shared bool) processors.Processor {
return NewAddFields(common.MapStr{
LabelsKey: labels.Flatten(),
}, shared)
}, shared, true)
}
4 changes: 2 additions & 2 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,
// With dynamic fields potentially changing at any time, we need to copy,
// so we do not change shared structures be accident.
fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, builtin)
processors.add(actions.NewAddFields(fields, fieldsNeedsCopy))
processors.add(actions.NewAddFields(fields, fieldsNeedsCopy, true))
}

if cfg.DynamicFields != nil {
Expand All @@ -310,7 +310,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,

// setup 6: add beats and host metadata
if meta := builtin; len(meta) > 0 {
processors.add(actions.NewAddFields(meta, needsCopy))
processors.add(actions.NewAddFields(meta, needsCopy, false))
}

// setup 8: pipeline processors list
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/processing/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestProcessorsConfigs(t *testing.T) {
local: beat.ProcessingConfig{
Processor: func() beat.ProcessorList {
g := newGroup("test", logp.L())
g.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true))
g.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true, true))
return g
}(),
},
Expand Down

0 comments on commit 05fcdb7

Please sign in to comment.