Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Heartbeat Telemetry #8621

Merged
merged 17 commits into from
Oct 23, 2018
20 changes: 10 additions & 10 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ var debugf = logp.MakeDebug("http")
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
return nil, 0, err
}

var body []byte
Expand All @@ -58,44 +58,44 @@ func create(
compression := config.Check.Request.Compression
enc, err = getContentEncoder(compression.Type, compression.Level)
if err != nil {
return nil, err
return nil, 0, err
}

buf := bytes.NewBuffer(nil)
err = enc.Encode(buf, bytes.NewBufferString(config.Check.Request.SendBody))
if err != nil {
return nil, err
return nil, 0, err
}

body = buf.Bytes()
}

validator := makeValidateResponse(&config.Check.Response)

jobs := make([]monitors.Job, len(config.URLs))
jobs = make([]monitors.Job, len(config.URLs))

if config.ProxyURL != "" {
transport, err := newRoundTripper(&config, tls)
if err != nil {
return nil, err
return nil, 0, err
}

for i, url := range config.URLs {
jobs[i], err = newHTTPMonitorHostJob(url, &config, transport, enc, body, validator)
if err != nil {
return nil, err
return nil, 0, err
}
}
} else {
for i, url := range config.URLs {
jobs[i], err = newHTTPMonitorIPsJob(&config, url, tls, enc, body, validator)
if err != nil {
return nil, err
return nil, 0, err
}
}
}

return jobs, nil
return jobs, len(config.URLs), nil
}

func newRoundTripper(config *Config, tls *transport.TLSConfig) (*http.Transport, error) {
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,16 @@ func testTLSRequest(t *testing.T, testURL string, certPath string) beat.Event {
config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, err := create("tls", config)
jobs, endpoints, err := create("tls", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return event
}

Expand Down
15 changes: 7 additions & 8 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ var debugf = logp.MakeDebug("icmp")
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

// TODO: check icmp is support by OS + check we've
Expand All @@ -49,7 +49,6 @@ func create(
// TODO: replace icmp package base reader/sender using raw sockets with
// OS specific solution

var jobs []monitors.Job
addJob := func(t monitors.Job, err error) error {
if err != nil {
return err
Expand All @@ -61,7 +60,7 @@ func create(
ipVersion := config.Mode.Network()
if len(config.Hosts) > 0 && ipVersion == "" {
err := fmt.Errorf("pinging hosts requires ipv4 or ipv6 mode enabled")
return nil, err
return nil, 0, err
}

var loopErr error
Expand All @@ -71,11 +70,11 @@ func create(
})
if loopErr != nil {
debugf("Failed to initialize ICMP loop %v", loopErr)
return nil, loopErr
return nil, 0, loopErr
}

if err := loop.checkNetworkMode(ipVersion); err != nil {
return nil, err
return nil, 0, err
}

network := config.Mode.Network()
Expand All @@ -90,11 +89,11 @@ func create(
settings := monitors.MakeHostJobSettings(jobName, host, config.Mode)
err := addJob(monitors.MakeByHostJob(settings, pingFactory))
if err != nil {
return nil, err
return nil, 0, err
}
}

return jobs, nil
return jobs, len(config.Hosts), nil
}

func createPingIPFactory(config *Config) func(*net.IPAddr) (common.MapStr, error) {
Expand Down
24 changes: 14 additions & 10 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,33 +47,32 @@ type connURL struct {
func create(
name string,
cfg *common.Config,
) ([]monitors.Job, error) {
) (jobs []monitors.Job, endpoints int, err error) {
config := DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
return nil, 0, err
}

tls, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
return nil, 0, err
}

defaultScheme := "tcp"
if tls != nil {
defaultScheme = "ssl"
}

endpoints, err := collectHosts(&config, defaultScheme)
schemeHosts, err := collectHosts(&config, defaultScheme)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old variable name was unclear. This value is really a map of scheme (string) -> []host.

if err != nil {
return nil, err
return nil, 0, err
}

typ := config.Name
timeout := config.Timeout
validator := makeValidateConn(&config)

var jobs []monitors.Job
for scheme, eps := range endpoints {
for scheme, eps := range schemeHosts {
schemeTLS := tls
if scheme == "tcp" || scheme == "plain" {
schemeTLS = nil
Expand All @@ -85,20 +84,25 @@ func create(
TLS: schemeTLS,
})
if err != nil {
return nil, err
return nil, 0, err
}

epJobs, err := dialchain.MakeDialerJobs(db, typ, scheme, eps, config.Mode,
func(dialer transport.Dialer, addr string) (common.MapStr, error) {
return pingHost(dialer, addr, timeout, validator)
})
if err != nil {
return nil, err
return nil, 0, err
}

jobs = append(jobs, epJobs...)
}
return jobs, nil

numHosts := 0
for _, hosts := range schemeHosts {
numHosts += len(hosts)
}
return jobs, numHosts, nil
}

func collectHosts(config *Config, defaultScheme string) (map[string][]dialchain.Endpoint, error) {
Expand Down
8 changes: 6 additions & 2 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event {
})
require.NoError(t, err)

jobs, err := create("tcp", config)
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return &event
}

Expand All @@ -65,14 +67,16 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string
})
require.NoError(t, err)

jobs, err := create("tcp", config)
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return &event
}

Expand Down
9 changes: 6 additions & 3 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
)

type MockBeatClient struct {
Expand Down Expand Up @@ -99,11 +100,13 @@ func createMockJob(name string, cfg *common.Config) ([]Job, error) {
}

func mockPluginBuilder() pluginBuilder {
return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, error) {
reg := monitoring.NewRegistry()

return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]Job, int, error) {
c := common.Config{}
j, err := createMockJob("test", &c)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this breaks the tests because there is no monitor type named "test".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now fixed, since we only log instead of panicking in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now further improved by our dynamic creation of metrics, so this scenario is no longer possible.

return j, err
}}
return j, 1, err
}, newPluginCountersRecorder("test", reg)}
}

func mockPluginsReg() *pluginsReg {
Expand Down
17 changes: 15 additions & 2 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Monitor struct {
scheduler *scheduler.Scheduler
jobTasks []*task
enabled bool
// endpoints is a count of endpoints this monitor measures.
endpoints int
// internalsMtx is used to synchronize access to critical
// internal datastructures
internalsMtx sync.Mutex
Expand All @@ -51,6 +53,10 @@ type Monitor struct {
watch watcher.Watch

pipelineConnector beat.PipelineConnector

// stats is the countersRecorder used to record lifecycle events
// for global metrics + telemetry
stats registryRecorder
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
Expand Down Expand Up @@ -95,9 +101,11 @@ func newMonitor(
watchPollTasks: []*task{},
internalsMtx: sync.Mutex{},
config: config,
stats: monitorPlugin.stats,
}

jobs, err := monitorPlugin.create(config)
jobs, endpoints, err := monitorPlugin.create(config)
m.endpoints = endpoints
if err != nil {
return nil, fmt.Errorf("job err %v", err)
}
Expand Down Expand Up @@ -181,7 +189,8 @@ func (m *Monitor) makeWatchTasks(monitorPlugin pluginBuilder) error {
return
}

watchJobs, err := monitorPlugin.create(merged)
watchJobs, endpoints, err := monitorPlugin.create(merged)
m.endpoints = endpoints
if err != nil {
logp.Err("Could not create job from watch file: %v", err)
}
Expand Down Expand Up @@ -227,6 +236,8 @@ func (m *Monitor) Start() {
for _, t := range m.watchPollTasks {
t.Start()
}

m.stats.startMonitor(int64(m.endpoints))
}

// Stop stops the Monitor's execution in its configured scheduler.
Expand All @@ -242,4 +253,6 @@ func (m *Monitor) Stop() {
for _, t := range m.watchPollTasks {
t.Stop()
}

m.stats.stopMonitor(int64(m.endpoints))
}
Loading