Skip to content

Commit

Permalink
Heartbeat Job Validation + addition of libbeat/mapval
Browse files Browse the repository at this point in the history
This commit seeks to establish a pattern for testing heartbeat jobs. It currently tests the HTTP and TCP jobs. It also required some minor refactors of those tasks for HTTP/TCP.

To do this, it made sense to validate event maps with a sort of schema library. I couldn't find one that did exactly what I wanted here, so I wrote one called mapval. That turned out to be a large undertaking, and is now the majority of this commit. Further tests need to be written, but this commit is large enough as is.

One of the nicest things about the heartbeat architecture is the dialer chain behavior. It should be the case that any validated protocol using TCP (e.g. HTTP, TCP, Redis, etc.) has the exact same tcp metadata.

To help make testing these properties easy mapval lets users compose portions of a schema into a bigger one. In other words, you can say "An HTTP response should be a TCP response, with the standard monitor data added in, and also the special HTTP fields". Even having only written a handful of tests this has uncovered some inconsistencies there, where TCP jobs have a hostname, but HTTP ones do not.
  • Loading branch information
andrewvc committed Jul 19, 2018
1 parent 5eaafff commit ff4f736
Show file tree
Hide file tree
Showing 21 changed files with 3,413 additions and 33 deletions.
90 changes: 90 additions & 0 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package hbtest

import (
"io"
"net/http"
"net/url"
"strconv"

"net/http/httptest"

"github.com/elastic/beats/libbeat/common/mapval"
)

// HelloWorldBody is the body of the HelloWorldHandler.
const HelloWorldBody = "hello, world!"

// HelloWorldHandler is a handler for an http server that returns
// HelloWorldBody and a 200 OK status.
var HelloWorldHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
io.WriteString(w, HelloWorldBody)
})

// BadGatewayBody is the body of the BadGatewayHandler.
const BadGatewayBody = "Bad Gateway"

// BadGatewayHandler is a handler for an http server that returns
// BadGatewayBody and a 200 OK status.
var BadGatewayHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
io.WriteString(w, BadGatewayBody)
})

// ServerPort takes an httptest.Server and returns its port as a uint16.
func ServerPort(server *httptest.Server) (uint16, error) {
u, err := url.Parse(server.URL)
if err != nil {
return 0, err
}
p, err := strconv.Atoi(u.Port())
if err != nil {
return 0, err
}
return uint16(p), nil
}

// MonitorChecks creates a skima.Validator that represents the "monitor" field present
// in all heartbeat events.
func MonitorChecks(id string, host string, ip string, scheme string, status string) mapval.Validator {
return mapval.Schema(mapval.Map{
"monitor": mapval.Map{
// TODO: This is only optional because, for some reason, TCP returns
// this value, but HTTP does not. We should fix this
"host": mapval.Optional(mapval.IsEqual(host)),
"duration.us": mapval.IsDuration,
"id": id,
"ip": ip,
"scheme": scheme,
"status": status,
},
})
}

// TCPChecks creates a skima.Validator that represents the "tcp" field present
// in all heartbeat events that use a Tcp connection as part of their DialChain
func TCPChecks(port uint16) mapval.Validator {
return mapval.Schema(mapval.Map{
"tcp": mapval.Map{
"port": port,
"rtt.connect.us": mapval.IsDuration,
},
})
}
105 changes: 105 additions & 0 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package http

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/mapval"
"github.com/elastic/beats/libbeat/testing/mapvaltest"
)

func checkServer(t *testing.T, handlerFunc http.HandlerFunc) (*httptest.Server, beat.Event) {
server := httptest.NewServer(handlerFunc)
defer server.Close()

config := common.NewConfig()
config.SetString("urls", 0, server.URL)

jobs, err := create(monitors.Info{}, config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

return server, event
}

func httpChecks(urlStr string, statusCode int) mapval.Validator {
return mapval.Schema(mapval.Map{
"http": mapval.Map{
"url": urlStr,
"response.status_code": statusCode,
"rtt.content.us": mapval.IsDuration,
"rtt.response_header.us": mapval.IsDuration,
"rtt.total.us": mapval.IsDuration,
"rtt.validate.us": mapval.IsDuration,
"rtt.write_request.us": mapval.IsDuration,
},
})
}

func badGatewayChecks() mapval.Validator {
return mapval.Schema(mapval.Map{
"error": mapval.Map{
"message": "502 Bad Gateway",
"type": "validate",
},
})
}

func TestOKJob(t *testing.T) {
server, event := checkServer(t, hbtest.HelloWorldHandler)
port, err := hbtest.ServerPort(server)
require.NoError(t, err)

mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.TCPChecks(port),
httpChecks(server.URL, http.StatusOK),
))(event.Fields),
)
}

func TestBadGatewayJob(t *testing.T) {
server, event := checkServer(t, hbtest.BadGatewayHandler)
port, err := hbtest.ServerPort(server)
require.NoError(t, err)

mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "down"),
hbtest.TCPChecks(port),
httpChecks(server.URL, http.StatusBadGateway),
badGatewayChecks(),
))(event.Fields),
)
}
48 changes: 36 additions & 12 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,41 +218,65 @@ func execPing(
body []byte,
timeout time.Duration,
validator func(*http.Response) error,
) (time.Time, time.Time, common.MapStr, reason.Reason) {
) (start, end time.Time, event common.MapStr, errReason reason.Reason) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

req = req.WithContext(ctx)
req = attachRequestBody(&ctx, req, body)
start, end, resp, errReason := execRequest(client, req, validator)

if errReason != nil {
if resp != nil {
return start, end, makeEvent(end.Sub(start), resp), errReason
}
return start, end, nil, errReason
}

event = makeEvent(end.Sub(start), resp)

return start, end, event, nil
}

func attachRequestBody(ctx *context.Context, req *http.Request, body []byte) *http.Request {
req = req.WithContext(*ctx)
if len(body) > 0 {
req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
req.ContentLength = int64(len(body))
}

start := time.Now()
return req
}

func execRequest(client *http.Client, req *http.Request, validator func(*http.Response) error) (start time.Time, end time.Time, resp *http.Response, errReason reason.Reason) {
start = time.Now()
resp, err := client.Do(req)
end := time.Now()
if resp != nil { // If above errors, the response will be nil
defer resp.Body.Close()
}
end = time.Now()

if err != nil {
return start, end, nil, reason.IOFailed(err)
}
defer resp.Body.Close()

err = validator(resp)
end = time.Now()
if err != nil {
return start, end, resp, reason.ValidateFailed(err)
}

return start, end, resp, nil
}

rtt := end.Sub(start)
event := common.MapStr{"http": common.MapStr{
func makeEvent(rtt time.Duration, resp *http.Response) common.MapStr {
return common.MapStr{"http": common.MapStr{
"response": common.MapStr{
"status_code": resp.StatusCode,
},
"rtt": common.MapStr{
"total": look.RTT(rtt),
},
}}

if err != nil {
return start, end, event, reason.ValidateFailed(err)
}
return start, end, event, nil
}

func splitHostnamePort(requ *http.Request) (string, uint16, error) {
Expand Down
Loading

0 comments on commit ff4f736

Please sign in to comment.