Skip to content

Commit

Permalink
[Heartbeat] Fix status for TCP checks
Browse files Browse the repository at this point in the history
The heartbeat checks for TCP send/receive have been broken for a long time, since at least 6.3 from my testing. An error was returned, but the status was still set to 'up'.

This patch fixes that and adds tests.
  • Loading branch information
andrewvc committed Mar 6, 2019
1 parent 71fb58b commit cc9e1e2
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Made monitors.d configuration part of the default config. {pull}9004[9004]
- Fixed rare issue where TLS connections to endpoints with x509 certificates missing either notBefore or notAfter would cause the check to fail with a stacktrace. {pull}9566[9566]
- Fix checks for TCP send/receive data {pull}11118[11118]

*Journalbeat*

Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/tcp/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func pingHost(
},
})
if err != nil {
event.PutValue("error", reason.FailValidate(err))
return reason.MakeValidateError(err)
}

return nil
}
117 changes: 115 additions & 2 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"strconv"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -38,11 +39,16 @@ import (
)

func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event {
config, err := common.NewConfigFrom(common.MapStr{
config := common.MapStr{
"hosts": host,
"ports": port,
"timeout": "1s",
})
}
return testTCPConfigCheck(t, config, host, port)
}

func testTCPConfigCheck(t *testing.T, configMap common.MapStr, host string, port uint16) *beat.Event {
config, err := common.NewConfigFrom(configMap)
require.NoError(t, err)

jobs, endpoints, err := create("tcp", config)
Expand Down Expand Up @@ -194,3 +200,110 @@ func TestUnreachableEndpointJob(t *testing.T) {
event.Fields,
)
}

func TestCheckUp(t *testing.T) {
host, port, ip, closeEcho, err := startEchoServer(t)
require.NoError(t, err)
defer closeEcho()

configMap := common.MapStr{
"hosts": host,
"ports": port,
"timeout": "1s",
"check.receive": "echo123",
"check.send": "echo123",
}

event := testTCPConfigCheck(t, configMap, host, port)

mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(host, ip, port, "up"),
hbtest.RespondingTCPChecks(),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.SummaryChecks(1, 0),
mapval.MustCompile(mapval.Map{
"resolve": mapval.Map{
"ip": ip,
"rtt.us": mapval.IsDuration,
},
"tcp": mapval.Map{
"rtt.validate.us": mapval.IsDuration,
},
}),
)),
event.Fields,
)
}

func TestCheckDown(t *testing.T) {
host, port, ip, closeEcho, err := startEchoServer(t)
require.NoError(t, err)
defer closeEcho()

configMap := common.MapStr{
"hosts": host,
"ports": port,
"timeout": "1s",
"check.receive": "BOOM", // should fail
"check.send": "echo123",
}

event := testTCPConfigCheck(t, configMap, host, port)

mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(host, ip, port, "down"),
hbtest.RespondingTCPChecks(),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.SummaryChecks(0, 1),
mapval.MustCompile(mapval.Map{
"resolve": mapval.Map{
"ip": ip,
"rtt.us": mapval.IsDuration,
},
"tcp": mapval.Map{
"rtt.validate.us": mapval.IsDuration,
},
"error": mapval.Map{
"type": "validate",
"message": "received string mismatch",
},
}),
)),
event.Fields,
)
}

// startEchoServer starts a simple TCP echo server for testing. Only handles a single connection once.
// Note you MUST connect to this server exactly once to avoid leaking a goroutine. This is only useful
// for the specific tests used here.
func startEchoServer(t *testing.T) (host string, port uint16, ip string, close func() error, err error) {
// Simple echo server
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
return "", 0, "", nil, err
}
go func() {
conn, err := listener.Accept()
require.NoError(t, err)
buf := make([]byte, 1024)
rlen, err := conn.Read(buf)
require.NoError(t, err)
wlen, err := conn.Write(buf[:rlen])
require.NoError(t, err)
// Normally we'd retry partial writes, but for tests this is OK
require.Equal(t, wlen, rlen)
}()

ip, portStr, err := net.SplitHostPort(listener.Addr().String())
portUint64, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
listener.Close()
return "", 0, "", nil, err
}

return "localhost", uint16(portUint64), ip, listener.Close, nil
}
5 changes: 4 additions & 1 deletion heartbeat/reason/reason.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,7 @@ func Fail(r Reason) common.MapStr {

func FailIO(err error) common.MapStr { return Fail(IOError{err}) }

func FailValidate(err error) common.MapStr { return Fail(ValidateError{err}) }
// MakeValidateError creates an instance of ValidateError from the given error.
func MakeValidateError(err error) ValidateError {
return ValidateError{err}
}

0 comments on commit cc9e1e2

Please sign in to comment.