Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: fix handling of http_endpoint re…
Browse files Browse the repository at this point in the history
…quest exceeding memory limits

The input does not have a way to communicate back-pressure to clients,
potentially leading to unconstrained growth in the publisher event queue
and an OoM event. This change adds a mechanism to keep track of the
total sum of in-flight message bytes from the client in order to allow
the server to return a 503 HTTP status when the total is too large.

Note that this does not monitor the total memory in the queue as that
would require a complete understanding of the allocations in the
preparation of event values to be sent to the publisher, but rather uses
the message length as a reasonable proxy.
  • Loading branch information
efd6 committed Nov 25, 2024
1 parent 6d1c81e commit 7c7251a
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
- Fix missing key in streaming input logging. {pull}41600[41600]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]

*Heartbeat*

Expand Down
11 changes: 11 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ These are the possible response codes from the server.
| 406 | Not Acceptable | Returned if the POST request does not contain a body.
| 415 | Unsupported Media Type | Returned if the Content-Type is not application/json. Or if Content-Encoding is present and is not gzip.
| 500 | Internal Server Error | Returned if an I/O error occurs reading the request.
| 503 | Service Unavailable | Returned if the length of the request body would take the total number of in-flight bytes above the configured `max_in_flight_bytes` value.
| 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout.
|=========================================================================================================================================================

Expand Down Expand Up @@ -285,6 +286,16 @@ The prefix for the signature. Certain webhooks prefix the HMAC signature with a
By default the input expects the incoming POST to include a Content-Type of `application/json` to try to enforce the incoming data to be valid JSON.
In certain scenarios when the source of the request is not able to do that, it can be overwritten with another value or set to null.

[float]
==== `max_inf_light_bytes`

The total sum of request body lengths that are allowed at any given time. If non-zero, the input will compare this value to the sum of in-flight request body lengths from requests that include a `wait_for_completion_timeout` request query and will return a 503 HTTP status code, along with a Retry-After header configured with the `retry_after` option. The default value for this option is zero, no limit.

[float]
==== `retry_after`

If a request has exceeded the `max_in_light_bytes` limit, the response to the client will include a Retry-After header specifying how many seconds the client should wait to retry again. The default value for this option is 10 seconds.

[float]
==== `program`

Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type config struct {
URL string `config:"url" validate:"required"`
Prefix string `config:"prefix"`
ContentType string `config:"content_type"`
MaxInFlight int64 `config:"max_in_flight_bytes"`
RetryAfter int `config:"retry_after"`
Program string `config:"program"`
SecretHeader string `config:"secret.header"`
SecretValue string `config:"secret.value"`
Expand Down Expand Up @@ -66,6 +68,7 @@ func defaultConfig() config {
BasicAuth: false,
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
RetryAfter: 10,
ListenAddress: "127.0.0.1",
ListenPort: "8000",
URL: "/",
Expand Down
43 changes: 43 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ type handler struct {
txBaseID string // Random value to make transaction IDs unique.
txIDCounter atomic.Uint64 // Transaction ID counter that is incremented for each request.

// inFlight is the sum of message body length
// that have been received but not yet ACKed
// or timed out or otherwise handled.
//
// Requests that do not request a timeout do
// not contribute to this value.
inFlight atomic.Int64
// maxInFlight is the maximum value of inFligh
// that will be allowed for any messages received
// by the handler. If non-zero, inFlight may
// not exceed this value.
maxInFlight int64
retryAfter int

reqLogger *zap.Logger
host, scheme string

Expand Down Expand Up @@ -86,9 +100,38 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
acked chan struct{}
timeout *time.Timer
)
if h.maxInFlight != 0 {
// Consider non-ACKing messages as well. These do not add
// to the sum of in-flight bytes, but we can still assess
// whether a message would take us over the limit.
inFlight := h.inFlight.Load() + r.ContentLength
if inFlight > h.maxInFlight {
w.WriteHeader(http.StatusServiceUnavailable)
w.Header().Set(headerContentEncoding, "application/json")
w.Header().Set("Retry-After", strconv.Itoa(h.retryAfter))
_, err := fmt.Fprintf(w,
`{"warn":"max in flight message memory exceeded","max_in_flight":%d,"in_flight":%d}`,
h.maxInFlight, inFlight,
)
if err != nil {
h.log.Errorw("failed to write 429", "error", err)
}
return
}
}
if wait != 0 {
acked = make(chan struct{})
timeout = time.NewTimer(wait)
h.inFlight.Add(r.ContentLength)
defer func() {
// Any return will be a message handling completion and the
// the removal of the allocation from the queue assuming that
// the client has requested a timeout. Either we have an early
// error condition or timeout and the message is dropped, we
// have ACKed all the events in the request, or the input has
// been cancelled.
h.inFlight.Add(-r.ContentLength)
}()
}
start := time.Now()
acker := newBatchACKTracker(func() {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event
hmacType: c.HMACType,
hmacPrefix: c.HMACPrefix,
},
maxInFlight: c.MaxInFlight,
retryAfter: c.RetryAfter,
program: prg,
messageField: c.Prefix,
responseCode: c.ResponseCode,
Expand Down
79 changes: 57 additions & 22 deletions x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ import (
)

var serverPoolTests = []struct {
name string
method string
cfgs []*httpEndpoint
events []target
want []mapstr.M
wantErr error
name string
method string
cfgs []*httpEndpoint
events []target
want []mapstr.M
wantStatus int
wantErr error
}{
{
name: "single",
cfgs: []*httpEndpoint{{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -50,6 +51,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -63,7 +65,7 @@ var serverPoolTests = []struct {
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPut,
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -77,6 +79,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -90,7 +93,7 @@ var serverPoolTests = []struct {
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPatch,
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -104,6 +107,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -116,7 +120,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -128,7 +132,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9002",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9002",
Expand All @@ -143,6 +147,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9002/b/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/a/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -155,7 +160,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -167,7 +172,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -182,6 +187,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/b/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/a/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -194,7 +200,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -207,7 +213,7 @@ var serverPoolTests = []struct {
addr: "127.0.0.1:9001",
config: config{
TLS: &tlscommon.ServerConfig{},
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -228,7 +234,7 @@ var serverPoolTests = []struct {
TLS: &tlscommon.ServerConfig{
VerificationMode: tlscommon.VerifyStrict,
},
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -243,7 +249,7 @@ var serverPoolTests = []struct {
TLS: &tlscommon.ServerConfig{
VerificationMode: tlscommon.VerifyNone,
},
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -255,11 +261,37 @@ var serverPoolTests = []struct {
},
wantErr: invalidTLSStateErr{addr: "127.0.0.1:9001", reason: "configuration options do not agree"},
},
{
name: "exceed_max_in_flight",
method: http.MethodPost,
cfgs: []*httpEndpoint{{
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPost,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/",
Prefix: "json",
MaxInFlight: 2,
ContentType: "application/json",
},
}},
events: []target{
{url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"a":1}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`},
{url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"b":2}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`},
{url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"c":3}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`},
},
wantStatus: http.StatusServiceUnavailable,
want: nil,
},
}

type target struct {
url string
event string
url string
event string
wantBody string
}

func TestServerPool(t *testing.T) {
Expand Down Expand Up @@ -309,9 +341,12 @@ func TestServerPool(t *testing.T) {
t.Fatalf("failed to post event #%d: %v", i, err)
}
body := dump(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected response status code: %s (%d)\nresp: %s",
resp.Status, resp.StatusCode, body)
if resp.StatusCode != test.wantStatus {
t.Errorf("unexpected response status code: %s (%d), want: %d\nresp: %s",
resp.Status, resp.StatusCode, test.wantStatus, body)
}
if len(e.wantBody) != 0 && string(body) != e.wantBody {
t.Errorf("unexpected response body:\ngot: %s\nwant:%s", body, e.wantBody)
}
}
cancel()
Expand Down

0 comments on commit 7c7251a

Please sign in to comment.