From 99e16ba573bcf6c8a2ba914c87a4a0263851d7a3 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Fri, 19 Oct 2018 18:24:10 +0200 Subject: [PATCH] Cherry-pick #8646 to 6.4: Fix race in monitoring output (#8659) Cherry-pick of PR #8646 to 6.4 branch. Original message: The action part of the bulk request used to be a constant (used to use 'var' for init purposes only), but with 6.4 we have had to introduce the `index._type` field per monitoring type. With the introduction of telemetry we ended up with 2 publisher pipelines and 2 outputs, each with slightly different parameters, yet the action part has become a 'global' shared variable, that was modified by each reporter. If metrics and telemetry publish events at the same time, there is a race in modifying the global and serializing the bulk request. This change removes the global and creates an action item per event, such that there will be no sharing at all. --- CHANGELOG.asciidoc | 2 ++ .../monitoring/report/elasticsearch/client.go | 26 +++++++------------ 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 228365d5ee7..837f9f78eb8 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -29,6 +29,8 @@ https://github.com/elastic/beats/compare/v6.4.1...6.4[Check the HEAD diff] *Affecting all Beats* +- Fix race condition when publishing monitoring data. {pull}8646[8646] + *Auditbeat* *Filebeat* diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 4da37db60a0..9c216189bfb 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -34,16 +34,6 @@ type publishClient struct { params map[string]string } -var ( - // monitoring beats action - actMonitoringBeats = common.MapStr{ - "index": common.MapStr{ - "_index": "", - "_routing": nil, - }, - } -) - func newPublishClient( es *esout.Client, params map[string]string, @@ -122,15 +112,19 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } } - actMonitoringBeats.Put("index._type", t) - bulk := [2]interface{}{ - actMonitoringBeats, - report.Event{ - Timestamp: event.Content.Timestamp, - Fields: event.Content.Fields, + action := common.MapStr{ + "index": common.MapStr{ + "_type": t, + "_index": "", + "_routing": nil, }, } + document := report.Event{ + Timestamp: event.Content.Timestamp, + Fields: event.Content.Fields, + } + bulk := [2]interface{}{action, document} // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param.