Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.17](backport #41765) x-pack/filebeat/input/http_endpoint: fix handling of http_endpoint request exceeding memory limits #41820

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Log bad handshake details when websocket connection fails {pull}41300[41300]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]

*Heartbeat*

Expand Down
11 changes: 11 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ These are the possible response codes from the server.
| 406 | Not Acceptable | Returned if the POST request does not contain a body.
| 415 | Unsupported Media Type | Returned if the Content-Type is not application/json. Or if Content-Encoding is present and is not gzip.
| 500 | Internal Server Error | Returned if an I/O error occurs reading the request.
| 503 | Service Unavailable | Returned if the length of the request body would take the total number of in-flight bytes above the configured `max_in_flight_bytes` value.
| 504 | Gateway Timeout | Returned if a request publication cannot be ACKed within the required timeout.
|=========================================================================================================================================================

Expand Down Expand Up @@ -285,6 +286,16 @@ The prefix for the signature. Certain webhooks prefix the HMAC signature with a
By default the input expects the incoming POST to include a Content-Type of `application/json` to try to enforce the incoming data to be valid JSON.
In certain scenarios when the source of the request is not able to do that, it can be overwritten with another value or set to null.

[float]
==== `max_in_flight_bytes`

The total sum of request body lengths that are allowed at any given time. If non-zero, the input will compare this value to the sum of in-flight request body lengths from requests that include a `wait_for_completion_timeout` request query and will return a 503 HTTP status code, along with a Retry-After header configured with the `retry_after` option. The default value for this option is zero, no limit.

[float]
==== `retry_after`

If a request has exceeded the `max_in_flight_bytes` limit, the response to the client will include a Retry-After header specifying how many seconds the client should wait to retry again. The default value for this option is 10 seconds.

[float]
==== `program`

Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type config struct {
URL string `config:"url" validate:"required"`
Prefix string `config:"prefix"`
ContentType string `config:"content_type"`
MaxInFlight int64 `config:"max_in_flight_bytes"`
RetryAfter int `config:"retry_after"`
Program string `config:"program"`
SecretHeader string `config:"secret.header"`
SecretValue string `config:"secret.value"`
Expand Down Expand Up @@ -66,6 +68,7 @@ func defaultConfig() config {
BasicAuth: false,
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
RetryAfter: 10,
ListenAddress: "127.0.0.1",
ListenPort: "8000",
URL: "/",
Expand Down
43 changes: 43 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ type handler struct {
txBaseID string // Random value to make transaction IDs unique.
txIDCounter atomic.Uint64 // Transaction ID counter that is incremented for each request.

// inFlight is the sum of message body length
// that have been received but not yet ACKed
// or timed out or otherwise handled.
//
// Requests that do not request a timeout do
// not contribute to this value.
inFlight atomic.Int64
// maxInFlight is the maximum value of inFligh
// that will be allowed for any messages received
// by the handler. If non-zero, inFlight may
// not exceed this value.
maxInFlight int64
retryAfter int

reqLogger *zap.Logger
host, scheme string

Expand Down Expand Up @@ -86,9 +100,38 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
acked chan struct{}
timeout *time.Timer
)
if h.maxInFlight != 0 {
// Consider non-ACKing messages as well. These do not add
// to the sum of in-flight bytes, but we can still assess
// whether a message would take us over the limit.
inFlight := h.inFlight.Load() + r.ContentLength
if inFlight > h.maxInFlight {
w.Header().Set(headerContentEncoding, "application/json")
w.Header().Set("Retry-After", strconv.Itoa(h.retryAfter))
w.WriteHeader(http.StatusServiceUnavailable)
_, err := fmt.Fprintf(w,
`{"warn":"max in flight message memory exceeded","max_in_flight":%d,"in_flight":%d}`,
h.maxInFlight, inFlight,
)
if err != nil {
h.log.Errorw("failed to write 503", "error", err)
}
return
}
}
if wait != 0 {
acked = make(chan struct{})
timeout = time.NewTimer(wait)
h.inFlight.Add(r.ContentLength)
defer func() {
// Any return will be a message handling completion and the
// the removal of the allocation from the queue assuming that
// the client has requested a timeout. Either we have an early
// error condition or timeout and the message is dropped, we
// have ACKed all the events in the request, or the input has
// been cancelled.
h.inFlight.Add(-r.ContentLength)
}()
}
start := time.Now()
acker := newBatchACKTracker(func() {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ type publisher struct {
func (p *publisher) Publish(e beat.Event) {
p.mu.Lock()
p.events = append(p.events, e)
if ack, ok := e.Private.(*batchACKTracker); ok {
ack.ACK()
}
p.mu.Unlock()
}

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ func newHandler(ctx context.Context, c config, prg *program, pub func(beat.Event
hmacType: c.HMACType,
hmacPrefix: c.HMACPrefix,
},
maxInFlight: c.MaxInFlight,
retryAfter: c.RetryAfter,
program: prg,
messageField: c.Prefix,
responseCode: c.ResponseCode,
Expand Down
Loading
Loading