Skip to content

Commit

Permalink
Add random startup delay to each metricset
Browse files Browse the repository at this point in the history
Add random startup delay to each metricset to avoid the thundering herd problem. Fixes elastic#4010.
  • Loading branch information
andrewkroh committed Jun 14, 2017
1 parent 284a267 commit 5b5c2bd
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
*Heartbeat*

*Metricbeat*
- Add random startup delay to each metricset to avoid the thundering herd problem. {issue}4010[4010]

*Packetbeat*

Expand Down
4 changes: 4 additions & 0 deletions metricbeat/_meta/common.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ metricbeat.config.modules:

# Set to true to enable config reloading
reload.enabled: false

# Maximum amount of time to randomly delay the start of a metricset. Use 0 to
# disable startup delay.
metricbeat.max_start_delay: 10s
11 changes: 10 additions & 1 deletion metricbeat/beater/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package beater

import "github.com/elastic/beats/libbeat/common"
import (
"time"

"github.com/elastic/beats/libbeat/common"
)

// Config is the root of the Metricbeat configuration hierarchy.
type Config struct {
// Modules is a list of module specific configuration data.
Modules []*common.Config `config:"modules"`
// ReloadModules is the configuration read from config.modules. Its
// Enabled() result is checked to decide whether dynamic module reloading
// is used.
ReloadModules *common.Config `config:"config.modules"`
// MaxStartDelay is the upper bound on the random startup delay for
// metricsets (use 0 to disable startup delay).
MaxStartDelay time.Duration `config:"max_start_delay"`
}

// defaultConfig is the base configuration: New starts from a copy of it and
// unpacks the raw configuration into that copy. It sets the maximum random
// metricset startup delay to 10 seconds.
var defaultConfig = Config{
MaxStartDelay: 10 * time.Second,
}
11 changes: 4 additions & 7 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// List all registered modules and metricsets.
logp.Info("%s", mb.Registry.String())

config := Config{}

err := rawConfig.Unpack(&config)
if err != nil {
config := defaultConfig
if err := rawConfig.Unpack(&config); err != nil {
return nil, errors.Wrap(err, "error reading configuration file")
}

modules, err := module.NewWrappers(config.Modules, mb.Registry)
modules, err := module.NewWrappers(config.MaxStartDelay, config.Modules, mb.Registry)
if err != nil {
// Empty config is fine if dynamic config is enabled
if !config.ReloadModules.Enabled() {
Expand All @@ -61,7 +59,6 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// that a single unresponsive host cannot inadvertently block other hosts
// within the same Module and MetricSet from collection.
func (bt *Metricbeat) Run(b *beat.Beat) error {

var wg sync.WaitGroup

for _, m := range bt.modules {
Expand All @@ -78,7 +75,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
if bt.config.ReloadModules.Enabled() {
logp.Beta("feature dynamic configuration reloading is enabled.")
moduleReloader := cfgfile.NewReloader(bt.config.ReloadModules)
factory := module.NewFactory(b.Publisher)
factory := module.NewFactory(bt.config.MaxStartDelay, b.Publisher)

go moduleReloader.Run(factory)
wg.Add(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,22 @@ metricbeat.modules:
hosts: ["root@tcp(127.0.0.1:3306)/"]
------------------------------------------------------------------------------

==== Metricbeat Options
==== General Options

===== max_start_delay

The maximum random delay to apply to the startup of a metricset. A random delay
in the range [0, _max_start_delay_) is applied to each metricset to reduce the
thundering herd effect that can occur if a fleet of machines running Metricbeat
is restarted at the same time. Specifying a value of 0 disables the startup
delay. The default is 10s.

[source,yaml]
----
metricbeat.max_start_delay: 10s
----

==== Module Options

You can specify the following options in the `metricbeat` section of the +{beatname_lc}.yml+ config file. These options
are the same for all modules. Each module may have additional configuration options that are specific to that module.
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func ExampleWrapper() {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
fmt.Println("Error:", err)
return
Expand Down Expand Up @@ -97,7 +97,7 @@ func ExampleRunner() {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
return
}
Expand Down
12 changes: 8 additions & 4 deletions metricbeat/mb/module/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package module

import (
"time"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
Expand All @@ -9,18 +11,20 @@ import (

// Factory is used to register and reload modules
type Factory struct {
client func() publisher.Client
client func() publisher.Client
maxStartDelay time.Duration
}

// NewFactory creates a new Factory instance for the given publisher and maximum metricset start delay.
func NewFactory(p publisher.Publisher) *Factory {
func NewFactory(maxStartDelay time.Duration, p publisher.Publisher) *Factory {
return &Factory{
client: p.Connect,
client: p.Connect,
maxStartDelay: maxStartDelay,
}
}

func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
w, err := NewWrapper(c, mb.Registry)
w, err := NewWrapper(r.maxStartDelay, c, mb.Registry)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/module/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRunner(t *testing.T) {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
t.Fatal(err)
}
Expand Down
30 changes: 22 additions & 8 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package module

import (
"fmt"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -36,9 +37,10 @@ var (
// Use NewWrapper or NewWrappers to construct new Wrappers.
type Wrapper struct {
mb.Module
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
maxStartDelay time.Duration
}

// metricSetWrapper contains the MetricSet and the private data associated with
Expand All @@ -61,8 +63,8 @@ type stats struct {
// NewWrapper creates a new Module and its associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them in the Wrapper.
func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
mws, err := NewWrappers([]*common.Config{moduleConfig}, r)
func NewWrapper(maxStartDelay time.Duration, moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
mws, err := NewWrappers(maxStartDelay, []*common.Config{moduleConfig}, r)
if err != nil {
return nil, err
}
Expand All @@ -77,7 +79,7 @@ func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
// NewWrappers creates new Modules and their associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them all in a Wrapper.
func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) {
func NewWrappers(maxStartDelay time.Duration, modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) {
modules, err := mb.NewModules(modulesConfig, r)
if err != nil {
return nil, err
Expand All @@ -95,8 +97,9 @@ func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, er
}

mw := &Wrapper{
Module: k,
filters: f,
Module: k,
filters: f,
maxStartDelay: maxStartDelay,
}
wrappers = append(wrappers, mw)

Expand Down Expand Up @@ -188,6 +191,17 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- common.MapStr)
defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+
"'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host()))

// Delay the start of each metricset by a random duration in [0, maxStartDelay).
if msw.module.maxStartDelay > 0 {
delay := time.Duration(rand.Int63n(int64(msw.module.maxStartDelay)))
debugf("%v/%v will start after %v", msw.module.Name(), msw.Name(), delay)
select {
case <-done:
return
case <-time.After(delay):
}
}

debugf("Starting %s", msw)
defer debugf("Stopped %s", msw)

Expand Down
6 changes: 3 additions & 3 deletions metricbeat/mb/module/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestWrapperOfEventFetcher(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestWrapperOfReportingFetcher(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestWrapperOfPushMetricSet(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ metricbeat.config.modules:
# Set to true to enable config reloading
reload.enabled: false

# Maximum amount of time to randomly delay the start of a metricset. Use 0 to
# disable startup delay.
metricbeat.max_start_delay: 10s

#========================== Modules configuration ============================
metricbeat.modules:

Expand Down
3 changes: 3 additions & 0 deletions metricbeat/tests/system/config/metricbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ metricbeat.config.modules:
reload.enabled: true
{% endif -%}

# Disable random start delay for metricsets.
metricbeat.max_start_delay: 0

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down

0 comments on commit 5b5c2bd

Please sign in to comment.