-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhanced Heartbeat Telemetry #8621
Changes from 1 commit
1cf1a5d
d85d186
ac5cbe4
0e20f0a
036d373
113ed51
027f045
e3f9d62
998d27a
a15ef68
fab8be9
d9995d5
7a62efb
090e920
339547c
919214d
e7374a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,10 +37,10 @@ var debugf = logp.MakeDebug("icmp") | |
func create( | ||
name string, | ||
cfg *common.Config, | ||
) ([]monitors.Job, error) { | ||
) (jobs []monitors.Job, endpoints int, err error) { | ||
config := DefaultConfig | ||
if err := cfg.Unpack(&config); err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question as above, why -1 instead of 0? |
||
} | ||
|
||
// TODO: check icmp is support by OS + check we've | ||
|
@@ -49,7 +49,6 @@ func create( | |
// TODO: replace icmp package base reader/sender using raw sockets with | ||
// OS specific solution | ||
|
||
var jobs []monitors.Job | ||
addJob := func(t monitors.Job, err error) error { | ||
if err != nil { | ||
return err | ||
|
@@ -61,7 +60,7 @@ func create( | |
ipVersion := config.Mode.Network() | ||
if len(config.Hosts) > 0 && ipVersion == "" { | ||
err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled") | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
|
||
var loopErr error | ||
|
@@ -71,11 +70,11 @@ func create( | |
}) | ||
if loopErr != nil { | ||
debugf("Failed to initialize ICMP loop %v", loopErr) | ||
return nil, loopErr | ||
return nil, -1, loopErr | ||
} | ||
|
||
if err := loop.checkNetworkMode(ipVersion); err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
|
||
network := config.Mode.Network() | ||
|
@@ -90,11 +89,11 @@ func create( | |
settings := monitors.MakeHostJobSettings(jobName, host, config.Mode) | ||
err := addJob(monitors.MakeByHostJob(settings, pingFactory)) | ||
if err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
} | ||
|
||
return jobs, nil | ||
return jobs, len(config.Hosts), nil | ||
} | ||
|
||
func createPingIPFactory(config *Config) func(*net.IPAddr) (common.MapStr, error) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,33 +47,32 @@ type connURL struct { | |
func create( | ||
name string, | ||
cfg *common.Config, | ||
) ([]monitors.Job, error) { | ||
) (jobs []monitors.Job, endpoints int, err error) { | ||
config := DefaultConfig | ||
if err := cfg.Unpack(&config); err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
|
||
tls, err := outputs.LoadTLSConfig(config.TLS) | ||
if err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
|
||
defaultScheme := "tcp" | ||
if tls != nil { | ||
defaultScheme = "ssl" | ||
} | ||
|
||
endpoints, err := collectHosts(&config, defaultScheme) | ||
schemeHosts, err := collectHosts(&config, defaultScheme) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was unclear before. This is really a map of |
||
if err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
|
||
typ := config.Name | ||
timeout := config.Timeout | ||
validator := makeValidateConn(&config) | ||
|
||
var jobs []monitors.Job | ||
for scheme, eps := range endpoints { | ||
for scheme, eps := range schemeHosts { | ||
schemeTLS := tls | ||
if scheme == "tcp" || scheme == "plain" { | ||
schemeTLS = nil | ||
|
@@ -85,20 +84,25 @@ func create( | |
TLS: schemeTLS, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
|
||
epJobs, err := dialchain.MakeDialerJobs(db, typ, scheme, eps, config.Mode, | ||
func(dialer transport.Dialer, addr string) (common.MapStr, error) { | ||
return pingHost(dialer, addr, timeout, validator) | ||
}) | ||
if err != nil { | ||
return nil, err | ||
return nil, -1, err | ||
} | ||
|
||
jobs = append(jobs, epJobs...) | ||
} | ||
return jobs, nil | ||
|
||
numHosts := 0 | ||
for _, hosts := range schemeHosts { | ||
numHosts += len(hosts) | ||
} | ||
return jobs, numHosts, nil | ||
} | ||
|
||
func collectHosts(config *Config, defaultScheme string) (map[string][]dialchain.Endpoint, error) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,10 +99,10 @@ func createMockJob(name string, cfg *common.Config) ([]Job, error) { | |
} | ||
|
||
func mockPluginBuilder() pluginBuilder { | ||
return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, error) { | ||
return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, int, error) { | ||
c := common.Config{} | ||
j, err := createMockJob("test", &c) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this breaks the tests because there is not monitor type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is now fixed since we only log instead of panic in this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is now further improved by our dynamic creation of metrics, so this scenario is no longer possible. |
||
return j, err | ||
return j, 1, err | ||
}} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,8 +30,38 @@ import ( | |
"github.com/elastic/beats/libbeat/beat" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/logp" | ||
"github.com/elastic/beats/libbeat/monitoring" | ||
) | ||
|
||
var teleRegistry *monitoring.Registry = monitoring.Default.NewRegistry("heartbeat") | ||
var httpRegistry *monitoring.Registry = teleRegistry.NewRegistry("heartbeat.http") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @urso I initially thought I could nest by writing: var teleRegistry *monitoring.Registry = monitoring.Default.NewRegistry("heartbeat")
var httpRegistry *monitoring.Registry = teleRegistry.NewRegistry("http") but discovered that despite calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you need the full path? Where did you see things going wrong? When calling
Why do you create a new global registry? Is this supposed to be metrics or for phoning-home data? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see now that these lines are actually noops, we never use these registries, but define them further down. I was confused. In that subsequent block I do define them correctly without the full paths. This is what happens when you write code late in the day! Apologies. |
||
var tcpRegistry *monitoring.Registry = teleRegistry.NewRegistry("heartbeat.tcp") | ||
var icmpRegistry *monitoring.Registry = teleRegistry.NewRegistry("heartbeat.icmp") | ||
|
||
type protocolStats struct { | ||
monitors *monitoring.Int | ||
endpoints *monitoring.Int | ||
} | ||
|
||
func newProtocolStats(reg *monitoring.Registry) protocolStats { | ||
return protocolStats{ | ||
monitoring.NewInt(reg, "monitors"), | ||
monitoring.NewInt(reg, "endpoints"), | ||
} | ||
} | ||
|
||
var teleStats = struct { | ||
monitors *monitoring.Int | ||
protocols map[string]protocolStats | ||
}{ | ||
monitors: monitoring.NewInt(teleRegistry, "monitors"), | ||
protocols: map[string]protocolStats{ | ||
"http": newProtocolStats(teleRegistry.NewRegistry("http")), | ||
"tcp": newProtocolStats(teleRegistry.NewRegistry("tcp")), | ||
"icmp": newProtocolStats(teleRegistry.NewRegistry("icmp")), | ||
}, | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer for new code not to rely on globals. |
||
|
||
// Monitor represents a configured recurring monitoring task loaded from a config file. Starting it | ||
// will cause it to run with the given scheduler until Stop() is called. | ||
type Monitor struct { | ||
|
@@ -42,6 +72,8 @@ type Monitor struct { | |
scheduler *scheduler.Scheduler | ||
jobTasks []*task | ||
enabled bool | ||
// endpoints is a count of endpoints this monitor measures. | ||
endpoints int | ||
// internalsMtx is used to synchronize access to critical | ||
// internal datastructures | ||
internalsMtx sync.Mutex | ||
|
@@ -97,7 +129,8 @@ func newMonitor( | |
config: config, | ||
} | ||
|
||
jobs, err := monitorPlugin.create(config) | ||
jobs, endpoints, err := monitorPlugin.create(config) | ||
m.endpoints = endpoints | ||
if err != nil { | ||
return nil, fmt.Errorf("job err %v", err) | ||
} | ||
|
@@ -181,7 +214,8 @@ func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error { | |
return | ||
} | ||
|
||
watchJobs, err := monitorPlugin.create(merged) | ||
watchJobs, endpoints, err := monitorPlugin.create(merged) | ||
m.endpoints = endpoints | ||
if err != nil { | ||
logp.Err("Could not create job from watch file: %v", err) | ||
} | ||
|
@@ -227,6 +261,15 @@ func (m *Monitor) Start() { | |
for _, t := range m.watchPollTasks { | ||
t.Start() | ||
} | ||
|
||
teleStats.monitors.Inc() | ||
|
||
if stats, ok := teleStats.protocols[m.name]; !ok { | ||
panic(fmt.Sprintf("Unknown protocol for monitor stats: %s", m.name)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we really panic if can't to the stats? Assuming someone adds a new protocol, forgets to update the list and we do a release I would expect that we log an error but not stop the application because of it. On the other hand having a panic makes it easy to spot the problem early on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tend to favor panic for "This should never happen" sorts of things. The benefit is that if we add a new protocol we won't forget stats support. In this case, I think its OK, because I can't see how we could go through a dev/test/release cycle without seeing the panic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OTOH, this breaks with the mock "test" monitor. Maybe it should jus be a log message. WDYT? The other approach would be to add "test " as a protocol (or going further, to inject the map of protocols, but that seems like overkill). |
||
} else { | ||
stats.monitors.Inc() | ||
stats.endpoints.Add(int64(m.endpoints)) | ||
} | ||
} | ||
|
||
// Stop stops the Monitor's execution in its configured scheduler. | ||
|
@@ -242,4 +285,13 @@ func (m *Monitor) Stop() { | |
for _, t := range m.watchPollTasks { | ||
t.Stop() | ||
} | ||
|
||
teleStats.monitors.Dec() | ||
|
||
if stats, ok := teleStats.protocols[m.name]; !ok { | ||
panic(fmt.Sprintf("Unknown protocol for monitor stats: %s", m.name)) | ||
} else { | ||
stats.monitors.Dec() | ||
stats.endpoints.Sub(int64(m.endpoints)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,7 +48,7 @@ func init() { | |
|
||
// PluginBuilder is the signature of functions used to build active | ||
// monitors | ||
type PluginBuilder func(string, *common.Config) ([]Job, error) | ||
type PluginBuilder func(string, *common.Config) (jobs []Job, endpoints int, err error) | ||
|
||
// Type represents whether a plugin is active or passive. | ||
type Type uint8 | ||
|
@@ -129,7 +129,7 @@ func (r *pluginsReg) monitorNames() []string { | |
return names | ||
} | ||
|
||
func (e *pluginBuilder) create(cfg *common.Config) ([]Job, error) { | ||
func (e *pluginBuilder) create(cfg *common.Config) (jobs []Job, endpoints int, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. explicit names because |
||
return e.builder(e.name, cfg) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you pick -1 and not 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not partial to -1 or 0. I picked -1 because it's obviously a nonsense value in the error case. In any event, in all these situations the value should be discarded by any callers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking: The method returns the number of endpoints it created. In the case of error this is 0 and not -1. Lets assume I always count the value even in the error case, the results would still be correct.