Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Heartbeat Telemetry #8621

Merged
merged 17 commits into from
Oct 23, 2018
20 changes: 10 additions & 10 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ var debugf = logp.MakeDebug("http")
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
return nil, 0, err
}

var body []byte
Expand All @@ -58,44 +58,44 @@ func create(
compression := config.Check.Request.Compression
enc, err = getContentEncoder(compression.Type, compression.Level)
if err != nil {
return nil, err
return nil, 0, err
}

buf := bytes.NewBuffer(nil)
err = enc.Encode(buf, bytes.NewBufferString(config.Check.Request.SendBody))
if err != nil {
return nil, err
return nil, 0, err
}

body = buf.Bytes()
}

validator := makeValidateResponse(&config.Check.Response)

jobs := make([]monitors.Job, len(config.URLs))
jobs = make([]monitors.Job, len(config.URLs))

if config.ProxyURL != "" {
transport, err := newRoundTripper(&config, tls)
if err != nil {
return nil, err
return nil, 0, err
}

for i, url := range config.URLs {
jobs[i], err = newHTTPMonitorHostJob(url, &config, transport, enc, body, validator)
if err != nil {
return nil, err
return nil, 0, err
}
}
} else {
for i, url := range config.URLs {
jobs[i], err = newHTTPMonitorIPsJob(&config, url, tls, enc, body, validator)
if err != nil {
return nil, err
return nil, 0, err
}
}
}

return jobs, nil
return jobs, len(config.URLs), nil
}

func newRoundTripper(config *Config, tls *transport.TLSConfig) (*http.Transport, error) {
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,16 @@ func testTLSRequest(t *testing.T, testURL string, certPath string) beat.Event {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, err := create("tls", config)
jobs, endpoints, err := create("tls", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return event
}

Expand Down
15 changes: 7 additions & 8 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ var debugf = logp.MakeDebug("icmp")
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

// TODO: check icmp is support by OS + check we've
Expand All @@ -49,7 +49,6 @@ func create(
// TODO: replace icmp package base reader/sender using raw sockets with
// OS specific solution

var jobs []monitors.Job
addJob := func(t monitors.Job, err error) error {
if err != nil {
return err
Expand All @@ -61,7 +60,7 @@ func create(
ipVersion := config.Mode.Network()
if len(config.Hosts) > 0 && ipVersion == "" {
err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled")
return nil, err
return nil, 0, err
}

var loopErr error
Expand All @@ -71,11 +70,11 @@ func create(
})
if loopErr != nil {
debugf("Failed to initialize ICMP loop %v", loopErr)
return nil, loopErr
return nil, 0, loopErr
}

if err := loop.checkNetworkMode(ipVersion); err != nil {
return nil, err
return nil, 0, err
}

network := config.Mode.Network()
Expand All @@ -90,11 +89,11 @@ func create(
settings := monitors.MakeHostJobSettings(jobName, host, config.Mode)
err := addJob(monitors.MakeByHostJob(settings, pingFactory))
if err != nil {
return nil, err
return nil, 0, err
}
}

return jobs, nil
return jobs, len(config.Hosts), nil
}

func createPingIPFactory(config *Config) func(*net.IPAddr) (common.MapStr, error) {
Expand Down
24 changes: 14 additions & 10 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,33 +47,32 @@ type connURL struct {
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
return nil, 0, err
}

defaultScheme := "tcp"
if tls != nil {
defaultScheme = "ssl"
}

endpoints, err := collectHosts(&config, defaultScheme)
schemeHosts, err := collectHosts(&config, defaultScheme)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was unclear before. This is really a map of scheme(str) -> []host(str)

if err != nil {
return nil, err
return nil, 0, err
}

typ := config.Name
timeout := config.Timeout
validator := makeValidateConn(&config)

var jobs []monitors.Job
for scheme, eps := range endpoints {
for scheme, eps := range schemeHosts {
schemeTLS := tls
if scheme == "tcp" || scheme == "plain" {
schemeTLS = nil
Expand All @@ -85,20 +84,25 @@ func create(
TLS: schemeTLS,
})
if err != nil {
return nil, err
return nil, 0, err
}

epJobs, err := dialchain.MakeDialerJobs(db, typ, scheme, eps, config.Mode,
func(dialer transport.Dialer, addr string) (common.MapStr, error) {
return pingHost(dialer, addr, timeout, validator)
})
if err != nil {
return nil, err
return nil, 0, err
}

jobs = append(jobs, epJobs...)
}
return jobs, nil

numHosts := 0
for _, hosts := range schemeHosts {
numHosts += len(hosts)
}
return jobs, numHosts, nil
}

func collectHosts(config *Config, defaultScheme string) (map[string][]dialchain.Endpoint, error) {
Expand Down
8 changes: 6 additions & 2 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event {
})
require.NoError(t, err)

jobs, err := create("tcp", config)
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return &event
}

Expand All @@ -65,14 +67,16 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string
})
require.NoError(t, err)

jobs, err := create("tcp", config)
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return &event
}

Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ func createMockJob(name string, cfg *common.Config) ([]Job, error) {
}

func mockPluginBuilder() pluginBuilder {
return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, error) {
return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, int, error) {
c := common.Config{}
j, err := createMockJob("test", &c)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this breaks the tests because there is no monitor type named "test".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now fixed since we only log instead of panic in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now further improved by our dynamic creation of metrics, so this scenario is no longer possible.

return j, err
return j, 1, err
}}
}

Expand Down
56 changes: 54 additions & 2 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,38 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
)

// Telemetry registries for heartbeat, rooted at the default monitoring registry.
var (
	// teleRegistry is the "heartbeat" registry created on monitoring.Default.
	teleRegistry = monitoring.Default.NewRegistry("heartbeat")

	// httpRegistry is created from teleRegistry under the name "heartbeat.http".
	// NOTE(review): the name contains a dot although it is already nested under
	// "heartbeat" — confirm this registry is still needed.
	httpRegistry = teleRegistry.NewRegistry("heartbeat.http")
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@urso I initially thought I could nest by writing:

var teleRegistry *monitoring.Registry = monitoring.Default.NewRegistry("heartbeat")
var httpRegistry *monitoring.Registry = teleRegistry.NewRegistry("http")

but discovered that despite calling NewRegistry on an existing namespaced registry I still needed the full path. Is this by design? If so, what is the intended purpose of being able to chain NewRegistry? Should I refactor this to always invoke NewRegistry on monitoring.Default?

Copy link

@urso urso Oct 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the full path? Where did you see things going wrong?

When you call (*Registry).NewRegistry(name), it adds the new registry as a child of the parent, with name as the field. This way you build a tree.
The collectors/representation is not aware of dots like heartbeat.http. Actually you should not use dots for names in your registry. Depending on the metrics collector in use you might get events like:

{
  "heartbeat": {
    "heartbeat.http": {
      ...
    }
  },
  ...
}

Why do you create a new global registry? Is this supposed to be metrics or for phoning-home data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now that these lines are actually noops, we never use these registries, but define them further down. I was confused. In that subsequent block I do define them correctly without the full paths.

This is what happens when you write code late in the day! Apologies.

var tcpRegistry *monitoring.Registry = teleRegistry.NewRegistry("heartbeat.tcp")
var icmpRegistry *monitoring.Registry = teleRegistry.NewRegistry("heartbeat.icmp")

// protocolStats bundles the telemetry counters kept for a single monitor protocol.
type protocolStats struct {
	// monitors and endpoints are adjusted together in Monitor.Start and Monitor.Stop.
	monitors, endpoints *monitoring.Int
}

// newProtocolStats creates the "monitors" and "endpoints" integer metrics on reg
// and returns them bundled as a protocolStats.
func newProtocolStats(reg *monitoring.Registry) protocolStats {
	var stats protocolStats
	// Create the metrics in the same order as the struct fields.
	stats.monitors = monitoring.NewInt(reg, "monitors")
	stats.endpoints = monitoring.NewInt(reg, "endpoints")
	return stats
}

// teleStats holds heartbeat's telemetry counters: one overall monitor count plus
// per-protocol stats keyed by protocol name.
var teleStats = struct {
monitors *monitoring.Int
protocols map[string]protocolStats
}{
// Overall counter, created directly on teleRegistry under the name "monitors".
monitors: monitoring.NewInt(teleRegistry, "monitors"),
// Each protocol gets its own registry created via teleRegistry.NewRegistry;
// the map key matches the name passed to NewRegistry.
protocols: map[string]protocolStats{
"http": newProtocolStats(teleRegistry.NewRegistry("http")),
"tcp": newProtocolStats(teleRegistry.NewRegistry("tcp")),
"icmp": newProtocolStats(teleRegistry.NewRegistry("icmp")),
},
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer for new code not to rely on globals.


// Monitor represents a configured recurring monitoring task loaded from a config file. Starting it
// will cause it to run with the given scheduler until Stop() is called.
type Monitor struct {
Expand All @@ -42,6 +72,8 @@ type Monitor struct {
scheduler *scheduler.Scheduler
jobTasks []*task
enabled bool
// endpoints is a count of endpoints this monitor measures.
endpoints int
// internalsMtx is used to synchronize access to critical
// internal datastructures
internalsMtx sync.Mutex
Expand Down Expand Up @@ -97,7 +129,8 @@ func newMonitor(
config: config,
}

jobs, err := monitorPlugin.create(config)
jobs, endpoints, err := monitorPlugin.create(config)
m.endpoints = endpoints
if err != nil {
return nil, fmt.Errorf("job err %v", err)
}
Expand Down Expand Up @@ -181,7 +214,8 @@ func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error {
return
}

watchJobs, err := monitorPlugin.create(merged)
watchJobs, endpoints, err := monitorPlugin.create(merged)
m.endpoints = endpoints
if err != nil {
logp.Err("Could not create job from watch file: %v", err)
}
Expand Down Expand Up @@ -227,6 +261,15 @@ func (m *Monitor) Start() {
for _, t := range m.watchPollTasks {
t.Start()
}

teleStats.monitors.Inc()

if stats, ok := teleStats.protocols[m.name]; !ok {
logp.Err("Unknown protocol for monitor stats: %s", m.name)
} else {
stats.monitors.Inc()
stats.endpoints.Add(int64(m.endpoints))
}
}

// Stop stops the Monitor's execution in its configured scheduler.
Expand All @@ -242,4 +285,13 @@ func (m *Monitor) Stop() {
for _, t := range m.watchPollTasks {
t.Stop()
}

teleStats.monitors.Dec()

if stats, ok := teleStats.protocols[m.name]; !ok {
logp.Err("Unknown protocol for monitor stats: %s", m.name)
} else {
stats.monitors.Dec()
stats.endpoints.Sub(int64(m.endpoints))
}
}
4 changes: 2 additions & 2 deletions heartbeat/monitors/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func init() {

// PluginBuilder is the signature of functions used to build active
// monitors
type PluginBuilder func(string, *common.Config) ([]Job, error)
type PluginBuilder func(string, *common.Config) (jobs []Job, endpoints int, err error)

// Type represents whether a plugin is active or passive.
type Type uint8
Expand Down Expand Up @@ -129,7 +129,7 @@ func (r *pluginsReg) monitorNames() []string {
return names
}

func (e *pluginBuilder) create(cfg *common.Config) ([]Job, error) {
func (e *pluginBuilder) create(cfg *common.Config) (jobs []Job, endpoints int, err error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result parameters are named explicitly because `endpoints int` is more explanatory than a bare `int`.

return e.builder(e.name, cfg)
}

Expand Down
Loading