From 483bc38a458c0552eafc374c39b8518ec8c7f19e Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 28 Nov 2024 15:10:50 +1030 Subject: [PATCH] x-pack/filebeat/input/http_endpoint: fix handling of http_endpoint request exceeding memory limits (#41765) The input does not have a way to communicate back-pressure to clients, potentially leading to unconstrained growth in the publisher event queue and an OoM event. This change adds a mechanism to keep track of the total sum of in-flight message bytes from the client in order to allow the server to return a 503 HTTP status when the total is too large. Note that this does not monitor the total memory in the queue as that would require a complete understanding of the allocations in the preparation of event values to be sent to the publisher, but rather uses the message length as a reasonable proxy. (cherry picked from commit 2ad3922d9b28dcaeedaf4f97aed03bbdf9ea805e) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-http-endpoint.asciidoc | 11 ++ x-pack/filebeat/input/http_endpoint/config.go | 3 + .../filebeat/input/http_endpoint/handler.go | 43 ++++++ .../input/http_endpoint/handler_test.go | 3 + x-pack/filebeat/input/http_endpoint/input.go | 2 + .../input/http_endpoint/input_test.go | 137 +++++++++++++++--- 7 files changed, 176 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b135f7aee39..b6c7d60780c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -177,6 +177,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Access Points in the `aws-s3` input. {pull}41495[41495] - Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664] - Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755] +- Fix handling of http_endpoint request exceeding memory limits. 
{issue}41764[41764] {pull}41765[41765] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 0ca54ab4567..3b4d3302579 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -40,6 +40,7 @@ These are the possible response codes from the server. | 406 | Not Acceptable | Returned if the POST request does not contain a body. | 415 | Unsupported Media Type | Returned if the Content-Type is not application/json. Or if Content-Encoding is present and is not gzip. | 500 | Internal Server Error | Returned if an I/O error occurs reading the request. +| 503 | Service Unavailable | Returned if the length of the request body would take the total number of in-flight bytes above the configured `max_in_flight_bytes` value. | 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout. |========================================================================================================================================================= @@ -285,6 +286,16 @@ The prefix for the signature. Certain webhooks prefix the HMAC signature with a By default the input expects the incoming POST to include a Content-Type of `application/json` to try to enforce the incoming data to be valid JSON. In certain scenarios when the source of the request is not able to do that, it can be overwritten with another value or set to null. +[float] +==== `max_in_flight_bytes` + +The total sum of request body lengths that are allowed at any given time. If non-zero, the input will compare this value to the sum of in-flight request body lengths from requests that include a `wait_for_completion_timeout` request query and will return a 503 HTTP status code, along with a Retry-After header configured with the `retry_after` option. The default value for this option is zero, no limit. 
+ +[float] +==== `retry_after` + +If a request has exceeded the `max_in_flight_bytes` limit, the response to the client will include a Retry-After header specifying how many seconds the client should wait to retry again. The default value for this option is 10 seconds. + [float] ==== `program` diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 5e1c93d2bc3..977e7b5d7d2 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -37,6 +37,8 @@ type config struct { URL string `config:"url" validate:"required"` Prefix string `config:"prefix"` ContentType string `config:"content_type"` + MaxInFlight int64 `config:"max_in_flight_bytes"` + RetryAfter int `config:"retry_after"` Program string `config:"program"` SecretHeader string `config:"secret.header"` SecretValue string `config:"secret.value"` @@ -66,6 +68,7 @@ func defaultConfig() config { BasicAuth: false, ResponseCode: 200, ResponseBody: `{"message": "success"}`, + RetryAfter: 10, ListenAddress: "127.0.0.1", ListenPort: "8000", URL: "/", diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index 67a1e07af86..90dd0d34720 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -56,6 +56,20 @@ type handler struct { txBaseID string // Random value to make transaction IDs unique. txIDCounter atomic.Uint64 // Transaction ID counter that is incremented for each request. + // inFlight is the sum of message body lengths + // that have been received but not yet ACKed + // or timed out or otherwise handled. + // + // Requests that do not request a timeout do + // not contribute to this value. + inFlight atomic.Int64 + // maxInFlight is the maximum value of inFlight + // that will be allowed for any messages received + // by the handler. If non-zero, inFlight may + // not exceed this value. 
+ maxInFlight int64 + retryAfter int + reqLogger *zap.Logger host, scheme string @@ -86,9 +100,38 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { acked chan struct{} timeout *time.Timer ) + if h.maxInFlight != 0 { + // Consider non-ACKing messages as well. These do not add + // to the sum of in-flight bytes, but we can still assess + // whether a message would take us over the limit. + inFlight := h.inFlight.Load() + r.ContentLength + if inFlight > h.maxInFlight { + w.Header().Set(headerContentEncoding, "application/json") + w.Header().Set("Retry-After", strconv.Itoa(h.retryAfter)) + w.WriteHeader(http.StatusServiceUnavailable) + _, err := fmt.Fprintf(w, + `{"warn":"max in flight message memory exceeded","max_in_flight":%d,"in_flight":%d}`, + h.maxInFlight, inFlight, + ) + if err != nil { + h.log.Errorw("failed to write 503", "error", err) + } + return + } + } if wait != 0 { acked = make(chan struct{}) timeout = time.NewTimer(wait) + h.inFlight.Add(r.ContentLength) + defer func() { + // Any return will be a message handling completion and the + // removal of the allocation from the queue assuming that + // the client has requested a timeout. Either we have an early + // error condition or timeout and the message is dropped, we + // have ACKed all the events in the request, or the input has + // been cancelled. 
+ h.inFlight.Add(-r.ContentLength) + }() } start := time.Now() acker := newBatchACKTracker(func() { diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index 131596f1fc3..8989214ba20 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -221,6 +221,9 @@ type publisher struct { func (p *publisher) Publish(e beat.Event) { p.mu.Lock() p.events = append(p.events, e) + if ack, ok := e.Private.(*batchACKTracker); ok { + ack.ACK() + } p.mu.Unlock() } diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index b4ad07e7626..6d6b0cbc3f4 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -347,6 +347,8 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event hmacType: c.HMACType, hmacPrefix: c.HMACPrefix, }, + maxInFlight: c.MaxInFlight, + retryAfter: c.RetryAfter, program: prg, messageField: c.Prefix, responseCode: c.ResponseCode, diff --git a/x-pack/filebeat/input/http_endpoint/input_test.go b/x-pack/filebeat/input/http_endpoint/input_test.go index 3f530454e1d..9a3a2368a74 100644 --- a/x-pack/filebeat/input/http_endpoint/input_test.go +++ b/x-pack/filebeat/input/http_endpoint/input_test.go @@ -10,6 +10,7 @@ import ( "errors" "io" "net/http" + "slices" "strings" "sync" "testing" @@ -24,19 +25,20 @@ import ( ) var serverPoolTests = []struct { - name string - method string - cfgs []*httpEndpoint - events []target - want []mapstr.M - wantErr error + name string + method string + cfgs []*httpEndpoint + events []target + want []mapstr.M + wantStatus int + wantErr error }{ { name: "single", cfgs: []*httpEndpoint{{ addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -50,6 +52,7 @@ 
var serverPoolTests = []struct { {url: "http://127.0.0.1:9001/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -63,7 +66,7 @@ var serverPoolTests = []struct { addr: "127.0.0.1:9001", config: config{ Method: http.MethodPut, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -77,6 +80,7 @@ var serverPoolTests = []struct { {url: "http://127.0.0.1:9001/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -90,7 +94,7 @@ var serverPoolTests = []struct { addr: "127.0.0.1:9001", config: config{ Method: http.MethodPatch, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -104,6 +108,7 @@ var serverPoolTests = []struct { {url: "http://127.0.0.1:9001/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -116,7 +121,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -128,7 +133,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9002", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9002", @@ -143,6 +148,7 @@ var serverPoolTests = []struct { {url: "http://127.0.0.1:9002/b/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/a/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ 
{"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -155,7 +161,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -167,7 +173,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -182,6 +188,7 @@ var serverPoolTests = []struct { {url: "http://127.0.0.1:9001/b/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/a/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -194,7 +201,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -207,7 +214,7 @@ var serverPoolTests = []struct { addr: "127.0.0.1:9001", config: config{ TLS: &tlscommon.ServerConfig{}, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -228,7 +235,7 @@ var serverPoolTests = []struct { TLS: &tlscommon.ServerConfig{ VerificationMode: tlscommon.VerifyStrict, }, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -243,7 +250,7 @@ var serverPoolTests = []struct { TLS: &tlscommon.ServerConfig{ VerificationMode: tlscommon.VerifyNone, }, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -255,11 +262,87 @@ var serverPoolTests = []struct { }, wantErr: invalidTLSStateErr{addr: "127.0.0.1:9001", reason: "configuration options do 
not agree"}, }, + { + name: "exceed_max_in_flight", + method: http.MethodPost, + cfgs: []*httpEndpoint{{ + addr: "127.0.0.1:9001", + config: config{ + Method: http.MethodPost, + ResponseCode: http.StatusOK, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/", + Prefix: "json", + MaxInFlight: 2, + RetryAfter: 10, + ContentType: "application/json", + }, + }}, + events: []target{ + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"a":1}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`, wantHeader: http.Header{"Retry-After": {"10"}}}, + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"b":2}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`, wantHeader: http.Header{"Retry-After": {"10"}}}, + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"c":3}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`, wantHeader: http.Header{"Retry-After": {"10"}}}, + }, + wantStatus: http.StatusServiceUnavailable, + want: nil, + }, + { + name: "not_exceed_max_in_flight", + method: http.MethodPost, + cfgs: []*httpEndpoint{{ + addr: "127.0.0.1:9001", + config: config{ + Method: http.MethodPost, + ResponseCode: http.StatusOK, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/", + Prefix: "json", + MaxInFlight: 20, + RetryAfter: 10, + ContentType: "application/json", + }, + }}, + events: []target{ + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"a":1}`, wantBody: `{"message": "success"}`, wantHeader: http.Header{"Retry-After": nil}}, + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"b":2}`, wantBody: `{"message": "success"}`, wantHeader: http.Header{"Retry-After": nil}}, + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", 
event: `{"c":3}`, wantBody: `{"message": "success"}`, wantHeader: http.Header{"Retry-After": nil}}, + }, + wantStatus: http.StatusOK, + want: []mapstr.M{ + {"json": mapstr.M{"a": int64(1)}}, + {"json": mapstr.M{"b": int64(2)}}, + {"json": mapstr.M{"c": int64(3)}}, + }, + }, } type target struct { - url string - event string + url string + event string + wantBody string + wantHeader http.Header +} + +// isWantedHeader returns whether got includes the wanted header and that +// the values match. A nil value for a header in the receiver matches absence +// of that header in the got parameter. +func (t target) isWantedHeader(got http.Header) bool { + for h, v := range t.wantHeader { + if v == nil { + if _, ok := got[h]; ok { + return false + } + continue + } + if !slices.Equal(got[h], v) { + return false + } + } + return true } func TestServerPool(t *testing.T) { @@ -309,9 +392,15 @@ func TestServerPool(t *testing.T) { t.Fatalf("failed to post event #%d: %v", i, err) } body := dump(resp.Body) - if resp.StatusCode != http.StatusOK { - t.Errorf("unexpected response status code: %s (%d)\nresp: %s", - resp.Status, resp.StatusCode, body) + if resp.StatusCode != test.wantStatus { + t.Errorf("unexpected response status code: %s (%d), want: %d\nresp: %s", + resp.Status, resp.StatusCode, test.wantStatus, body) + } + if len(e.wantBody) != 0 && string(body) != e.wantBody { + t.Errorf("unexpected response body:\ngot: %s\nwant:%s", body, e.wantBody) + } + if !e.isWantedHeader(resp.Header) { + t.Errorf("unexpected header:\n--- want\n+++ got\n%s", cmp.Diff(e.wantHeader, resp.Header)) } } cancel() @@ -320,8 +409,8 @@ func TestServerPool(t *testing.T) { for _, e := range pub.events { got = append(got, e.Fields) } - if !cmp.Equal(got, test.want) { - t.Errorf("unexpected result:\n--- got\n--- want\n%s", cmp.Diff(got, test.want)) + if !cmp.Equal(test.want, got) { + t.Errorf("unexpected result:\n--- want\n+++ got\n%s", cmp.Diff(test.want, got)) } // Try to re-register the same addresses.