Skip to content

Commit

Permalink
Fix race in monitoring output (elastic#8646)
Browse files Browse the repository at this point in the history
The action part of the bulk request used to be a constant (used to use
'var' for init purposes only), but with 6.4 we have had to introduce the
`index._type` field per monitoring type.
With the introduction of telemetry we ended up with 2 publisher
pipelines and 2 outputs, each with slightly different parameters, yet
the action part has become a 'global' shared variable, that was modified
by each reporter. If metrics and telemetry publish events at the same
time, there is a race in modifying the global and serializing the bulk
request.

This change removes the global and creates an action item per event,
such that there will be no sharing at all.
  • Loading branch information
Steffen Siering authored Oct 19, 2018
1 parent 1012302 commit 1438000
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fix a race condition with the `add_host_metadata` and the event serialization. {pull}8223[8223]
- Enforce that data used by k8s or docker doesn't use any reference. {pull}8240[8240]
- Switch to different UUID lib due to to non-random generated UUIDs. {pull}8485[8485]
- Fix race condition when publishing monitoring data. {pull}8646[8646]

*Auditbeat*

Expand Down
26 changes: 10 additions & 16 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,6 @@ type publishClient struct {
params map[string]string
}

var (
// monitoring beats action
actMonitoringBeats = common.MapStr{
"index": common.MapStr{
"_index": "",
"_routing": nil,
},
}
)

func newPublishClient(
es *esout.Client,
params map[string]string,
Expand Down Expand Up @@ -122,15 +112,19 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
}
}
}
actMonitoringBeats.Put("index._type", t)

bulk := [2]interface{}{
actMonitoringBeats,
report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
action := common.MapStr{
"index": common.MapStr{
"_type": t,
"_index": "",
"_routing": nil,
},
}
document := report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
}
bulk := [2]interface{}{action, document}

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
Expand Down

0 comments on commit 1438000

Please sign in to comment.