From bbefa8771030ebd8a29311031f34774697eb487e Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Oct 2018 08:57:07 -0500 Subject: [PATCH 1/3] [Heartbeat] Read entire body before closing connection This fixes an issue where the connection would be closed after only a partial body read. The RoundTripper would close the conn before returning the response which included a partially buffered body. This would actually work for short responses, since the backing bufio would do a partial read, but would fail on all but the shortest responses. Normally connection lifecycle is handled outside the realm of the `RoundTripper`, but for our purposes we don't want to re-use connections. Since it is a requirement that all response bodies be closed, we can piggy-back on top of that to ensure the connection is closed. Fixes #8588 --- heartbeat/monitors/active/http/simple_transp.go | 1 + 1 file changed, 1 insertion(+) diff --git a/heartbeat/monitors/active/http/simple_transp.go b/heartbeat/monitors/active/http/simple_transp.go index 889baa8e5a1..21fef7f7f6a 100644 --- a/heartbeat/monitors/active/http/simple_transp.go +++ b/heartbeat/monitors/active/http/simple_transp.go @@ -184,6 +184,7 @@ func (t *SimpleTransport) readResponse( ) (*http.Response, error) { reader := bufio.NewReader(conn) resp, err := http.ReadResponse(reader, req) + resp.Body = comboConnReadCloser{conn, resp.Body} if err != nil { return nil, err } From 507f6788d17ed2ac805958cafb43d9e2b867ceff Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 19 Oct 2018 15:53:28 -0500 Subject: [PATCH 2/3] Add JSON body check support to Heartbeat This commit adds a new `json` check for HTTP responses letting users define an arbitrary condition to match against parsed JSON to determine whether an endpoint is up or down. The nice thing about structured checks like this is that it makes it easy for users to precisely piggy-back on top of existing JSON endpoints, or write their own where a given key/value could indicate the health of an external system. In a sense, it allows users to write a healthcheck endpoint. An example can be seen below: ```yaml heartbeat.monitors: - type: http # List or urls to query urls: ["http://localhost:9200"] schedule: '@every 10s' check.response.json: - description: check version condition: equals.version.number: "6.4.0" ``` --- CHANGELOG.asciidoc | 1 + heartbeat/_meta/beat.reference.yml | 8 +++ heartbeat/docs/heartbeat-options.asciidoc | 7 +- heartbeat/heartbeat.reference.yml | 8 +++ heartbeat/monitors/active/http/check.go | 65 ++++++++++++++++- heartbeat/monitors/active/http/check_test.go | 71 +++++++++++++++++++ heartbeat/monitors/active/http/config.go | 15 +++- heartbeat/monitors/active/http/http.go | 5 +- .../tests/system/config/heartbeat.yml.j2 | 8 +++ heartbeat/tests/system/test_monitor.py | 37 ++++++++++ 10 files changed, 218 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index be54f72d9de..ae25e2976fe 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -156,6 +156,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] *Journalbeat* - Add journalbeat. {pull}8703[8703] +- Add the ability to check against JSON HTTP bodies with conditions. {pull}8667[8667] *Metricbeat* diff --git a/heartbeat/_meta/beat.reference.yml b/heartbeat/_meta/beat.reference.yml index 80a1027601f..7315549f742 100644 --- a/heartbeat/_meta/beat.reference.yml +++ b/heartbeat/_meta/beat.reference.yml @@ -208,6 +208,14 @@ heartbeat.monitors: # Required response contents. #body: + # Parses the body as JSON, then checks against the given condition expression + #json: + #- description: Explanation of what the check does + # condition: + # equals: + # myField: expectedValue + + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE # Configure file json file to be watched for changes to the monitor: #watch.poll_file: diff --git a/heartbeat/docs/heartbeat-options.asciidoc b/heartbeat/docs/heartbeat-options.asciidoc index 6587cf1308a..403146c782a 100644 --- a/heartbeat/docs/heartbeat-options.asciidoc +++ b/heartbeat/docs/heartbeat-options.asciidoc @@ -444,6 +444,7 @@ Under `check.response`, specify these options: it's set to 0, any status code other than 404 is accepted. *`headers`*:: The required response headers. *`body`*:: A list of regular expressions to match the the body output. Only a single expression needs to match. +*`json`*:: A list of <> expressions executed against the body when parsed as JSON. The following configuration shows how to check the response when the body contains JSON: @@ -459,7 +460,11 @@ contains JSON: 'X-API-Key': '12345-mykey-67890' check.response: status: 200 - body: '{"status": "ok"}' + json: + - description: check status + condition: + equals: + status: ok ------------------------------------------------------------------------------- The following configuration shows how to check the response for multiple regex diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index e0b81fbb592..b1727211858 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -208,6 +208,14 @@ heartbeat.monitors: # Required response contents. #body: + # Parses the body as JSON, then checks against the given condition expression + #json: + #- description: Explanation of what the check does + # condition: + # equals: + # myField: expectedValue + + # NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE # Configure file json file to be watched for changes to the monitor: #watch.poll_file: diff --git a/heartbeat/monitors/active/http/check.go b/heartbeat/monitors/active/http/check.go index 5eb822c29be..a673002dbe2 100644 --- a/heartbeat/monitors/active/http/check.go +++ b/heartbeat/monitors/active/http/check.go @@ -18,12 +18,18 @@ package http import ( + "encoding/json" "errors" "fmt" "io/ioutil" "net/http" + "strings" + pkgerrors "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" + "github.com/elastic/beats/libbeat/conditions" ) type RespCheck func(*http.Response) error @@ -32,7 +38,7 @@ var ( errBodyMismatch = errors.New("body mismatch") ) -func makeValidateResponse(config *responseParameters) RespCheck { +func makeValidateResponse(config *responseParameters) (RespCheck, error) { var checks []RespCheck if config.Status > 0 { @@ -49,7 +55,15 @@ func makeValidateResponse(config *responseParameters) RespCheck { checks = append(checks, checkBody(config.RecvBody)) } - return checkAll(checks...) + if len(config.RecvJSON) > 0 { + jsonChecks, err := checkJSON(config.RecvJSON) + if err != nil { + return nil, err + } + checks = append(checks, jsonChecks) + } + + return checkAll(checks...), nil } func checkOK(_ *http.Response) error { return nil } @@ -115,3 +129,50 @@ func checkBody(body []match.Matcher) RespCheck { return errBodyMismatch } } + +func checkJSON(checks []*jsonResponseCheck) (RespCheck, error) { + type compiledCheck struct { + description string + condition conditions.Condition + } + + var compiledChecks []compiledCheck + + for _, check := range checks { + cond, err := conditions.NewCondition(check.Condition) + if err != nil { + return nil, err + } + compiledChecks = append(compiledChecks, compiledCheck{check.Description, cond}) + } + + return func(r *http.Response) error { + decoded := &common.MapStr{} + err := json.NewDecoder(r.Body).Decode(decoded) + + if err != nil { + body, _ := ioutil.ReadAll(r.Body) + return pkgerrors.Wrapf(err, "could not parse JSON for body check with condition. Source: %s", body) + } + + var errorDescs []string + for _, compiledCheck := range compiledChecks { + //compiledCheck.condition, err = conditions.NewEqualsCondition(map[string]interface{}{"foo.baz": "bar"}) + ok := compiledCheck.condition.Check(decoded) + if !ok { + errorDescs = append(errorDescs, compiledCheck.description) + } + } + + if len(errorDescs) > 0 { + return fmt.Errorf( + "JSON body did not match %d conditions '%s' for monitor. Received JSON %+v", + len(errorDescs), + strings.Join(errorDescs, ","), + decoded, + ) + } + + return nil + }, nil +} diff --git a/heartbeat/monitors/active/http/check_test.go b/heartbeat/monitors/active/http/check_test.go index 3e26ee5cec8..f6f8f94a89c 100644 --- a/heartbeat/monitors/active/http/check_test.go +++ b/heartbeat/monitors/active/http/check_test.go @@ -24,7 +24,12 @@ import ( "net/http/httptest" "testing" + "github.com/elastic/beats/libbeat/common" + + "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/common/match" + "github.com/elastic/beats/libbeat/conditions" ) func TestCheckBody(t *testing.T) { @@ -125,3 +130,69 @@ func TestCheckBody(t *testing.T) { }) } } + +func TestCheckJson(t *testing.T) { + fooBazEqualsBar := common.MustNewConfigFrom(map[string]interface{}{"equals": map[string]interface{}{"foo": map[string]interface{}{"baz": "bar"}}}) + fooBazEqualsBarConf := &conditions.Config{} + err := fooBazEqualsBar.Unpack(fooBazEqualsBarConf) + require.NoError(t, err) + + fooBazEqualsBarDesc := "foo.baz equals bar" + + var tests = []struct { + description string + body string + condDesc string + condConf *conditions.Config + result bool + }{ + { + "positive match", + "{\"foo\": {\"baz\": \"bar\"}}", + fooBazEqualsBarDesc, + fooBazEqualsBarConf, + true, + }, + { + "Negative match", + "{\"foo\": 123}", + fooBazEqualsBarDesc, + fooBazEqualsBarConf, + false, + }, + { + "unparseable", + `notjson`, + fooBazEqualsBarDesc, + fooBazEqualsBarConf, + false, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, test.body) + })) + defer ts.Close() + + res, err := http.Get(ts.URL) + if err != nil { + log.Fatal(err) + } + + checker, err := checkJSON([]*jsonResponseCheck{{test.condDesc, test.condConf}}) + require.NoError(t, err) + checkRes := checker(res) + + if result := checkRes == nil; result != test.result { + if test.result { + t.Fatalf("Expected condition: '%s' to match body: %s. got: %s", test.condDesc, test.body, checkRes) + } else { + t.Fatalf("Did not expect condition: '%s' to match body: %s. got: %s", test.condDesc, test.body, checkRes) + } + } + }) + } + +} diff --git a/heartbeat/monitors/active/http/config.go b/heartbeat/monitors/active/http/config.go index e9c20974dd6..0d67f8a3668 100644 --- a/heartbeat/monitors/active/http/config.go +++ b/heartbeat/monitors/active/http/config.go @@ -22,6 +22,8 @@ import ( "strings" "time" + "github.com/elastic/beats/libbeat/conditions" + "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/libbeat/common/transport/tlscommon" @@ -68,9 +70,15 @@ type requestParameters struct { type responseParameters struct { // expected HTTP response configuration - Status uint16 `config:"status" verify:"min=0, max=699"` - RecvHeaders map[string]string `config:"headers"` - RecvBody []match.Matcher `config:"body"` + Status uint16 `config:"status" verify:"min=0, max=699"` + RecvHeaders map[string]string `config:"headers"` + RecvBody []match.Matcher `config:"body"` + RecvJSON []*jsonResponseCheck `config:"json"` +} + +type jsonResponseCheck struct { + Description string `config:"description"` + Condition *conditions.Config `config:"condition"` } type compressionConfig struct { @@ -93,6 +101,7 @@ var defaultConfig = Config{ Status: 0, RecvHeaders: nil, RecvBody: []match.Matcher{}, + RecvJSON: nil, }, }, } diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index 8727893beb7..d4d3fc47363 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -70,7 +70,10 @@ func create( body = buf.Bytes() } - validator := makeValidateResponse(&config.Check.Response) + validator, err := makeValidateResponse(&config.Check.Response) + if err != nil { + return nil, 0, err + } jobs = make([]monitors.Job, len(config.URLs)) diff --git a/heartbeat/tests/system/config/heartbeat.yml.j2 b/heartbeat/tests/system/config/heartbeat.yml.j2 index f9c7e84ba8f..40990b39358 100644 --- a/heartbeat/tests/system/config/heartbeat.yml.j2 +++ b/heartbeat/tests/system/config/heartbeat.yml.j2 @@ -29,6 +29,14 @@ heartbeat.monitors: {% endfor %} {% endif -%} + + {%- if monitor.check_response_json is defined %} + check.response.json: + {%- for check in monitor.check_response_json %} + - {{check}} + {% endfor %} + {% endif -%} + {%- if monitor.fields is defined %} {% if monitor.fields_under_root %}fields_under_root: true{% endif %} fields: diff --git a/heartbeat/tests/system/test_monitor.py b/heartbeat/tests/system/test_monitor.py index e445ad6d4b9..0b703c65774 100644 --- a/heartbeat/tests/system/test_monitor.py +++ b/heartbeat/tests/system/test_monitor.py @@ -41,6 +41,43 @@ def test_http(self, status_code): raise SkipTest self.assert_fields_are_documented(output[0]) + @parameterized.expand([ + ("up", '{"foo": {"baz": "bar"}}'), + ("down", '{"foo": "unexpected"}'), + ("down", 'notjson'), + ]) + def test_http_json(self, expected_status, body): + """ + Test JSON response checks + """ + server = self.start_server(body, 200) + try: + self.render_config_template( + monitors=[{ + "type": "http", + "urls": ["http://localhost:{}".format(server.server_port)], + "check_response_json": [{ + "description": "foo equals bar", + "condition": { + "equals": {"foo": {"baz": "bar"}} + } + }] + }] + ) + + try: + proc = self.start_beat() + self.wait_until(lambda: self.log_contains("heartbeat is running")) + + self.wait_until( + lambda: self.output_has(lines=1)) + finally: + proc.check_kill_and_wait() + + self.assert_last_status(expected_status) + finally: + server.shutdown() + @parameterized.expand([ (lambda server: "localhost:{}".format(server.server_port), "up"), # This IP is reserved in IPv4 From 1d11a6b76e3cd2ee656d130e2a6b1017f380db51 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 29 Oct 2018 11:13:03 -0500 Subject: [PATCH 3/3] Remove accidental comment --- heartbeat/monitors/active/http/check.go | 1 - 1 file changed, 1 deletion(-) diff --git a/heartbeat/monitors/active/http/check.go b/heartbeat/monitors/active/http/check.go index a673002dbe2..9563747c2d8 100644 --- a/heartbeat/monitors/active/http/check.go +++ b/heartbeat/monitors/active/http/check.go @@ -157,7 +157,6 @@ func checkJSON(checks []*jsonResponseCheck) (RespCheck, error) { var errorDescs []string for _, compiledCheck := range compiledChecks { - //compiledCheck.condition, err = conditions.NewEqualsCondition(map[string]interface{}{"foo.baz": "bar"}) ok := compiledCheck.condition.Check(decoded) if !ok { errorDescs = append(errorDescs, compiledCheck.description)