Skip to content

Commit

Permalink
x-pack/filebeat/input/streaming: log websocket bad handshake details (#…
Browse files Browse the repository at this point in the history
…41300)

(cherry picked from commit 0c2f9e7)

# Conflicts:
#	x-pack/filebeat/input/streaming/websocket.go
  • Loading branch information
efd6 authored and mergify[bot] committed Oct 18, 2024
1 parent 9baecba commit 482db84
Show file tree
Hide file tree
Showing 2 changed files with 300 additions and 0 deletions.
29 changes: 29 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,35 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917]
- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985]
- Updated Websocket input title to align with existing inputs {pull}39006[39006]
- Restore netflow input on Windows {pull}39024[39024]
- Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861]
- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131]
- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420]
- Fix request trace filename handling in http_endpoint input. {pull}39410[39410]
- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. {pull}39873[39873] {issue}39653[39653]
- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036]
- Fix for Google Workspace duplicate events issue by adding canonical sorting over fingerprint keys array to maintain key order. {pull}40055[40055] {issue}39859[39859]
- Fix handling of deeply nested numeric values in HTTP Endpoint CEL programs. {pull}40115[40115]
- Prevent panic in CEL and salesforce inputs when github.com/hashicorp/go-retryablehttp exceeds maximum retries. {pull}40144[40144]
- Fix bug in CEL input rate limit logic. {issue}40106[40106] {pull}40270[40270]
- Relax requirements in Okta entity analytics provider user and device profile data shape. {pull}40359[40359]
- Fix bug in Okta entity analytics rate limit logic. {issue}40106[40106] {pull}40267[40267]
- Fix crashes in the journald input. {pull}40061[40061]
- Fix order of configuration for EntraID entity analytics provider. {pull}40487[40487]
- Ensure Entra ID request bodies are not truncated and trace logs are rotated before 100MB. {pull}40494[40494]
- The Elasticsearch output now correctly logs the event fields to the event log file {issue}40509[40509] {pull}40512[40512]
- Fix the "No such input type exist: 'azure-eventhub'" error on the Windows platform {issue}40608[40608] {pull}40609[40609]
- awss3 input: Fix handling of SQS notifications that don't contain a region. {pull}40628[40628]
- Fix credential handling when workload identity is being used in GCS input. {issue}39977[39977] {pull}40663[40663]
- Fix publication of group data from the Okta entity analytics provider. {pull}40681[40681]
- Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730]
- Fix replace processor handling of zero string replacement validation. {pull}40751[40751]
- Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909]
- Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]
- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192]
- Log bad handshake details when websocket connection fails {pull}41300[41300]

*Heartbeat*

Expand Down
271 changes: 271 additions & 0 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package streaming

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"math/rand/v2"
"net"
"net/http"
"strings"
"time"

"github.com/gorilla/websocket"
"go.uber.org/zap/zapcore"

inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/elastic-agent-libs/logp"
)

type websocketStream struct {
processor

id string
cfg config
cursor map[string]any

time func() time.Time
}

// NewWebsocketFollower performs environment construction including CEL
// program and regexp compilation, and input metrics set-up for a websocket
// stream follower.
func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map[string]any, pub inputcursor.Publisher, log *logp.Logger, now func() time.Time) (StreamFollower, error) {
s := websocketStream{
id: id,
cfg: cfg,
cursor: cursor,
processor: processor{
ns: "websocket",
pub: pub,
log: log,
redact: cfg.Redact,
metrics: newInputMetrics(id),
},
}
s.metrics.url.Set(cfg.URL.String())
s.metrics.errorsTotal.Set(0)

patterns, err := regexpsFromConfig(cfg)
if err != nil {
s.metrics.errorsTotal.Inc()
s.Close()
return nil, err
}

s.prg, s.ast, err = newProgram(ctx, cfg.Program, root, patterns, log)
if err != nil {
s.metrics.errorsTotal.Inc()
s.Close()
return nil, err
}

return &s, nil
}

// FollowStream receives, processes and publishes events from the subscribed
// websocket stream.
func (s *websocketStream) FollowStream(ctx context.Context) error {
state := s.cfg.State
if state == nil {
state = make(map[string]any)
}
if s.cursor != nil {
state["cursor"] = s.cursor
}

// initialize the input url with the help of the url_program.
url, err := getURL(ctx, "websocket", s.cfg.URLProgram, s.cfg.URL.String(), state, s.cfg.Redact, s.log, s.now)
if err != nil {
s.metrics.errorsTotal.Inc()
return err
}

// websocket client
c, resp, err := connectWebSocket(ctx, s.cfg, url, s.log)
handleConnectionResponse(resp, s.metrics, s.log)
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to establish websocket connection", "error", err)
return err
}

// ensures this is the last connection closed when the function returns
defer func() {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
}
}()

for {
select {
case <-ctx.Done():
s.log.Debugw("context cancelled, closing websocket connection")
return ctx.Err()
default:
_, message, err := c.ReadMessage()
if err != nil {
s.metrics.errorsTotal.Inc()
if isRetryableError(err) {
s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err)
// close the old connection and reconnect
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
}
// since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it
c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log)
handleConnectionResponse(resp, s.metrics, s.log)
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to reconnect websocket connection", "error", err)
return err
}
} else {
s.log.Errorw("failed to read websocket data", "error", err)
return err
}
}
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}
}
}
}

// isRetryableError checks if the error is retryable based on the error type.
func isRetryableError(err error) bool {
// check for specific network errors
var netErr *net.OpError
if errors.As(err, &netErr) {
switch {
case netErr.Op == "dial" && netErr.Err.Error() == "i/o timeout",
netErr.Op == "read" && netErr.Err.Error() == "i/o timeout",
netErr.Op == "read" && netErr.Err.Error() == "connection reset by peer",
netErr.Op == "read" && netErr.Err.Error() == "connection refused",
netErr.Op == "read" && netErr.Err.Error() == "connection reset",
netErr.Op == "read" && netErr.Err.Error() == "connection closed":
return true
}
}

// check for specific websocket close errors
var closeErr *websocket.CloseError
if errors.As(err, &closeErr) {
switch closeErr.Code {
case websocket.CloseGoingAway,
websocket.CloseNormalClosure,
websocket.CloseInternalServerErr,
websocket.CloseTryAgainLater,
websocket.CloseServiceRestart,
websocket.CloseTLSHandshake:
return true
}
}

// check for common error patterns
if strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "connection reset") ||
strings.Contains(err.Error(), "temporary failure") ||
strings.Contains(err.Error(), "server is busy") {
return true
}

return false
}

// handleConnectionResponse logs the response body of the websocket connection.
func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *logp.Logger) {
if resp != nil && resp.Body != nil {
var buf bytes.Buffer
defer resp.Body.Close()

if log.Core().Enabled(zapcore.DebugLevel) {
const limit = 1e4
if _, err := io.CopyN(&buf, resp.Body, limit); err != nil && !errors.Is(err, io.EOF) {
metrics.errorsTotal.Inc()
fmt.Fprintf(&buf, "failed to read websocket response body with error: (%s) \n", err)
}
}

// discard the remaining part of the body and check for truncation.
if n, err := io.Copy(io.Discard, resp.Body); err != nil {
metrics.errorsTotal.Inc()
fmt.Fprintf(&buf, "failed to discard remaining response body with error: (%s) ", err)
} else if n != 0 && buf.Len() != 0 {
buf.WriteString("... truncated")
}

log.Debugw("websocket connection response", "body", &buf)
}
}

// connectWebSocket attempts to connect to the websocket server with exponential backoff if retry config is available else it connects without retry.
func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Logger) (*websocket.Conn, *http.Response, error) {
var conn *websocket.Conn
var response *http.Response
var err error
headers := formHeader(cfg)

if cfg.Retry != nil {
retryConfig := cfg.Retry
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = websocket.DefaultDialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
continue
}
log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt)
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err)
}

return websocket.DefaultDialer.DialContext(ctx, url, headers)
}

// calculateWaitTime calculates the wait time for the next attempt based on the exponential backoff algorithm.
func calculateWaitTime(waitMin, waitMax time.Duration, attempt int) time.Duration {
// calculate exponential backoff
base := float64(waitMin)
backoff := base * math.Pow(2, float64(attempt-1))

// calculate jitter proportional to the backoff
maxJitter := float64(waitMax-waitMin) * math.Pow(2, float64(attempt-1))
jitter := rand.Float64() * maxJitter

waitTime := time.Duration(backoff + jitter)

return waitTime
}

// now is time.Now with a modifiable time source.
func (s *websocketStream) now() time.Time {
if s.time == nil {
return time.Now()
}
return s.time()
}

func (s *websocketStream) Close() error {
s.metrics.Close()
return nil
}

0 comments on commit 482db84

Please sign in to comment.