Skip to content

Commit

Permalink
[7.5] Allow users to set just monitoring.cluster_uuid (elastic#14338) (
Browse files Browse the repository at this point in the history
…elastic#14633)

* Allow users to set just monitoring.cluster_uuid (elastic#14338)

* Allow users to just set monitoring.cluster_uuid

* Adding back comment lost in refactoring

* Adds godoc comments for functions

* Removes stutter in function name

* Adding system test

* Adding CHANGELOG entry

* Fixing formatting

* Move defer to right scope

* Adding system test

* Fixing formatting

* Fixing up CHANGELOG
  • Loading branch information
ycombinator committed Nov 20, 2019
1 parent 0a44d8b commit b904569
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Affecting all Beats*

- Fix a race condition with the Kafka pipeline client, it is possible that `Close()` get called before `Connect()` . {issue}11945[11945]
- Allow users to configure only `cluster_uuid` setting under `monitoring` namespace. {pull}14338[14338]

*Auditbeat*

Expand Down
59 changes: 38 additions & 21 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,30 +402,12 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
return err
}

monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig)
r, err := b.setupMonitoring(settings)
if err != nil {
return err
}

if monitoringCfg.Enabled() {
settings := report.Settings{
DefaultUsername: settings.Monitoring.DefaultUsername,
Format: reporterSettings.Format,
ClusterUUID: reporterSettings.ClusterUUID,
}
reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output)
if err != nil {
return err
}
defer reporter.Stop()

// Expose monitoring.cluster_uuid in state API
if reporterSettings.ClusterUUID != "" {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
monitoringRegistry := stateRegistry.NewRegistry("monitoring")
clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid")
clusterUUIDRegVar.Set(reporterSettings.ClusterUUID)
}
if r != nil {
defer r.Stop()
}

if b.Config.MetricLogging == nil || b.Config.MetricLogging.Enabled() {
Expand Down Expand Up @@ -887,6 +869,41 @@ func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, err
return callback, nil
}

func (b *Beat) setupMonitoring(settings Settings) (report.Reporter, error) {
monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig)
if err != nil {
return nil, err
}

monitoringClusterUUID, err := monitoring.GetClusterUUID(monitoringCfg)
if err != nil {
return nil, err
}

// Expose monitoring.cluster_uuid in state API
if monitoringClusterUUID != "" {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
monitoringRegistry := stateRegistry.NewRegistry("monitoring")
clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid")
clusterUUIDRegVar.Set(monitoringClusterUUID)
}

if monitoring.IsEnabled(monitoringCfg) {
settings := report.Settings{
DefaultUsername: settings.Monitoring.DefaultUsername,
Format: reporterSettings.Format,
ClusterUUID: monitoringClusterUUID,
}
reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output)
if err != nil {
return nil, err
}
return reporter, nil
}

return nil, nil
}

// handleError handles the given error by logging it and then returning the
// error. If the err is nil or is a GracefulExit error then the method will
// return nil without logging anything.
Expand Down
40 changes: 33 additions & 7 deletions libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,40 @@ func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error)
return monitoringCfg, &report.Settings{Format: report.FormatXPackMonitoringBulk}, nil
case beatCfg.Monitoring.Enabled():
monitoringCfg := beatCfg.Monitoring
var info struct {
ClusterUUID string `config:"cluster_uuid"`
}
if err := monitoringCfg.Unpack(&info); err != nil {
return nil, nil, err
}
return monitoringCfg, &report.Settings{Format: report.FormatBulk, ClusterUUID: info.ClusterUUID}, nil
return monitoringCfg, &report.Settings{Format: report.FormatBulk}, nil
default:
return nil, nil, nil
}
}

// GetClusterUUID returns the value of the monitoring.cluster_uuid setting, if it is set.
func GetClusterUUID(monitoringCfg *common.Config) (string, error) {
if monitoringCfg == nil {
return "", nil
}

var config struct {
ClusterUUID string `config:"cluster_uuid"`
}
if err := monitoringCfg.Unpack(&config); err != nil {
return "", err
}

return config.ClusterUUID, nil
}

// IsEnabled returns whether the monitoring reporter is enabled or not.
func IsEnabled(monitoringCfg *common.Config) bool {
if monitoringCfg == nil {
return false
}

// If the only setting in the monitoring config is cluster_uuid, it is
// not enabled
fields := monitoringCfg.GetFields()
if len(fields) == 1 && fields[0] == "cluster_uuid" {
return false
}

return monitoringCfg.Enabled()
}
16 changes: 13 additions & 3 deletions libbeat/tests/system/config/mockbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,20 @@ xpack.monitoring.elasticsearch.state.period: 3s # to speed up tests

{% if monitoring -%}
#================================ X-Pack Monitoring (direct) =====================================
monitoring.elasticsearch.hosts: {{monitoring.elasticsearch.hosts}}
monitoring.elasticsearch.metrics.period: 2s # to speed up tests
monitoring.elasticsearch.state.period: 3s # to speed up tests
monitoring:
{% if monitoring.elasticsearch -%}
elasticsearch.hosts: {{monitoring.elasticsearch.hosts}}
elasticsearch.metrics.period: 2s # to speed up tests
elasticsearch.state.period: 3s # to speed up tests
{% endif -%}

{% if monitoring.cluster_uuid -%}
cluster_uuid: {{monitoring.cluster_uuid}}
{% endif -%}
{% endif -%}

# vim: set ft=jinja:

{% if http_enabled -%}
http.enabled: {{http_enabled}}
{% endif -%}
34 changes: 34 additions & 0 deletions libbeat/tests/system/test_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import re
from nose.plugins.attrib import attr
import unittest
import requests
import random
import string

INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)

Expand Down Expand Up @@ -150,6 +153,29 @@ def test_compare(self):
self.assert_same_structure(indirect_beats_state_doc['beats_state'], direct_beats_state_doc['beats_state'])
self.assert_same_structure(indirect_beats_stats_doc['beats_stats'], direct_beats_stats_doc['beats_stats'])

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@attr('integration')
def test_cluster_uuid_setting(self):
"""
Test that monitoring.cluster_uuid setting may be set without any other monitoring.* settings
"""
test_cluster_uuid = self.random_string(10)
self.render_config_template(
"mockbeat",
monitoring={
"cluster_uuid": test_cluster_uuid
},
http_enabled="true"
)

proc = self.start_beat(config="mockbeat.yml")
self.wait_until(lambda: self.log_contains("mockbeat start running."))

state = self.get_beat_state()
proc.check_kill_and_wait()

self.assertEqual(test_cluster_uuid, state["monitoring"]["cluster_uuid"])

def search_monitoring_doc(self, monitoring_type):
results = self.es_monitoring.search(
index='.monitoring-beats-*',
Expand Down Expand Up @@ -241,3 +267,11 @@ def get_elasticsearch_monitoring_url(self):
host=os.getenv("ES_MONITORING_HOST", "localhost"),
port=os.getenv("ES_MONITORING_PORT", "9210")
)

def get_beat_state(self):
url = "http://localhost:5066/state"
return requests.get(url).json()

def random_string(self, size):
char_pool = string.ascii_letters + string.digits
return ''.join(random.choice(char_pool) for i in range(size))

0 comments on commit b904569

Please sign in to comment.