Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Fix / clarify the module reload logic #16440

Merged
merged 14 commits into from
Feb 26, 2020
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix index names for indexing not always guaranteed to be lower case. {pull}16081[16081]
- Add `ssl.ca_sha256` option to the supported TLS option, this allow to check that a specific certificate is used as part of the verified chain. {issue}15717[15717]
- Fix loading processors from annotation hints. {pull}16348[16348]
- Fix an issue that could cause redundant configuration reloads. {pull}16440[16440]
- Fix k8s pods labels broken schema. {pull}16480[16480]
- Upgrade go-ucfg to latest v0.8.3. {pull}16450{16450}

Expand Down Expand Up @@ -114,6 +115,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add document_id setting to decode_json_fields processor. {pull}15859[15859]
- Include network information by default on add_host_metadata and add_observer_metadata. {issue}15347[15347] {pull}16077[16077]
- Add `aws_ec2` provider for autodiscover. {issue}12518[12518] {pull}14823[14823]
- Add monitoring variable `libbeat.config.scans` to distinguish scans of the configuration directory from actual reloads of its contents. {pull}16440[16440]
- Add support for multiple password in redis output. {issue}16058[16058] {pull}16206[16206]

*Auditbeat*
Expand Down
6 changes: 5 additions & 1 deletion heartbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def test_config_add(self):
"""
self.setup_dynamic()

# Wait until the beat is running and has performed its first load of
# the config directory.
self.wait_until(lambda: self.log_contains(
"Starting reload procedure, current runners: 0"))

Expand All @@ -77,8 +79,10 @@ def test_config_add(self):
self.write_dyn_config(
"test.yml", self.http_cfg("myid", "http://localhost:{}".format(server.server_port)))

# The beat should recognize there is a new runner to start.
self.wait_until(lambda: self.log_contains(
"Starting reload procedure, current runners: 1"))
"Start list: 1, Stop list: 0"),
max_timeout=10)

self.wait_until(lambda: self.output_lines() > 0)

Expand Down
27 changes: 18 additions & 9 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ var (

debugf = logp.MakeDebug("cfgfile")

// configScans measures how many times the config dir was scanned for
// changes, configReloads measures how many times there were changes that
// triggered an actual reload.
configScans = monitoring.NewInt(nil, "libbeat.config.scans")
configReloads = monitoring.NewInt(nil, "libbeat.config.reloads")
moduleStarts = monitoring.NewInt(nil, "libbeat.config.module.starts")
moduleStops = monitoring.NewInt(nil, "libbeat.config.module.stops")
Expand Down Expand Up @@ -185,7 +189,11 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
rl.config.Reload.Period = 0
}

overwriteUpdate := true
// If forceReload is set, the configuration should be reloaded
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the renaming of the variable.

// even if there are no changes. It is set on the first iteration,
// and whenever an attempted reload fails. It is unset whenever
// a reload succeeds.
forceReload := true

for {
select {
Expand All @@ -195,7 +203,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {

case <-time.After(rl.config.Reload.Period):
debugf("Scan for new config files")
configReloads.Add(1)
configScans.Add(1)

files, updated, err := gw.Scan()
if err != nil {
Expand All @@ -204,21 +212,22 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
logp.Err("Error fetching new config files: %v", err)
}

// no file changes
if !updated && !overwriteUpdate {
overwriteUpdate = false
// if there are no changes, skip this reload unless forceReload is set.
if !updated && !forceReload {
continue
}
configReloads.Add(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if the config reload counter should only increase when it was successful? And have an additional error counter for debugging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant, because an error counter here wouldn't be at the right granularity -- we get an error here even if almost everything reloads successfully and there's one invalid file in the search path, and it would be misleading to report that as though the reload didn't happen.

The related monitor vars libbeat.config.module.{starts, stops, running} are defined in this file also but they aren't used at all. I was planning to enable them in a followup (it requires changing the RunnerList API and modifying other call sites or I'd have done it in this PR), but it might make sense to add errors at the same time to track reload failures at module granularity instead of the whole configuration directory.


// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))

if err := list.Reload(configs); err != nil {
// Make sure the next run also updates because some runners were not properly loaded
overwriteUpdate = true
}
err = list.Reload(configs)
// Force reload on the next iteration if and only if this one failed.
// (Any errors are already logged by list.Reload, so we don't need to
// propagate the details further.)
forceReload = err != nil
}

// Path loading is enabled but not reloading. Loads files only once and then stops.
Expand Down
107 changes: 107 additions & 0 deletions libbeat/cfgfile/reload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package cfgfile

import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
)

func TestReloader(t *testing.T) {
// Create random temp directory
dir, err := ioutil.TempDir("", "libbeat-reloader")
defer os.RemoveAll(dir)
if err != nil {
t.Fatal(err)
}
glob := dir + "/*.yml"

config := common.MustNewConfigFrom(common.MapStr{
"path": glob,
"reload": common.MapStr{
"period": "1s",
"enabled": true,
},
})
// common.Config{}
reloader := NewReloader(nil, config)
retryCount := 10

go reloader.Run(nil)
defer reloader.Stop()

// wait until configScans >= 2 (which should happen after ~1 second)
for i := 0; i < retryCount; i++ {
if configScans.Get() >= 2 {
break
}
// time interval is slightly more than a second so we don't slightly
// undershoot the first iteration and wait a whole extra second.
time.Sleep(1100 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit worried about tests with Sleep as they tend to get flaky earlier or later. Is there an other way to achieve this "wait"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know a better way, though I'm open to suggestions :-) A spin lock would presumably be even worse. The difficulty is that the behavior being tested is itself timer-based. I did structure the logic here so that the test is only affected by the state changes and not other timing issues -- the only "race condition" here is that the unit test might wait several seconds without the reloader's 1-second-interval timer going off, in which case it has no way to tell whether anything's happening. But, the retry timeout was shorter than I meant it to be, so I just increased it to 10 seconds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm late on this one. Lets see if it shows up in flaky tests in the future ;-)

What we did as workarounds in the past is:

  • Check for log messages
  • Have an other data point we can check very frequently for a change.

In general I think it's problematic if a single unit tests increases the test suite by seconds and only spends the time waiting. To improve the above, perhaps there is a way to decrease the scan frequency and do checks every 10ms but retry 1000 times.

}
if configScans.Get() < 2 {
assert.Fail(t, "Timed out waiting for configScans >= 2")
}

// The first scan should cause a reload, but additional ones should not,
// so configReloads should still be 1.
assert.Equal(t, int64(1), configReloads.Get())

// Write a file to the reloader path to trigger a real reload
content := []byte("test\n")
err = ioutil.WriteFile(dir+"/config1.yml", content, 0644)
assert.NoError(t, err)

// Wait for the number of scans to increase at least twice. This is somewhat
// pedantic, but if we just wait for the next scan, it's possible to wake up
// during the brief interval after configScans is updated but before
// configReloads is, giving a false negative. Waiting two iterations
// guarantees that the change from the first one has taken effect.
targetScans := configScans.Get() + 2
for i := 0; i < retryCount; i++ {
time.Sleep(time.Second)
if configScans.Get() >= targetScans {
break
}
}
if configScans.Get() < targetScans {
assert.Fail(t,
fmt.Sprintf("Timed out waiting for configScans >= %d", targetScans))
}

// The number of reloads should now have increased. It would be nicer to
// check if the value is exactly 2, but we can't guarantee this: the glob
// watcher includes an extra 1-second margin around the real modification
// time, so changes that fall too close to a scan interval can be detected
// twice.
if configReloads.Get() < 2 {
assert.Fail(t,
fmt.Sprintf(
"Reloader performed %d scans but only reloaded once",
configScans.Get()))
}
}
1 change: 1 addition & 0 deletions libbeat/docs/http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ curl -XGET 'localhost:5066/stats?pretty'
"starts": 0,
"stops": 0
},
"scans": 1,
"reloads": 1
},
"output": {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/tests/system/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_stats(self):
data = json.loads(r.content)

# Test one data point
assert data["libbeat"]["config"]["reloads"] == 0
assert data["libbeat"]["config"]["scans"] == 0

proc.check_kill_and_wait()

Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/beat/stats/_meta/test/stats.800.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"starts": 0,
"stops": 0
},
"scans": 1,
"reloads": 1
},
"output": {
Expand Down
12 changes: 5 additions & 7 deletions metricbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def test_reload(self):

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_start_stop(self):
def reload_line(
num_runners): return "Starting reload procedure, current runners: %d" % num_runners
"""
Test if module is properly started and stopped
"""
Expand All @@ -61,7 +59,7 @@ def reload_line(

# Ensure no modules are loaded
self.wait_until(
lambda: self.log_contains(reload_line(0)),
lambda: self.log_contains("Start list: 0, Stop list: 0"),
max_timeout=10)

systemConfig = """
Expand All @@ -73,17 +71,17 @@ def reload_line(
with open(config_path, 'w') as f:
f.write(systemConfig)

# Ensure the module was successfully loaded
# Ensure the module is started
self.wait_until(
lambda: self.log_contains(reload_line(1)),
lambda: self.log_contains("Start list: 1, Stop list: 0"),
max_timeout=10)

# Remove config again
os.remove(config_path)

# Ensure the module was successfully unloaded
# Ensure the module is stopped
self.wait_until(
lambda: self.log_contains(reload_line(0)),
lambda: self.log_contains("Start list: 0, Stop list: 1"),
max_timeout=10)

time.sleep(1)
Expand Down