Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #16440 to 7.x: [libbeat] Fix / clarify the module reload logic #16686

Merged
merged 1 commit into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix index names for indexing not always guaranteed to be lower case. {pull}16081[16081]
- Upgrade go-ucfg to latest v0.8.1. {pull}15937{15937}
- Fix loading processors from annotation hints. {pull}16348[16348]
- Fix an issue that could cause redundant configuration reloads. {pull}16440[16440]
- Fix k8s pods labels broken schema. {pull}16480[16480]
- Fix k8s pods annotations broken schema. {pull}16554[16554]
- Upgrade go-ucfg to latest v0.8.3. {pull}16450{16450}
Expand Down Expand Up @@ -182,6 +183,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add document_id setting to decode_json_fields processor. {pull}15859[15859]
- Include network information by default on add_host_metadata and add_observer_metadata. {issue}15347[15347] {pull}16077[16077]
- Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823]
- Add monitoring variable `libbeat.config.scans` to distinguish scans of the configuration directory from actual reloads of its contents. {pull}16440[16440]
- Add support for multiple password in redis output. {issue}16058[16058] {pull}16206[16206]
- Remove experimental flag from `setup.template.append_fields` {pull}16576[16576]
- Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621]
Expand Down
6 changes: 5 additions & 1 deletion heartbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def test_config_add(self):
"""
self.setup_dynamic()

# Wait until the beat is running and has performed its first load of
# the config directory.
self.wait_until(lambda: self.log_contains(
"Starting reload procedure, current runners: 0"))

Expand All @@ -77,8 +79,10 @@ def test_config_add(self):
self.write_dyn_config(
"test.yml", self.http_cfg("myid", "http://localhost:{}".format(server.server_port)))

# The beat should recognize there is a new runner to start.
self.wait_until(lambda: self.log_contains(
"Starting reload procedure, current runners: 1"))
"Start list: 1, Stop list: 0"),
max_timeout=10)

self.wait_until(lambda: self.output_lines() > 0)

Expand Down
27 changes: 18 additions & 9 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ var (

debugf = logp.MakeDebug("cfgfile")

// configScans measures how many times the config dir was scanned for
// changes, configReloads measures how many times there were changes that
// triggered an actual reload.
configScans = monitoring.NewInt(nil, "libbeat.config.scans")
configReloads = monitoring.NewInt(nil, "libbeat.config.reloads")
moduleStarts = monitoring.NewInt(nil, "libbeat.config.module.starts")
moduleStops = monitoring.NewInt(nil, "libbeat.config.module.stops")
Expand Down Expand Up @@ -185,7 +189,11 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
rl.config.Reload.Period = 0
}

overwriteUpdate := true
// If forceReload is set, the configuration should be reloaded
// even if there are no changes. It is set on the first iteration,
// and whenever an attempted reload fails. It is unset whenever
// a reload succeeds.
forceReload := true

for {
select {
Expand All @@ -195,7 +203,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {

case <-time.After(rl.config.Reload.Period):
debugf("Scan for new config files")
configReloads.Add(1)
configScans.Add(1)

files, updated, err := gw.Scan()
if err != nil {
Expand All @@ -204,21 +212,22 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
logp.Err("Error fetching new config files: %v", err)
}

// no file changes
if !updated && !overwriteUpdate {
overwriteUpdate = false
// if there are no changes, skip this reload unless forceReload is set.
if !updated && !forceReload {
continue
}
configReloads.Add(1)

// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))

if err := list.Reload(configs); err != nil {
// Make sure the next run also updates because some runners were not properly loaded
overwriteUpdate = true
}
err = list.Reload(configs)
// Force reload on the next iteration if and only if this one failed.
// (Any errors are already logged by list.Reload, so we don't need to
// propagate the details further.)
forceReload = err != nil
}

// Path loading is enabled but not reloading. Loads files only once and then stops.
Expand Down
107 changes: 107 additions & 0 deletions libbeat/cfgfile/reload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package cfgfile

import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
)

func TestReloader(t *testing.T) {
// Create random temp directory
dir, err := ioutil.TempDir("", "libbeat-reloader")
defer os.RemoveAll(dir)
if err != nil {
t.Fatal(err)
}
glob := dir + "/*.yml"

config := common.MustNewConfigFrom(common.MapStr{
"path": glob,
"reload": common.MapStr{
"period": "1s",
"enabled": true,
},
})
// common.Config{}
reloader := NewReloader(nil, config)
retryCount := 10

go reloader.Run(nil)
defer reloader.Stop()

// wait until configScans >= 2 (which should happen after ~1 second)
for i := 0; i < retryCount; i++ {
if configScans.Get() >= 2 {
break
}
// time interval is slightly more than a second so we don't slightly
// undershoot the first iteration and wait a whole extra second.
time.Sleep(1100 * time.Millisecond)
}
if configScans.Get() < 2 {
assert.Fail(t, "Timed out waiting for configScans >= 2")
}

// The first scan should cause a reload, but additional ones should not,
// so configReloads should still be 1.
assert.Equal(t, int64(1), configReloads.Get())

// Write a file to the reloader path to trigger a real reload
content := []byte("test\n")
err = ioutil.WriteFile(dir+"/config1.yml", content, 0644)
assert.NoError(t, err)

// Wait for the number of scans to increase at least twice. This is somewhat
// pedantic, but if we just wait for the next scan, it's possible to wake up
// during the brief interval after configScans is updated but before
// configReloads is, giving a false negative. Waiting two iterations
// guarantees that the change from the first one has taken effect.
targetScans := configScans.Get() + 2
for i := 0; i < retryCount; i++ {
time.Sleep(time.Second)
if configScans.Get() >= targetScans {
break
}
}
if configScans.Get() < targetScans {
assert.Fail(t,
fmt.Sprintf("Timed out waiting for configScans >= %d", targetScans))
}

// The number of reloads should now have increased. It would be nicer to
// check if the value is exactly 2, but we can't guarantee this: the glob
// watcher includes an extra 1-second margin around the real modification
// time, so changes that fall too close to a scan interval can be detected
// twice.
if configReloads.Get() < 2 {
assert.Fail(t,
fmt.Sprintf(
"Reloader performed %d scans but only reloaded once",
configScans.Get()))
}
}
1 change: 1 addition & 0 deletions libbeat/docs/http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ curl -XGET 'localhost:5066/stats?pretty'
"starts": 0,
"stops": 0
},
"scans": 1,
"reloads": 1
},
"output": {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/tests/system/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_stats(self):
data = json.loads(r.content)

# Test one data point
assert data["libbeat"]["config"]["reloads"] == 0
assert data["libbeat"]["config"]["scans"] == 0

proc.check_kill_and_wait()

Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/beat/stats/_meta/test/stats.800.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"starts": 0,
"stops": 0
},
"scans": 1,
"reloads": 1
},
"output": {
Expand Down
12 changes: 5 additions & 7 deletions metricbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def test_reload(self):

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_start_stop(self):
def reload_line(
num_runners): return "Starting reload procedure, current runners: %d" % num_runners
"""
Test if module is properly started and stopped
"""
Expand All @@ -61,7 +59,7 @@ def reload_line(

# Ensure no modules are loaded
self.wait_until(
lambda: self.log_contains(reload_line(0)),
lambda: self.log_contains("Start list: 0, Stop list: 0"),
max_timeout=10)

systemConfig = """
Expand All @@ -73,17 +71,17 @@ def reload_line(
with open(config_path, 'w') as f:
f.write(systemConfig)

# Ensure the module was successfully loaded
# Ensure the module is started
self.wait_until(
lambda: self.log_contains(reload_line(1)),
lambda: self.log_contains("Start list: 1, Stop list: 0"),
max_timeout=10)

# Remove config again
os.remove(config_path)

# Ensure the module was successfully unloaded
# Ensure the module is stopped
self.wait_until(
lambda: self.log_contains(reload_line(0)),
lambda: self.log_contains("Start list: 0, Stop list: 1"),
max_timeout=10)

time.sleep(1)
Expand Down