Skip to content

Commit

Permalink
http_endpoint: Allow receiving multiple documents on a single request (
Browse files Browse the repository at this point in the history
…#25764) (#26175)

Updates Filebeat's http_endpoint to produce multiple documents from a
single POST request. This extends the application/json format handling
to accept arrays of objects, and adds support for the NDJSON format
(application/x-ndjson).

(cherry picked from commit 8bbb26f)

Co-authored-by: Adrian Serrano <adrisr83@gmail.com>
  • Loading branch information
mergify[bot] and adriansr authored Jun 9, 2021
1 parent 822b2a2 commit 910f67f
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- In Cisco Umbrella fileset add users from cisco.umbrella.identities to related.user. {pull}25776[25776]
- Add fingerprint processor to generate fixed ids for `google_workspace` events. {pull}25841[25841]
- Enhance GCP module to populate orchestrator.* fields for GKE / K8S logs {pull}25368[25368]
- http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764]
- Make `filestream` input GA. {pull}26127[26127]
- http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764]

*Heartbeat*

Expand Down
62 changes: 32 additions & 30 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
package http_endpoint

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/pkg/errors"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -36,19 +35,20 @@ var (

// Triggers if middleware validation returns successful
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
obj, status, err := httpReadJsonObject(r.Body)
objs, status, err := httpReadJSON(r.Body)
if err != nil {
w.Header().Add("Content-Type", "application/json")
sendErrorResponse(w, status, err)
return
}

h.publishEvent(obj)
w.Header().Add("Content-Type", "application/json")
for _, obj := range objs {
h.publishEvent(obj)
}
h.sendResponse(w, h.responseCode, h.responseBody)
}

func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
io.WriteString(w, message)
}
Expand Down Expand Up @@ -82,32 +82,34 @@ func sendErrorResponse(w http.ResponseWriter, status int, err error) {
e.Encode(common.MapStr{"message": err.Error()})
}

func httpReadJsonObject(body io.Reader) (obj common.MapStr, status int, err error) {
func httpReadJSON(body io.Reader) (objs []common.MapStr, status int, err error) {
if body == http.NoBody {
return nil, http.StatusNotAcceptable, errBodyEmpty
}

contents, err := ioutil.ReadAll(body)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("failed reading body: %w", err)
}

if !isObject(contents) {
return nil, http.StatusBadRequest, errUnsupportedType
}

obj = common.MapStr{}
if err := json.Unmarshal(contents, &obj); err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("malformed JSON body: %w", err)
}

return obj, 0, nil
}

func isObject(b []byte) bool {
obj := bytes.TrimLeft(b, " \t\r\n")
if len(obj) > 0 && obj[0] == '{' {
return true
decoder := json.NewDecoder(body)
for idx := 0; ; idx++ {
var obj interface{}
if err := decoder.Decode(&obj); err != nil {
if err == io.EOF {
break
}
return nil, http.StatusBadRequest, errors.Wrapf(err, "malformed JSON object at stream position %d", idx)
}
switch v := obj.(type) {
case map[string]interface{}:
objs = append(objs, v)
case []interface{}:
for listIdx, listObj := range v {
asMap, ok := listObj.(map[string]interface{})
if !ok {
return nil, http.StatusBadRequest, fmt.Errorf("%v at stream %d index %d", errUnsupportedType, idx, listIdx)
}
objs = append(objs, asMap)
}
default:
return nil, http.StatusBadRequest, errUnsupportedType
}
}
return false
return objs, 0, nil
}
93 changes: 93 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package http_endpoint

import (
"net/http"
"reflect"
"strings"
"testing"

"github.com/elastic/beats/v7/libbeat/common"
)

func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
body string
wantObjs []common.MapStr
wantStatus int
wantErr bool
}{
{
name: "single object",
body: `{"a": 42, "b": "c"}`,
wantObjs: []common.MapStr{{"a": float64(42), "b": "c"}},
},
{
name: "array accepted",
body: `[{"a":"b"},{"c":"d"}]`,
wantObjs: []common.MapStr{{"a": "b"}, {"c": "d"}},
},
{
name: "not an object not accepted",
body: `42`,
wantStatus: http.StatusBadRequest,
wantErr: true,
},
{
name: "not an object mixed",
body: "[{\"a\":1},\n42,\n{\"a\":2}]",
wantStatus: http.StatusBadRequest,
wantErr: true,
},
{
name: "sequence of objects accepted (CRLF)",
body: "{\"a\":1}\r\n{\"a\":2}",
wantObjs: []common.MapStr{{"a": float64(1)}, {"a": float64(2)}},
},
{
name: "sequence of objects accepted (LF)",
body: "{\"a\":1}\n{\"a\":2}",
wantObjs: []common.MapStr{{"a": float64(1)}, {"a": float64(2)}},
},
{
name: "sequence of objects accepted (SP)",
body: "{\"a\":1} {\"a\":2}",
wantObjs: []common.MapStr{{"a": float64(1)}, {"a": float64(2)}},
},
{
name: "sequence of objects accepted (no separator)",
body: "{\"a\":1}{\"a\":2}",
wantObjs: []common.MapStr{{"a": float64(1)}, {"a": float64(2)}},
},
{
name: "not an object in sequence",
body: "{\"a\":1}\n42\n{\"a\":2}",
wantStatus: http.StatusBadRequest,
wantErr: true,
},
{
name: "array of objects in stream",
body: `{"a":1} [{"a":2},{"a":3}] {"a":4}`,
wantObjs: []common.MapStr{{"a": float64(1)}, {"a": float64(2)}, {"a": float64(3)}, {"a": float64(4)}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotObjs, gotStatus, err := httpReadJSON(strings.NewReader(tt.body))
if (err != nil) != tt.wantErr {
t.Errorf("httpReadJSON() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotObjs, tt.wantObjs) {
t.Errorf("httpReadJSON() gotObjs = %v, want %v", gotObjs, tt.wantObjs)
}
if gotStatus != tt.wantStatus {
t.Errorf("httpReadJSON() gotStatus = %v, want %v", gotStatus, tt.wantStatus)
}
})
}
}
62 changes: 61 additions & 1 deletion x-pack/filebeat/tests/system/test_http_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,66 @@ def test_http_endpoint_request(self):
assert output[0]["input.type"] == "http_endpoint"
assert output[0]["json.{}".format(self.prefix)] == message

def test_http_endpoint_request_multiple_documents(self):
"""
Test http_endpoint input with multiple documents on a single HTTP request.
"""
self.get_config()
filebeat = self.start_beat()
self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port)))

N = 10
message = "somerandommessage_{}"
payload = [{self.prefix: message.format(i)} for i in range(N)]
headers = {"Content-Type": "application/json", "Accept": "application/json"}
r = requests.post(self.url, headers=headers, data=json.dumps(payload))

self.wait_until(lambda: self.output_count(lambda x: x == N))
filebeat.check_kill_and_wait()

output = self.read_output()

print("response:", r.status_code, r.text)

assert r.text == '{"message": "success"}'

assert len(output) == N
for i in range(N):
assert output[i]["input.type"] == "http_endpoint"
assert output[i]["json.{}".format(self.prefix)] == message.format(i)

def test_http_endpoint_request_ndjson(self):
"""
Test http_endpoint input with multiple documents on a single HTTP request (NDJSON).
"""

options = """
content_type: application/x-ndjson
"""
self.get_config(options)
filebeat = self.start_beat()
self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port)))

N = 10
message = "somerandommessage_{}"
payload = "\n".join([json.dumps({self.prefix: message.format(i)}) for i in range(N)])
headers = {"Content-Type": "application/x-ndjson", "Accept": "application/json"}
r = requests.post(self.url, headers=headers, data=payload)

self.wait_until(lambda: self.output_count(lambda x: x == N))
filebeat.check_kill_and_wait()

output = self.read_output()

print("response:", r.status_code, r.text)

assert r.text == '{"message": "success"}'

assert len(output) == N
for i in range(N):
assert output[i]["input.type"] == "http_endpoint"
assert output[i]["json.{}".format(self.prefix)] == message.format(i)

def test_http_endpoint_wrong_content_header(self):
"""
Test http_endpoint input with wrong content header.
Expand Down Expand Up @@ -283,7 +343,7 @@ def test_http_endpoint_malformed_json(self):
print("response:", r.status_code, r.text)

assert r.status_code == 400
self.assertRegex(r.json()['message'], 'malformed JSON body')
self.assertRegex(r.json()['message'], 'malformed JSON')

def test_http_endpoint_get_request(self):
"""
Expand Down

0 comments on commit 910f67f

Please sign in to comment.