diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4af319f78c10..50ca34b00e48 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -194,6 +194,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Made monitors.d configuration part of the default config. {pull}9004[9004] - Fixed rare issue where TLS connections to endpoints with x509 certificates missing either notBefore or notAfter would cause the check to fail with a stacktrace. {pull}9566[9566] +- Fix checks for TCP send/receive data {pull}11118[11118] *Journalbeat* diff --git a/heartbeat/monitors/active/tcp/task.go b/heartbeat/monitors/active/tcp/task.go index 9729d6ab5e0d..4c5523f671a5 100644 --- a/heartbeat/monitors/active/tcp/task.go +++ b/heartbeat/monitors/active/tcp/task.go @@ -70,7 +70,8 @@ func pingHost( }, }) if err != nil { - event.PutValue("error", reason.FailValidate(err)) + return reason.MakeValidateError(err) } + return nil } diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index fc857a298b5f..3b25685636a4 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -25,6 +25,7 @@ import ( "net/http/httptest" "net/url" "os" + "strconv" "testing" "github.com/stretchr/testify/require" @@ -38,11 +39,16 @@ import ( ) func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event { - config, err := common.NewConfigFrom(common.MapStr{ + config := common.MapStr{ "hosts": host, "ports": port, "timeout": "1s", - }) + } + return testTCPConfigCheck(t, config, host, port) +} + +func testTCPConfigCheck(t *testing.T, configMap common.MapStr, host string, port uint16) *beat.Event { + config, err := common.NewConfigFrom(configMap) require.NoError(t, err) jobs, endpoints, err := create("tcp", config) @@ -194,3 +200,110 @@ func TestUnreachableEndpointJob(t *testing.T) { event.Fields, ) } + +func TestCheckUp(t *testing.T) { + host, port, ip, closeEcho, err := startEchoServer(t) + require.NoError(t, err) + defer closeEcho() + + configMap := common.MapStr{ + "hosts": host, + "ports": port, + "timeout": "1s", + "check.receive": "echo123", + "check.send": "echo123", + } + + event := testTCPConfigCheck(t, configMap, host, port) + + mapval.Test( + t, + mapval.Strict(mapval.Compose( + tcpMonitorChecks(host, ip, port, "up"), + hbtest.RespondingTCPChecks(), + hbtest.SimpleURLChecks(t, "tcp", host, port), + hbtest.SummaryChecks(1, 0), + mapval.MustCompile(mapval.Map{ + "resolve": mapval.Map{ + "ip": ip, + "rtt.us": mapval.IsDuration, + }, + "tcp": mapval.Map{ + "rtt.validate.us": mapval.IsDuration, + }, + }), + )), + event.Fields, + ) +} + +func TestCheckDown(t *testing.T) { + host, port, ip, closeEcho, err := startEchoServer(t) + require.NoError(t, err) + defer closeEcho() + + configMap := common.MapStr{ + "hosts": host, + "ports": port, + "timeout": "1s", + "check.receive": "BOOM", // should fail + "check.send": "echo123", + } + + event := testTCPConfigCheck(t, configMap, host, port) + + mapval.Test( + t, + mapval.Strict(mapval.Compose( + tcpMonitorChecks(host, ip, port, "down"), + hbtest.RespondingTCPChecks(), + hbtest.SimpleURLChecks(t, "tcp", host, port), + hbtest.SummaryChecks(0, 1), + mapval.MustCompile(mapval.Map{ + "resolve": mapval.Map{ + "ip": ip, + "rtt.us": mapval.IsDuration, + }, + "tcp": mapval.Map{ + "rtt.validate.us": mapval.IsDuration, + }, + "error": mapval.Map{ + "type": "validate", + "message": "received string mismatch", + }, + }), + )), + event.Fields, + ) +} + +// startEchoServer starts a simple TCP echo server for testing. Only handles a single connection once. +// Note you MUST connect to this server exactly once to avoid leaking a goroutine. This is only useful +// for the specific tests used here. +func startEchoServer(t *testing.T) (host string, port uint16, ip string, close func() error, err error) { + // Simple echo server + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + return "", 0, "", nil, err + } + go func() { + conn, err := listener.Accept() + require.NoError(t, err) + buf := make([]byte, 1024) + rlen, err := conn.Read(buf) + require.NoError(t, err) + wlen, err := conn.Write(buf[:rlen]) + require.NoError(t, err) + // Normally we'd retry partial writes, but for tests this is OK + require.Equal(t, wlen, rlen) + }() + + ip, portStr, err := net.SplitHostPort(listener.Addr().String()) + portUint64, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + listener.Close() + return "", 0, "", nil, err + } + + return "localhost", uint16(portUint64), ip, listener.Close, nil +} diff --git a/heartbeat/reason/reason.go b/heartbeat/reason/reason.go index 08c7724f0162..d5fc5e456898 100644 --- a/heartbeat/reason/reason.go +++ b/heartbeat/reason/reason.go @@ -68,4 +68,7 @@ func Fail(r Reason) common.MapStr { func FailIO(err error) common.MapStr { return Fail(IOError{err}) } -func FailValidate(err error) common.MapStr { return Fail(ValidateError{err}) } +// MakeValidateError creates an instance of ValidateError from the given error. +func MakeValidateError(err error) ValidateError { + return ValidateError{err} +}