Enhanced Heartbeat Telemetry (elastic#8621)
Add heartbeat telemetry.

We now track:

1. The total number of active monitors
2. The total number of active monitors per protocol
3. The total number of checked endpoints per protocol
4. Counter versions of the above stats for the stats endpoint

This also removes an accidentally committed test case that tested nothing and broke with these refactors.
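To make the mechanics concrete, here is a minimal sketch of how gauge/counter pairs like the ones listed above can be registered per protocol with libbeat's monitoring package. It is illustrative only: the type name, metric names, and method names are assumptions for this sketch, not identifiers from this commit.

```go
package monitors

import "github.com/elastic/beats/libbeat/monitoring"

// pluginCounters is an illustrative recorder: the gauges track currently
// active monitors/endpoints for one protocol, the counters feed the
// stats endpoint with monotonic versions of the same events.
type pluginCounters struct {
	activeMonitors  *monitoring.Int // gauge: active monitors
	activeEndpoints *monitoring.Int // gauge: active endpoints
	monitorStarts   *monitoring.Int // counter: monitor starts
	endpointStarts  *monitoring.Int // counter: endpoint starts
}

// newPluginCounters registers the metrics for one protocol, e.g. "http",
// "tcp", or "icmp". The metric names here are assumptions.
func newPluginCounters(protocol string, reg *monitoring.Registry) *pluginCounters {
	return &pluginCounters{
		activeMonitors:  monitoring.NewInt(reg, protocol+".monitors"),
		activeEndpoints: monitoring.NewInt(reg, protocol+".endpoints"),
		monitorStarts:   monitoring.NewInt(reg, protocol+".monitor_starts"),
		endpointStarts:  monitoring.NewInt(reg, protocol+".endpoint_starts"),
	}
}

// startMonitor records a monitor coming up along with the number of
// endpoints it checks; stopMonitor reverses only the gauges.
func (c *pluginCounters) startMonitor(endpoints int64) {
	c.activeMonitors.Inc()
	c.activeEndpoints.Add(endpoints)
	c.monitorStarts.Inc()
	c.endpointStarts.Add(endpoints)
}

func (c *pluginCounters) stopMonitor(endpoints int64) {
	c.activeMonitors.Dec()
	c.activeEndpoints.Sub(endpoints)
}
```

A recorder of this shape would be handed to each protocol plugin and driven from the monitor lifecycle hooks shown in the monitor.go diff below.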
andrewvc authored Oct 23, 2018
1 parent 06ec3b9 commit 596a0dd
Showing 14 changed files with 361 additions and 329 deletions.
20 changes: 10 additions & 10 deletions heartbeat/monitors/active/http/http.go
@@ -39,15 +39,15 @@ var debugf = logp.MakeDebug("http")
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
return nil, 0, err
}

var body []byte
@@ -58,44 +58,44 @@ func create(
compression := config.Check.Request.Compression
enc, err = getContentEncoder(compression.Type, compression.Level)
if err != nil {
return nil, err
return nil, 0, err
}

buf := bytes.NewBuffer(nil)
err = enc.Encode(buf, bytes.NewBufferString(config.Check.Request.SendBody))
if err != nil {
return nil, err
return nil, 0, err
}

body = buf.Bytes()
}

validator := makeValidateResponse(&config.Check.Response)

jobs := make([]monitors.Job, len(config.URLs))
jobs = make([]monitors.Job, len(config.URLs))

if config.ProxyURL != "" {
transport, err := newRoundTripper(&config, tls)
if err != nil {
return nil, err
return nil, 0, err
}

for i, url := range config.URLs {
jobs[i], err = newHTTPMonitorHostJob(url, &config, transport, enc, body, validator)
if err != nil {
return nil, err
return nil, 0, err
}
}
} else {
for i, url := range config.URLs {
jobs[i], err = newHTTPMonitorIPsJob(&config, url, tls, enc, body, validator)
if err != nil {
return nil, err
return nil, 0, err
}
}
}

return jobs, nil
return jobs, len(config.URLs), nil
}

func newRoundTripper(config *Config, tls *transport.TLSConfig) (*http.Transport, error) {
4 changes: 3 additions & 1 deletion heartbeat/monitors/active/http/http_test.go
@@ -54,14 +54,16 @@ func testTLSRequest(t *testing.T, testURL string, certPath string) beat.Event {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, err := create("tls", config)
jobs, endpoints, err := create("tls", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return event
}

15 changes: 7 additions & 8 deletions heartbeat/monitors/active/icmp/icmp.go
@@ -37,10 +37,10 @@ var debugf = logp.MakeDebug("icmp")
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

// TODO: check icmp is support by OS + check we've
@@ -49,7 +49,6 @@ func create(
// TODO: replace icmp package base reader/sender using raw sockets with
// OS specific solution

var jobs []monitors.Job
addJob := func(t monitors.Job, err error) error {
if err != nil {
return err
@@ -61,7 +60,7 @@
ipVersion := config.Mode.Network()
if len(config.Hosts) > 0 && ipVersion == "" {
err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled")
return nil, err
return nil, 0, err
}

var loopErr error
@@ -71,11 +70,11 @@ })
})
if loopErr != nil {
debugf("Failed to initialize ICMP loop %v", loopErr)
return nil, loopErr
return nil, 0, loopErr
}

if err := loop.checkNetworkMode(ipVersion); err != nil {
return nil, err
return nil, 0, err
}

network := config.Mode.Network()
@@ -90,11 +89,11 @@
settings := monitors.MakeHostJobSettings(jobName, host, config.Mode)
err := addJob(monitors.MakeByHostJob(settings, pingFactory))
if err != nil {
return nil, err
return nil, 0, err
}
}

return jobs, nil
return jobs, len(config.Hosts), nil
}

func createPingIPFactory(config *Config) func(*net.IPAddr) (common.MapStr, error) {
24 changes: 14 additions & 10 deletions heartbeat/monitors/active/tcp/tcp.go
@@ -47,33 +47,32 @@ type connURL struct {
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
return nil, 0, err
}

defaultScheme := "tcp"
if tls != nil {
defaultScheme = "ssl"
}

endpoints, err := collectHosts(&config, defaultScheme)
schemeHosts, err := collectHosts(&config, defaultScheme)
if err != nil {
return nil, err
return nil, 0, err
}

typ := config.Name
timeout := config.Timeout
validator := makeValidateConn(&config)

var jobs []monitors.Job
for scheme, eps := range endpoints {
for scheme, eps := range schemeHosts {
schemeTLS := tls
if scheme == "tcp" || scheme == "plain" {
schemeTLS = nil
@@ -85,20 +84,25 @@ func create(
TLS: schemeTLS,
})
if err != nil {
return nil, err
return nil, 0, err
}

epJobs, err := dialchain.MakeDialerJobs(db, typ, scheme, eps, config.Mode,
func(dialer transport.Dialer, addr string) (common.MapStr, error) {
return pingHost(dialer, addr, timeout, validator)
})
if err != nil {
return nil, err
return nil, 0, err
}

jobs = append(jobs, epJobs...)
}
return jobs, nil

numHosts := 0
for _, hosts := range schemeHosts {
numHosts += len(hosts)
}
return jobs, numHosts, nil
}

func collectHosts(config *Config, defaultScheme string) (map[string][]dialchain.Endpoint, error) {
8 changes: 6 additions & 2 deletions heartbeat/monitors/active/tcp/tcp_test.go
@@ -45,14 +45,16 @@ func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event {
})
require.NoError(t, err)

jobs, err := create("tcp", config)
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return &event
}

@@ -65,14 +67,16 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string
})
require.NoError(t, err)

jobs, err := create("tcp", config)
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return &event
}

9 changes: 6 additions & 3 deletions heartbeat/monitors/mocks_test.go
@@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
)

type MockBeatClient struct {
@@ -99,11 +100,13 @@ func createMockJob(name string, cfg *common.Config) ([]Job, error) {
}

func mockPluginBuilder() pluginBuilder {
return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, error) {
reg := monitoring.NewRegistry()

return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, int, error) {
c := common.Config{}
j, err := createMockJob("test", &c)
return j, err
}}
return j, 1, err
}, newPluginCountersRecorder("test", reg)}
}

func mockPluginsReg() *pluginsReg {
17 changes: 15 additions & 2 deletions heartbeat/monitors/monitor.go
@@ -42,6 +42,8 @@ type Monitor struct {
scheduler *scheduler.Scheduler
jobTasks []*task
enabled bool
// endpoints is a count of endpoints this monitor measures.
endpoints int
// internalsMtx is used to synchronize access to critical
// internal datastructures
internalsMtx sync.Mutex
@@ -51,6 +53,10 @@
watch watcher.Watch

pipelineConnector beat.PipelineConnector

// stats is the countersRecorder used to record lifecycle events
// for global metrics + telemetry
stats registryRecorder
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
@@ -95,9 +101,11 @@ func newMonitor(
watchPollTasks: []*task{},
internalsMtx: sync.Mutex{},
config: config,
stats: monitorPlugin.stats,
}

jobs, err := monitorPlugin.create(config)
jobs, endpoints, err := monitorPlugin.create(config)
m.endpoints = endpoints
if err != nil {
return nil, fmt.Errorf("job err %v", err)
}
@@ -181,7 +189,8 @@ func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error {
return
}

watchJobs, err := monitorPlugin.create(merged)
watchJobs, endpoints, err := monitorPlugin.create(merged)
m.endpoints = endpoints
if err != nil {
logp.Err("Could not create job from watch file: %v", err)
}
@@ -227,6 +236,8 @@ func (m *Monitor) Start() {
for _, t := range m.watchPollTasks {
t.Start()
}

m.stats.startMonitor(int64(m.endpoints))
}

// Stop stops the Monitor's execution in its configured scheduler.
@@ -242,4 +253,6 @@ func (m *Monitor) Stop() {
for _, t := range m.watchPollTasks {
t.Stop()
}

m.stats.stopMonitor(int64(m.endpoints))
}
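The calls in Monitor.Start and Monitor.Stop above (m.stats.startMonitor / m.stats.stopMonitor, each passed the monitor's endpoint count) suggest that the stats field only needs a small interface, which a per-protocol recorder like the one sketched under the commit message would satisfy. A plausible shape, inferred from this diff rather than copied from the source:

```go
// Inferred from the calls in Monitor.Start and Monitor.Stop above;
// the real registryRecorder in heartbeat/monitors may differ.
type registryRecorder interface {
	startMonitor(endpoints int64) // monitor came up, checking N endpoints
	stopMonitor(endpoints int64)  // monitor shut down, release the gauges
}
```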