From 21ac1cbb74312dc3ec5022c021bc40b843de02f7 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 29 Jun 2020 13:51:18 +0200 Subject: [PATCH] [Filebeat][httpjson] Add split_events_by config setting (#19246) (cherry picked from commit ce3f5050efdb3e6e0d0883f9a6e57aedde71c868) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 67 +++++++++ x-pack/filebeat/input/httpjson/config.go | 1 + .../filebeat/input/httpjson/httpjson_test.go | 129 +++++++++++++++++- x-pack/filebeat/input/httpjson/input.go | 57 ++++++-- 5 files changed, 242 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b5688555cac..e29b591e5c1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -66,6 +66,7 @@ field. You can revert this change by configuring tags for the module and omittin - Adds check on `` config option value for the azure input `resource_manager_endpoint`. {pull}18890[18890] - Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953] - Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892] +- Adds `split_events_by` option to httpjson input. {pull}19246[19246] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index cdb24b51360..24e673d09e6 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -178,6 +178,73 @@ The config needs to specify `events` as the `json_objects_array` value. json_objects_array: events ---- +[float] +==== `split_events_by` + +If the response body contains a JSON object containing an array then this option +specifies the key containing that array. Each object in that array will generate +an event, but will maintain the common fields of the document as well. 
+ +["source","json",subs="attributes"] +---- +{ + "time": "2020-06-02 23:22:32 UTC", + "user": "Bob", + "events": [ + { + "timestamp": "2020-05-02 11:10:03 UTC", + "event": { + "category": "authorization" + } + }, + { + "timestamp": "2020-05-05 13:03:11 UTC", + "event": { + "category": "authorization" + } + } + ] +} +---- + +The config needs to specify `events` as the `split_events_by` value. + +["source","yaml",subs="attributes"] +---- +- type: httpjson + split_events_by: events +---- + +With this configuration, the input will output the following events: + +["source","json",subs="attributes"] +---- +[ + { + "time": "2020-06-02 23:22:32 UTC", + "user": "Bob", + "events": { + "timestamp": "2020-05-02 11:10:03 UTC", + "event": { + "category": "authorization" + } + } + }, + { + "time": "2020-06-02 23:22:32 UTC", + "user": "Bob", + "events": { + "timestamp": "2020-05-05 13:03:11 UTC", + "event": { + "category": "authorization" + } + } + } +] +---- + +It can be used in combination with `json_objects_array`; in that case the `split_events_by` key is looked up inside each element of that array. 
+ [float] ==== `no_http_body` diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 0d677abd561..2fcc2fc8941 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -26,6 +26,7 @@ type config struct { HTTPRequestBody common.MapStr `config:"http_request_body"` Interval time.Duration `config:"interval"` JSONObjects string `config:"json_objects_array"` + SplitEventsBy string `config:"split_events_by"` NoHTTPBody bool `config:"no_http_body"` Pagination *Pagination `config:"pagination"` RateLimit *RateLimit `config:"rate_limit"` diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 416836782ba..4e70fe72472 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -21,6 +21,8 @@ import ( "golang.org/x/sync/errgroup" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" @@ -38,7 +40,6 @@ const ( var ( once sync.Once - url string ) func testSetup(t *testing.T) { @@ -91,6 +92,10 @@ func createServer(newServer func(handler http.Handler) *httptest.Server) *httpte "embedded": map[string]string{ "hello": "world", }, + "list": []map[string]interface{}{ + {"foo": "bar"}, + {"hello": "world"}, + }, } b, _ := json.Marshal(message) w.Header().Set("Content-Type", "application/json") @@ -157,8 +162,24 @@ func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *h return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") message := map[string]interface{}{ - "hello": []map[string]string{ - {"foo": "bar"}, + "hello": []map[string]interface{}{ + { + "foo": "bar", + "list": []map[string]interface{}{ + {"foo": "bar"}, + {"hello": "world"}, + }, + }, + { + "foo": 
"bar", + "list": []map[string]interface{}{ + {"foo": "bar"}, + }, + }, + { + "bar": "foo", + "list": []map[string]interface{}{}, + }, {"bar": "foo"}, }, } @@ -604,3 +625,105 @@ func TestOAuth2(t *testing.T) { } }) } + +func TestSplitResponseWithKey(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "split_events_by": "list", + "interval": 0, + } + ts := createTestServer(HTTPTestServer) + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(2) + if !ok { + t.Fatalf("Expected 2 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSplitResponseWithoutKey(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "split_events_by": "not_found", + "interval": 0, + } + ts := createTestServer(HTTPTestServer) + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(1) + if !ok { + t.Fatalf("Expected 1 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + +func TestArrayWithSplitResponse(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "json_objects_array": "hello", + "split_events_by": "list", + "interval": 0, + } + + expectedFields := []string{ + `{ + "foo": "bar", + "list": { + "foo": "bar" + } + }`, + `{ + "foo": "bar", + "list": { + "hello": "world" + } + }`, + `{ + "foo": "bar", + "list": { + "foo": "bar" + } + }`, + `{ + "bar": "foo", + "list": [] + }`, + `{"bar": "foo"}`, + } + + ts := createTestServer(ArrayResponseServer) + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + 
+ events, ok := out.waitForEvents(5) + if !ok { + t.Fatalf("Expected 5 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + + for i, e := range events { + message, _ := e.GetValue("message") + assert.JSONEq(t, expectedFields[i], message.(string)) + } + }) +} diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index b8d7b797e29..39726eac177 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -181,24 +181,61 @@ func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo) // processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) { - var m map[string]interface{} + var last map[string]interface{} for _, t := range events { switch v := t.(type) { case map[string]interface{}: - m = v - d, err := json.Marshal(v) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal %+v", v) - } - ok := in.outlet.OnEvent(makeEvent(string(d))) - if !ok { - return nil, errors.New("function OnEvent returned false") + for _, e := range in.splitEvent(v) { + last = e + d, err := json.Marshal(e) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal %+v", e) + } + ok := in.outlet.OnEvent(makeEvent(string(d))) + if !ok { + return nil, errors.New("function OnEvent returned false") + } } default: return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v) } } - return m, nil + return last, nil +} + +func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} { + m := common.MapStr(event) + + hasSplitKey, _ := m.HasKey(in.config.SplitEventsBy) + if in.config.SplitEventsBy == "" || !hasSplitKey { + return []map[string]interface{}{event} + } + + splitOnIfc, _ := 
m.GetValue(in.config.SplitEventsBy) + splitOn, ok := splitOnIfc.([]interface{}) + // if not an array or is empty, we do nothing + if !ok || len(splitOn) == 0 { + return []map[string]interface{}{event} + } + + var events []map[string]interface{} + for _, split := range splitOn { + s, ok := split.(map[string]interface{}) + // if not an object, we do nothing + if !ok { + return []map[string]interface{}{event} + } + + mm := m.Clone() + _, err := mm.Put(in.config.SplitEventsBy, s) + if err != nil { + return []map[string]interface{}{event} + } + + events = append(events, mm) + } + + return events } // getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response