From efaa0070649ee03bb0a54ad457fd9431d463e4c3 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Fri, 19 Oct 2018 15:51:37 +0200 Subject: [PATCH] Fix race in monitoring output (#8646) The action part of the bulk request used to be a constant (used to use 'var' for init purposes only), but with 6.4 we have had to introduce the `index._type` field per monitoring type. With the introduction of telemetry we ended up with 2 publisher pipelines and 2 outputs, each with slightly different parameters, yet the action part has become a 'global' shared variable, that was modified by each reporter. If metrics and telemetry publish events at the same time, there is a race in modifying the global and serializing the bulk request. This change removes the global and creates an action item per event, such that there will be no sharing at all. (cherry picked from commit 14380003f03cd77a60c7989406c9bdfbda377297) --- CHANGELOG.asciidoc | 2 ++ .../monitoring/report/elasticsearch/client.go | 26 +++++++------------ 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 228365d5ee7..837f9f78eb8 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -29,6 +29,8 @@ https://github.com/elastic/beats/compare/v6.4.1...6.4[Check the HEAD diff] *Affecting all Beats* +- Fix race condition when publishing monitoring data. {pull}8646[8646] + *Auditbeat* *Filebeat* diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 4da37db60a0..9c216189bfb 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -34,16 +34,6 @@ type publishClient struct { params map[string]string } -var ( - // monitoring beats action - actMonitoringBeats = common.MapStr{ - "index": common.MapStr{ - "_index": "", - "_routing": nil, - }, - } -) - func newPublishClient( es *esout.Client, params map[string]string, @@ -122,15 +112,19 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } } - actMonitoringBeats.Put("index._type", t) - bulk := [2]interface{}{ - actMonitoringBeats, - report.Event{ - Timestamp: event.Content.Timestamp, - Fields: event.Content.Fields, + action := common.MapStr{ + "index": common.MapStr{ + "_type": t, + "_index": "", + "_routing": nil, }, } + document := report.Event{ + Timestamp: event.Content.Timestamp, + Fields: event.Content.Fields, + } + bulk := [2]interface{}{action, document} // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param.