Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.17](backport #42225) [filebeat][websocket] - Added infinite & blanket retry options to websockets and improved logging and retry logic #42236

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]

*Auditbeat*

Expand Down
14 changes: 13 additions & 1 deletion x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ This specifies whether fields should be replaced with a `*` or deleted entirely
[float]
==== `retry`

The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries.
The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retires` and `infinite_retries` configuration options. These are set to `false` by default.

["source","yaml",subs="attributes"]
----
Expand All @@ -333,6 +333,8 @@ filebeat.inputs:
max_attempts: 5
wait_min: 1s
wait_max: 10s
blanket_retries: false
infinite_retries: false
----
[float]
==== `retry.max_attempts`
Expand All @@ -349,6 +351,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds.

[float]
==== `retry.blanket_retries`

Normally the input will only retry when a connection error is found to be retryable based on the error type and the RFC 6455 error codes defined by the websocket protocol. If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying.

[float]
==== `retry.infinite_retries`

Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed.

[float]
=== `timeout`
Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds.
Expand Down
10 changes: 6 additions & 4 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ type redact struct {
}

type retry struct {
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
BlanketRetries bool `config:"blanket_retries"`
InfiniteRetries bool `config:"infinite_retries"`
}

type authConfig struct {
Expand Down Expand Up @@ -136,7 +138,7 @@ func (c config) Validate() error {

if c.Retry != nil {
switch {
case c.Retry.MaxAttempts <= 0:
case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries:
return errors.New("max_attempts must be greater than zero")
case c.Retry.WaitMin > c.Retry.WaitMax:
return errors.New("wait_min must be less than or equal to wait_max")
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/streaming/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ var configTests = []struct {
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_retry_with_infinite",
config: map[string]interface{}{
"retry": map[string]interface{}{
"infinite_retries": true,
"max_attempts": 0,
"wait_min": "1s",
"wait_max": "2s",
},
"url": "wss://localhost:443/v1/stream",
},
},
}

func TestConfig(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/streaming/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ var inputTests = []struct {
"wait_max": "2s",
},
},
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"),
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"),
},
{
name: "single_event_tls",
Expand Down
43 changes: 30 additions & 13 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
_, message, err := c.ReadMessage()
if err != nil {
s.metrics.errorsTotal.Inc()
if !isRetryableError(err) {
if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) {
s.log.Errorw("failed to read websocket data", "error", err)
return err
}
Expand Down Expand Up @@ -233,21 +233,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log
}
if cfg.Retry != nil {
retryConfig := cfg.Retry
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
if !retryConfig.InfiniteRetries {
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
} else {
log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode)
}
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
continue
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode)
} else {
for attempt := 1; ; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
} else {
log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode)
}
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt)
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err)
}

return dialer.DialContext(ctx, url, headers)
Expand Down
Loading