diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3570ba5e5ad2..4ba7bd550622 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -181,6 +181,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Access Points in the `aws-s3` input. {pull}41495[41495] - Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664] - Fix missing key in streaming input logging. {pull}41600[41600] +- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index afd39fec0f12..c21de102168e 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -40,6 +40,7 @@ These are the possible response codes from the server. | 406 | Not Acceptable | Returned if the POST request does not contain a body. | 415 | Unsupported Media Type | Returned if the Content-Type is not application/json. Or if Content-Encoding is present and is not gzip. | 500 | Internal Server Error | Returned if an I/O error occurs reading the request. +| 503 | Service Unavailable | Returned if the length of the request body would take the total number of in-flight bytes above the configured `max_in_flight_bytes` value. | 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout. |========================================================================================================================================================= @@ -285,6 +286,16 @@ The prefix for the signature. Certain webhooks prefix the HMAC signature with a By default the input expects the incoming POST to include a Content-Type of `application/json` to try to enforce the incoming data to be valid JSON. 
In certain scenarios when the source of the request is not able to do that, it can be overwritten with another value or set to null. +[float] +==== `max_in_flight_bytes` + +The total sum of request body lengths that are allowed at any given time. If non-zero, the input will compare this value to the sum of in-flight request body lengths from requests that include a `wait_for_completion_timeout` request query and, if the incoming request body would take that sum above this value, will return a 503 HTTP status code along with a Retry-After header configured with the `retry_after` option. The default value for this option is zero, no limit. + +[float] +==== `retry_after` + +If a request has exceeded the `max_in_flight_bytes` limit, the response to the client will include a Retry-After header specifying how many seconds the client should wait before retrying. The default value for this option is 10 seconds. + [float] ==== `program` diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 5e1c93d2bc36..977e7b5d7d26 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -37,6 +37,8 @@ type config struct { URL string `config:"url" validate:"required"` Prefix string `config:"prefix"` ContentType string `config:"content_type"` + MaxInFlight int64 `config:"max_in_flight_bytes"` + RetryAfter int `config:"retry_after"` Program string `config:"program"` SecretHeader string `config:"secret.header"` SecretValue string `config:"secret.value"` @@ -66,6 +68,7 @@ func defaultConfig() config { BasicAuth: false, ResponseCode: 200, ResponseBody: `{"message": "success"}`, + RetryAfter: 10, ListenAddress: "127.0.0.1", ListenPort: "8000", URL: "/", diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go index 27f4d12253ef..3677fdb4adef 100644 --- a/x-pack/filebeat/input/http_endpoint/handler.go +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -56,6 +56,20 @@ type handler struct { 
txBaseID string // Random value to make transaction IDs unique. txIDCounter atomic.Uint64 // Transaction ID counter that is incremented for each request. + // inFlight is the sum of message body length + // that have been received but not yet ACKed + // or timed out or otherwise handled. + // + // Requests that do not request a timeout do + // not contribute to this value. + inFlight atomic.Int64 + // maxInFlight is the maximum value of inFlight + // that will be allowed for any messages received + // by the handler. If non-zero, inFlight may + // not exceed this value. + maxInFlight int64 + retryAfter int + reqLogger *zap.Logger host, scheme string @@ -86,9 +100,40 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { acked chan struct{} timeout *time.Timer ) + if h.maxInFlight != 0 { + // Consider non-ACKing messages as well. These do not add + // to the sum of in-flight bytes, but we can still assess + // whether a message would take us over the limit. + inFlight := h.inFlight.Load() + r.ContentLength + if inFlight > h.maxInFlight { + // Headers must be set before WriteHeader is called; + // changes made to the header map afterwards are ignored. + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Retry-After", strconv.Itoa(h.retryAfter)) + w.WriteHeader(http.StatusServiceUnavailable) + _, err := fmt.Fprintf(w, + `{"warn":"max in flight message memory exceeded","max_in_flight":%d,"in_flight":%d}`, + h.maxInFlight, inFlight, + ) + if err != nil { + h.log.Errorw("failed to write 503", "error", err) + } + return + } + } if wait != 0 { acked = make(chan struct{}) timeout = time.NewTimer(wait) + h.inFlight.Add(r.ContentLength) + defer func() { + // Any return will be a message handling completion and the + // removal of the allocation from the queue assuming that + // the client has requested a timeout. Either we have an early + // error condition or timeout and the message is dropped, we + // have ACKed all the events in the request, or the input has + // been cancelled. 
+ h.inFlight.Add(-r.ContentLength) + }() } start := time.Now() acker := newBatchACKTracker(func() { diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index b4ad07e7626b..6d6b0cbc3f41 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -347,6 +347,8 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event hmacType: c.HMACType, hmacPrefix: c.HMACPrefix, }, + maxInFlight: c.MaxInFlight, + retryAfter: c.RetryAfter, program: prg, messageField: c.Prefix, responseCode: c.ResponseCode, diff --git a/x-pack/filebeat/input/http_endpoint/input_test.go b/x-pack/filebeat/input/http_endpoint/input_test.go index 3f530454e1d8..63391ffb2f74 100644 --- a/x-pack/filebeat/input/http_endpoint/input_test.go +++ b/x-pack/filebeat/input/http_endpoint/input_test.go @@ -24,19 +24,20 @@ import ( ) var serverPoolTests = []struct { - name string - method string - cfgs []*httpEndpoint - events []target - want []mapstr.M - wantErr error + name string + method string + cfgs []*httpEndpoint + events []target + want []mapstr.M + wantStatus int + wantErr error }{ { name: "single", cfgs: []*httpEndpoint{{ addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -50,6 +51,7 @@ var serverPoolTests = []struct { {url: "http://127.0.0.1:9001/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -63,7 +65,7 @@ var serverPoolTests = []struct { addr: "127.0.0.1:9001", config: config{ Method: http.MethodPut, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -77,6 +79,7 @@ var serverPoolTests = []struct { {url: 
"http://127.0.0.1:9001/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -90,7 +93,7 @@ var serverPoolTests = []struct { addr: "127.0.0.1:9001", config: config{ Method: http.MethodPatch, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -104,6 +107,7 @@ var serverPoolTests = []struct { {url: "http://127.0.0.1:9001/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -116,7 +120,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -128,7 +132,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9002", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9002", @@ -143,6 +147,7 @@ var serverPoolTests = []struct { {url: "http://127.0.0.1:9002/b/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/a/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -155,7 +160,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -167,7 +172,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -182,6 +187,7 @@ var 
serverPoolTests = []struct { {url: "http://127.0.0.1:9001/b/", event: `{"b":2}`}, {url: "http://127.0.0.1:9001/a/", event: `{"c":3}`}, }, + wantStatus: http.StatusOK, want: []mapstr.M{ {"json": mapstr.M{"a": int64(1)}}, {"json": mapstr.M{"b": int64(2)}}, @@ -194,7 +200,7 @@ var serverPoolTests = []struct { { addr: "127.0.0.1:9001", config: config{ - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -207,7 +213,7 @@ var serverPoolTests = []struct { addr: "127.0.0.1:9001", config: config{ TLS: &tlscommon.ServerConfig{}, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -228,7 +234,7 @@ var serverPoolTests = []struct { TLS: &tlscommon.ServerConfig{ VerificationMode: tlscommon.VerifyStrict, }, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -243,7 +249,7 @@ var serverPoolTests = []struct { TLS: &tlscommon.ServerConfig{ VerificationMode: tlscommon.VerifyNone, }, - ResponseCode: 200, + ResponseCode: http.StatusOK, ResponseBody: `{"message": "success"}`, ListenAddress: "127.0.0.1", ListenPort: "9001", @@ -255,11 +261,37 @@ var serverPoolTests = []struct { }, wantErr: invalidTLSStateErr{addr: "127.0.0.1:9001", reason: "configuration options do not agree"}, }, + { + name: "exceed_max_in_flight", + method: http.MethodPost, + cfgs: []*httpEndpoint{{ + addr: "127.0.0.1:9001", + config: config{ + Method: http.MethodPost, + ResponseCode: http.StatusOK, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/", + Prefix: "json", + MaxInFlight: 2, + ContentType: "application/json", + }, + }}, + events: []target{ + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"a":1}`, wantBody: `{"warn":"max in flight message memory 
exceeded","max_in_flight":2,"in_flight":7}`}, + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"b":2}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`}, + {url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"c":3}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`}, + }, + wantStatus: http.StatusServiceUnavailable, + want: nil, + }, } type target struct { - url string - event string + url string + event string + wantBody string } func TestServerPool(t *testing.T) { @@ -309,9 +341,12 @@ func TestServerPool(t *testing.T) { t.Fatalf("failed to post event #%d: %v", i, err) } body := dump(resp.Body) - if resp.StatusCode != http.StatusOK { - t.Errorf("unexpected response status code: %s (%d)\nresp: %s", - resp.Status, resp.StatusCode, body) + if resp.StatusCode != test.wantStatus { + t.Errorf("unexpected response status code: %s (%d), want: %d\nresp: %s", + resp.Status, resp.StatusCode, test.wantStatus, body) + } + if len(e.wantBody) != 0 && string(body) != e.wantBody { + t.Errorf("unexpected response body:\ngot: %s\nwant:%s", body, e.wantBody) } } cancel()