Skip to content

Commit

Permalink
[8.16](backport #42225) [filebeat][websocket] - Added infinite & blan…
Browse files Browse the repository at this point in the history
…ket retry options to websockets and improved logging and retry logic (#42234)

* [filebeat][websocket] - Added infinite & blanket retry options to websockets and improved logging and retry logic  (#42225)

* added blanket & infinite retry options and improved logging

(cherry picked from commit 177a47a)

* Update CHANGELOG.next.asciidoc

---------

Co-authored-by: ShourieG <shourie.ganguly@elastic.co>
  • Loading branch information
mergify[bot] and ShourieG authored Jan 7, 2025
1 parent 5cb6e21 commit 80b0af9
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update CEL mito extensions to v1.12.2. {pull}39755[39755]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]

*Auditbeat*

Expand Down
14 changes: 13 additions & 1 deletion x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ This specifies whether fields should be replaced with a `*` or deleted entirely
[float]
==== `retry`

The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries.
The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retires` and `infinite_retries` configuration options. These are set to `false` by default.

["source","yaml",subs="attributes"]
----
Expand All @@ -333,6 +333,8 @@ filebeat.inputs:
max_attempts: 5
wait_min: 1s
wait_max: 10s
blanket_retries: false
infinite_retries: false
----
[float]
==== `retry.max_attempts`
Expand All @@ -349,6 +351,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds.

[float]
==== `retry.blanket_retries`

Normally the input will only retry when a connection error is found to be retryable based on the error type and the RFC 6455 error codes defined by the websocket protocol. If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying.

[float]
==== `retry.infinite_retries`

Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed.

[float]
=== `timeout`
Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds.
Expand Down
10 changes: 6 additions & 4 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ type redact struct {
}

type retry struct {
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
BlanketRetries bool `config:"blanket_retries"`
InfiniteRetries bool `config:"infinite_retries"`
}

type authConfig struct {
Expand Down Expand Up @@ -136,7 +138,7 @@ func (c config) Validate() error {

if c.Retry != nil {
switch {
case c.Retry.MaxAttempts <= 0:
case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries:
return errors.New("max_attempts must be greater than zero")
case c.Retry.WaitMin > c.Retry.WaitMax:
return errors.New("wait_min must be less than or equal to wait_max")
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/streaming/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ var configTests = []struct {
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_retry_with_infinite",
config: map[string]interface{}{
"retry": map[string]interface{}{
"infinite_retries": true,
"max_attempts": 0,
"wait_min": "1s",
"wait_max": "2s",
},
"url": "wss://localhost:443/v1/stream",
},
},
}

func TestConfig(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/streaming/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ var inputTests = []struct {
"wait_max": "2s",
},
},
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"),
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"),
},
{
name: "single_event_tls",
Expand Down
43 changes: 30 additions & 13 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
_, message, err := c.ReadMessage()
if err != nil {
s.metrics.errorsTotal.Inc()
if !isRetryableError(err) {
if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) {
s.log.Errorw("failed to read websocket data", "error", err)
return err
}
Expand Down Expand Up @@ -233,21 +233,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log
}
if cfg.Retry != nil {
retryConfig := cfg.Retry
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
if !retryConfig.InfiniteRetries {
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
} else {
log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode)
}
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
continue
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode)
} else {
for attempt := 1; ; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
} else {
log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode)
}
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt)
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err)
}

return dialer.DialContext(ctx, url, headers)
Expand Down

0 comments on commit 80b0af9

Please sign in to comment.