Skip to content

Commit

Permalink
Cherry-pick elastic#8646 to 6.4: Fix race in monitoring output (elast…
Browse files Browse the repository at this point in the history
…ic#8659)

Cherry-pick of PR elastic#8646 to 6.4 branch. Original message: 

The action part of the bulk request used to be a constant (used to use
'var' for init purposes only), but with 6.4 we have had to introduce the
`index._type` field per monitoring type.
With the introduction of telemetry we ended up with 2 publisher
pipelines and 2 outputs, each with slightly different parameters, yet
the action part has become a 'global' shared variable, that was modified
by each reporter. If metrics and telemetry publish events at the same
time, there is a race in modifying the global and serializing the bulk
request.

This change removes the global and creates an action item per event,
such that there will be no sharing at all.
  • Loading branch information
Steffen Siering authored Oct 19, 2018
1 parent c3de428 commit b64bc13
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ https://github.com/elastic/beats/compare/v6.4.1...6.4[Check the HEAD diff]

*Affecting all Beats*

- Fix race condition when publishing monitoring data. {pull}8646[8646]

*Auditbeat*

*Filebeat*
Expand Down
26 changes: 10 additions & 16 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,6 @@ type publishClient struct {
params map[string]string
}

var (
// monitoring beats action
actMonitoringBeats = common.MapStr{
"index": common.MapStr{
"_index": "",
"_routing": nil,
},
}
)

func newPublishClient(
es *esout.Client,
params map[string]string,
Expand Down Expand Up @@ -122,15 +112,19 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
}
}
}
actMonitoringBeats.Put("index._type", t)

bulk := [2]interface{}{
actMonitoringBeats,
report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
action := common.MapStr{
"index": common.MapStr{
"_type": t,
"_index": "",
"_routing": nil,
},
}
document := report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
}
bulk := [2]interface{}{action, document}

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
Expand Down

0 comments on commit b64bc13

Please sign in to comment.