diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4998320b6442..5fa8e5c9cd8e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -97,6 +97,35 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917] - Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985] - Updated Websocket input title to align with existing inputs {pull}39006[39006] +- Restore netflow input on Windows {pull}39024[39024] +- Upgrade azure-event-hubs-go and azure-storage-blob-go dependencies. {pull}38861[38861] +- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131] +- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420] +- Fix request trace filename handling in http_endpoint input. {pull}39410[39410] +- Fix filestream not correctly tracking the offset of a file when using the `include_message` parser. {pull}39873[39873] {issue}39653[39653] +- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036] +- Fix for Google Workspace duplicate events issue by adding canonical sorting over fingerprint keys array to maintain key order. {pull}40055[40055] {issue}39859[39859] +- Fix handling of deeply nested numeric values in HTTP Endpoint CEL programs. {pull}40115[40115] +- Prevent panic in CEL and salesforce inputs when github.com/hashicorp/go-retryablehttp exceeds maximum retries. {pull}40144[40144] +- Fix bug in CEL input rate limit logic. {issue}40106[40106] {pull}40270[40270] +- Relax requirements in Okta entity analytics provider user and device profile data shape. {pull}40359[40359] +- Fix bug in Okta entity analytics rate limit logic. {issue}40106[40106] {pull}40267[40267] +- Fix crashes in the journald input. {pull}40061[40061] +- Fix order of configuration for EntraID entity analytics provider. {pull}40487[40487] +- Ensure Entra ID request bodies are not truncated and trace logs are rotated before 100MB. {pull}40494[40494] +- The Elasticsearch output now correctly logs the event fields to the event log file {issue}40509[40509] {pull}40512[40512] +- Fix the "No such input type exist: 'azure-eventhub'" error on the Windows platform {issue}40608[40608] {pull}40609[40609] +- awss3 input: Fix handling of SQS notifications that don't contain a region. {pull}40628[40628] +- Fix credential handling when workload identity is being used in GCS input. {issue}39977[39977] {pull}40663[40663] +- Fix publication of group data from the Okta entity analytics provider. {pull}40681[40681] +- Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730] +- Fix replace processor handling of zero string replacement validation. {pull}40751[40751] +- Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909] +- Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071] +- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] +- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] +- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192] +- Log bad handshake details when websocket connection fails {pull}41300[41300] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go new file mode 100644 index 000000000000..1deaf8b07fa5 --- /dev/null +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -0,0 +1,271 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package streaming + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "math" + "math/rand/v2" + "net" + "net/http" + "strings" + "time" + + "github.com/gorilla/websocket" + "go.uber.org/zap/zapcore" + + inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/elastic-agent-libs/logp" +) + +type websocketStream struct { + processor + + id string + cfg config + cursor map[string]any + + time func() time.Time +} + +// NewWebsocketFollower performs environment construction including CEL +// program and regexp compilation, and input metrics set-up for a websocket +// stream follower. +func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map[string]any, pub inputcursor.Publisher, log *logp.Logger, now func() time.Time) (StreamFollower, error) { + s := websocketStream{ + id: id, + cfg: cfg, + cursor: cursor, + processor: processor{ + ns: "websocket", + pub: pub, + log: log, + redact: cfg.Redact, + metrics: newInputMetrics(id), + }, + } + s.metrics.url.Set(cfg.URL.String()) + s.metrics.errorsTotal.Set(0) + + patterns, err := regexpsFromConfig(cfg) + if err != nil { + s.metrics.errorsTotal.Inc() + s.Close() + return nil, err + } + + s.prg, s.ast, err = newProgram(ctx, cfg.Program, root, patterns, log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.Close() + return nil, err + } + + return &s, nil +} + +// FollowStream receives, processes and publishes events from the subscribed +// websocket stream. +func (s *websocketStream) FollowStream(ctx context.Context) error { + state := s.cfg.State + if state == nil { + state = make(map[string]any) + } + if s.cursor != nil { + state["cursor"] = s.cursor + } + + // initialize the input url with the help of the url_program. + url, err := getURL(ctx, "websocket", s.cfg.URLProgram, s.cfg.URL.String(), state, s.cfg.Redact, s.log, s.now) + if err != nil { + s.metrics.errorsTotal.Inc() + return err + } + + // websocket client + c, resp, err := connectWebSocket(ctx, s.cfg, url, s.log) + handleConnectionResponse(resp, s.metrics, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to establish websocket connection", "error", err) + return err + } + + // ensures this is the last connection closed when the function returns + defer func() { + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } + }() + + for { + select { + case <-ctx.Done(): + s.log.Debugw("context cancelled, closing websocket connection") + return ctx.Err() + default: + _, message, err := c.ReadMessage() + if err != nil { + s.metrics.errorsTotal.Inc() + if isRetryableError(err) { + s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err) + // close the old connection and reconnect + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } + // since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it + c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) + handleConnectionResponse(resp, s.metrics, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to reconnect websocket connection", "error", err) + return err + } + } else { + s.log.Errorw("failed to read websocket data", "error", err) + return err + } + } + s.metrics.receivedBytesTotal.Add(uint64(len(message))) + state["response"] = message + s.log.Debugw("received websocket message", logp.Namespace("websocket"), string(message)) + err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to process and publish data", "error", err) + return err + } + } + } +} + +// isRetryableError checks if the error is retryable based on the error type. +func isRetryableError(err error) bool { + // check for specific network errors + var netErr *net.OpError + if errors.As(err, &netErr) { + switch { + case netErr.Op == "dial" && netErr.Err.Error() == "i/o timeout", + netErr.Op == "read" && netErr.Err.Error() == "i/o timeout", + netErr.Op == "read" && netErr.Err.Error() == "connection reset by peer", + netErr.Op == "read" && netErr.Err.Error() == "connection refused", + netErr.Op == "read" && netErr.Err.Error() == "connection reset", + netErr.Op == "read" && netErr.Err.Error() == "connection closed": + return true + } + } + + // check for specific websocket close errors + var closeErr *websocket.CloseError + if errors.As(err, &closeErr) { + switch closeErr.Code { + case websocket.CloseGoingAway, + websocket.CloseNormalClosure, + websocket.CloseInternalServerErr, + websocket.CloseTryAgainLater, + websocket.CloseServiceRestart, + websocket.CloseTLSHandshake: + return true + } + } + + // check for common error patterns + if strings.Contains(err.Error(), "timeout") || + strings.Contains(err.Error(), "connection reset") || + strings.Contains(err.Error(), "temporary failure") || + strings.Contains(err.Error(), "server is busy") { + return true + } + + return false +} + +// handleConnectionResponse logs the response body of the websocket connection. +func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *logp.Logger) { + if resp != nil && resp.Body != nil { + var buf bytes.Buffer + defer resp.Body.Close() + + if log.Core().Enabled(zapcore.DebugLevel) { + const limit = 1e4 + if _, err := io.CopyN(&buf, resp.Body, limit); err != nil && !errors.Is(err, io.EOF) { + metrics.errorsTotal.Inc() + fmt.Fprintf(&buf, "failed to read websocket response body with error: (%s) \n", err) + } + } + + // discard the remaining part of the body and check for truncation. + if n, err := io.Copy(io.Discard, resp.Body); err != nil { + metrics.errorsTotal.Inc() + fmt.Fprintf(&buf, "failed to discard remaining response body with error: (%s) ", err) + } else if n != 0 && buf.Len() != 0 { + buf.WriteString("... truncated") + } + + log.Debugw("websocket connection response", "body", &buf) + } +} + +// connectWebSocket attempts to connect to the websocket server with exponential backoff if retry config is available else it connects without retry. +func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Logger) (*websocket.Conn, *http.Response, error) { + var conn *websocket.Conn + var response *http.Response + var err error + headers := formHeader(cfg) + + if cfg.Retry != nil { + retryConfig := cfg.Retry + for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { + conn, response, err = websocket.DefaultDialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + continue + } + log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt) + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) + } + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) + } + + return websocket.DefaultDialer.DialContext(ctx, url, headers) +} + +// calculateWaitTime calculates the wait time for the next attempt based on the exponential backoff algorithm. +func calculateWaitTime(waitMin, waitMax time.Duration, attempt int) time.Duration { + // calculate exponential backoff + base := float64(waitMin) + backoff := base * math.Pow(2, float64(attempt-1)) + + // calculate jitter proportional to the backoff + maxJitter := float64(waitMax-waitMin) * math.Pow(2, float64(attempt-1)) + jitter := rand.Float64() * maxJitter + + waitTime := time.Duration(backoff + jitter) + + return waitTime +} + +// now is time.Now with a modifiable time source. +func (s *websocketStream) now() time.Time { + if s.time == nil { + return time.Now() + } + return s.time() +} + +func (s *websocketStream) Close() error { + s.metrics.Close() + return nil +}