Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: fix handling of http_endpoint re…
Browse files Browse the repository at this point in the history
…quest exceeding memory limits

The input does not have a way to communicate back-pressure to clients,
potentially leading to unconstrained growth in the publisher event queue
and an OoM event. This change adds a mechanism to keep track of the
total sum of in-flight message bytes from the client in order to allow
the server to return a 429 HTTP status when the total is too large.

Note that this does not monitor the total memory in the queue as that
would require a complete understanding of the allocations in the
preparation of event values to be sent to the publisher, but rather uses
the message length as a reasonable proxy.
  • Loading branch information
efd6 committed Nov 24, 2024
1 parent 6d1c81e commit ac98f9f
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
- Fix missing key in streaming input logging. {pull}41600[41600]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]

*Heartbeat*

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ These are the possible response codes from the server.
| 405 | Method Not Allowed | Returned if methods other than POST are used.
| 406 | Not Acceptable | Returned if the POST request does not contain a body.
| 415 | Unsupported Media Type | Returned if the Content-Type is not application/json, or if Content-Encoding is present and is not gzip.
| 429 | Too Many Requests | Returned if the length of the request body would take the total number of in-flight bytes above the configured `max_in_flight_bytes` value.
| 500 | Internal Server Error | Returned if an I/O error occurs reading the request.
| 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout.
|=========================================================================================================================================================
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type config struct {
URL string `config:"url" validate:"required"`
Prefix string `config:"prefix"`
ContentType string `config:"content_type"`
MaxInFlight int64 `config:"max_in_flight_bytes"`
Program string `config:"program"`
SecretHeader string `config:"secret.header"`
SecretValue string `config:"secret.value"`
Expand Down
41 changes: 41 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ type handler struct {
txBaseID string // Random value to make transaction IDs unique.
txIDCounter atomic.Uint64 // Transaction ID counter that is incremented for each request.

// inFlight is the sum of message body lengths
// that have been received but not yet ACKed
// or timed out or otherwise handled.
//
// Requests that do not request a timeout do
// not contribute to this value.
inFlight atomic.Int64
// maxInFlight is the maximum value of inFlight
// that will be allowed for any messages received
// by the handler. If non-zero, inFlight may
// not exceed this value.
maxInFlight int64

reqLogger *zap.Logger
host, scheme string

Expand Down Expand Up @@ -86,9 +99,37 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
acked chan struct{}
timeout *time.Timer
)
if h.maxInFlight != 0 {
// Consider non-ACKing messages as well. These do not add
// to the sum of in-flight bytes, but we can still assess
// whether a message would take us over the limit.
inFlight := h.inFlight.Load() + r.ContentLength
if inFlight > h.maxInFlight {
w.WriteHeader(http.StatusTooManyRequests)
w.Header().Set(headerContentEncoding, "application/json")
_, err := fmt.Fprintf(w,
`{"warn":"max in flight message memory exceeded","max_in_flight":%d,"in_flight":%d}`,
h.maxInFlight, inFlight,
)
if err != nil {
h.log.Errorw("failed to write 429", "error", err)
}
return
}
}
if wait != 0 {
acked = make(chan struct{})
timeout = time.NewTimer(wait)
h.inFlight.Add(r.ContentLength)
defer func() {
// Any return will be a message handling completion and the
// removal of the allocation from the queue assuming that
// the client has requested a timeout. Either we have an early
// error condition or timeout and the message is dropped, we
// have ACKed all the events in the request, or the input has
// been cancelled.
h.inFlight.Add(-r.ContentLength)
}()
}
start := time.Now()
acker := newBatchACKTracker(func() {
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event
hmacType: c.HMACType,
hmacPrefix: c.HMACPrefix,
},
maxInFlight: c.MaxInFlight,
program: prg,
messageField: c.Prefix,
responseCode: c.ResponseCode,
Expand Down
79 changes: 57 additions & 22 deletions x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ import (
)

var serverPoolTests = []struct {
name string
method string
cfgs []*httpEndpoint
events []target
want []mapstr.M
wantErr error
name string
method string
cfgs []*httpEndpoint
events []target
want []mapstr.M
wantStatus int
wantErr error
}{
{
name: "single",
cfgs: []*httpEndpoint{{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -50,6 +51,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -63,7 +65,7 @@ var serverPoolTests = []struct {
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPut,
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -77,6 +79,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -90,7 +93,7 @@ var serverPoolTests = []struct {
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPatch,
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -104,6 +107,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -116,7 +120,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -128,7 +132,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9002",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9002",
Expand All @@ -143,6 +147,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9002/b/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/a/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -155,7 +160,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -167,7 +172,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -182,6 +187,7 @@ var serverPoolTests = []struct {
{url: "http://127.0.0.1:9001/b/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/a/", event: `{"c":3}`},
},
wantStatus: http.StatusOK,
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
Expand All @@ -194,7 +200,7 @@ var serverPoolTests = []struct {
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -207,7 +213,7 @@ var serverPoolTests = []struct {
addr: "127.0.0.1:9001",
config: config{
TLS: &tlscommon.ServerConfig{},
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -228,7 +234,7 @@ var serverPoolTests = []struct {
TLS: &tlscommon.ServerConfig{
VerificationMode: tlscommon.VerifyStrict,
},
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -243,7 +249,7 @@ var serverPoolTests = []struct {
TLS: &tlscommon.ServerConfig{
VerificationMode: tlscommon.VerifyNone,
},
ResponseCode: 200,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
Expand All @@ -255,11 +261,37 @@ var serverPoolTests = []struct {
},
wantErr: invalidTLSStateErr{addr: "127.0.0.1:9001", reason: "configuration options do not agree"},
},
{
name: "exceed_max_in_flight",
method: http.MethodPost,
cfgs: []*httpEndpoint{{
addr: "127.0.0.1:9001",
config: config{
Method: http.MethodPost,
ResponseCode: http.StatusOK,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/",
Prefix: "json",
MaxInFlight: 2,
ContentType: "application/json",
},
}},
events: []target{
{url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"a":1}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`},
{url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"b":2}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`},
{url: "http://127.0.0.1:9001/?wait_for_completion_timeout=1s", event: `{"c":3}`, wantBody: `{"warn":"max in flight message memory exceeded","max_in_flight":2,"in_flight":7}`},
},
wantStatus: http.StatusTooManyRequests,
want: nil,
},
}

type target struct {
url string
event string
url string
event string
wantBody string
}

func TestServerPool(t *testing.T) {
Expand Down Expand Up @@ -309,9 +341,12 @@ func TestServerPool(t *testing.T) {
t.Fatalf("failed to post event #%d: %v", i, err)
}
body := dump(resp.Body)
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected response status code: %s (%d)\nresp: %s",
resp.Status, resp.StatusCode, body)
if resp.StatusCode != test.wantStatus {
t.Errorf("unexpected response status code: %s (%d), want: %d\nresp: %s",
resp.Status, resp.StatusCode, test.wantStatus, body)
}
if len(e.wantBody) != 0 && string(body) != e.wantBody {
t.Errorf("unexpected response body:\ngot: %s\nwant:%s", body, e.wantBody)
}
}
cancel()
Expand Down

0 comments on commit ac98f9f

Please sign in to comment.