Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][streaming] - Fix for streaming input handling of invalid or empty websocket messages #42036

Merged
merged 3 commits into from
Dec 16, 2024

Conversation

ShourieG
Copy link
Contributor

@ShourieG ShourieG commented Dec 13, 2024

Type of change

  • Bug

Proposed commit message

Fix for streaming input handling of invalid or empty websocket messages.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Disruptive User Impact

None

Author's Checklist

  • [ ]

How to test this PR locally

Related issues

Use cases

Screenshots

Logs

@ShourieG ShourieG requested a review from a team as a code owner December 13, 2024 17:24
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Dec 13, 2024
Copy link
Contributor

mergify bot commented Dec 13, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @ShourieG? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit

Copy link
Contributor

mergify bot commented Dec 13, 2024

backport-8.x has been added to help with the transition to the new branch 8.x.
If you don't need it please use backport-skip label and remove the backport-8.x label.

@mergify mergify bot added the backport-8.x Automated backport to the 8.x branch with mergify label Dec 13, 2024
@ShourieG ShourieG added the Team:Security-Service Integrations Security Service Integrations Team label Dec 13, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Dec 13, 2024
@elasticmachine
Copy link
Collaborator

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

@ShourieG ShourieG added needs_team Indicates that the issue/PR needs a Team:* label bugfix labels Dec 13, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Dec 13, 2024
@ShourieG ShourieG added needs_team Indicates that the issue/PR needs a Team:* label input:streaming labels Dec 13, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Dec 13, 2024
@ShourieG ShourieG added needs_team Indicates that the issue/PR needs a Team:* label backport-8.16 Automated backport with mergify backport-8.17 Automated backport with mergify labels Dec 13, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Dec 13, 2024
@botelastic
Copy link

botelastic bot commented Dec 13, 2024

This pull request doesn't have a Team:<team> label.

Copy link
Contributor

@efd6 efd6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the proposed commit message needs a clearer explanation of what is being fixed; it's not the situation that's being fixed, it's the response to the situation, so describe that and say what the behaviour is and then how that is rectified.

@@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fixed a scenario in the streaming input where CEL would process invalid/empty messages in case of a websocket error. {pull}42036[42036]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Fixed a scenario in the streaming input where CEL would process invalid/empty messages in case of a websocket error. {pull}42036[42036]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]

@@ -116,6 +116,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
return ctx.Err()
default:
_, message, err := c.ReadMessage()
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move this up? Is a message valid when err is non-nil?

Copy link
Contributor Author

@ShourieG ShourieG Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we still count the received bytes irrespective of whether it's a valid message or not, since this is a metric ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this depends on what the understood semantics of the metric are.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is not a core part of the change for this PR, I've reverted this now, will take a look at it once more when I do some cleanups.

Comment on lines 141 to 149
} else {
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else {
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}
continue
}
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("failed to process and publish data", "error", err)
return err
}

but why does the error case not return if !isRetryableError(err)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efd6, It does return, the issue is when it is a RetryableError. In this scenario the message is still passed to CEL during the retry attempts. So if the 3rd attempt is successful the 1st 2 invalid messages are still processed by CEL leading to downstream errors in integration pipelines because the event itself might be malformed or not present. Also this is just unnecessary processing that can be avoided.

Copy link
Contributor Author

@ShourieG ShourieG Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I felt that an else block had better readability in this case since we already have a bunch of returns within the "if".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does return, the issue is when it is a RetryableError

Sorry, you are correct.

Suggest then

			if err != nil {
				s.metrics.errorsTotal.Inc()
				if !isRetryableError(err) {
					s.log.Errorw("failed to read websocket data", "error", err)
					return err
				}
				s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err)
				// close the old connection and reconnect
				if err := c.Close(); err != nil {
					s.metrics.errorsTotal.Inc()
					s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
				}
				// since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it
				c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log)
				handleConnectionResponse(resp, s.metrics, s.log)
				if err != nil {
					s.metrics.errorsTotal.Inc()
					s.log.Errorw("failed to reconnect websocket connection", "error", err)
					return err
				}
				continue
			}
			state["response"] = message
			s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
			err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
			if err != nil {
				s.metrics.errorsTotal.Inc()
				s.log.Errorw("failed to process and publish data", "error", err)
				return err
			}

Note also that the debugw call with the message allocates non-conditionally due to the string conversion. This can be avoided by providing a lazy fmt.Stringer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll keep the fmt.Stringer change for a later cleanup PR then.

Copy link
Contributor

@efd6 efd6 Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type is

type bytesStringer []byte

func (b bytesStringer) String() string { return string(b) }

with

s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", bytesStringer(message))

@ShourieG ShourieG changed the title [filebeat][streaming] - Fixed a scenario where CEL would process invalid/empty messages in case of a websocket error [filebeat][streaming] - Fix for streaming input handling of invalid or empty websocket messages Dec 16, 2024
@ShourieG
Copy link
Contributor Author

@efd6, updated the PR

@ShourieG ShourieG merged commit d508a40 into elastic:main Dec 16, 2024
22 checks passed
@ShourieG ShourieG deleted the bugfix/websocket_retry branch December 16, 2024 09:15
mergify bot pushed a commit that referenced this pull request Dec 16, 2024
…r empty websocket messages (#42036)

* Fix for streaming input handling of invalid or empty websocket messages

(cherry picked from commit d508a40)
mergify bot pushed a commit that referenced this pull request Dec 16, 2024
…r empty websocket messages (#42036)

* Fix for streaming input handling of invalid or empty websocket messages

(cherry picked from commit d508a40)
mergify bot pushed a commit that referenced this pull request Dec 16, 2024
…r empty websocket messages (#42036)

* Fix for streaming input handling of invalid or empty websocket messages

(cherry picked from commit d508a40)
ShourieG added a commit that referenced this pull request Dec 16, 2024
…r empty websocket messages (#42036) (#42048)

* Fix for streaming input handling of invalid or empty websocket messages

(cherry picked from commit d508a40)

Co-authored-by: ShourieG <shourie.ganguly@elastic.co>
ShourieG added a commit that referenced this pull request Dec 17, 2024
…ut handling of invalid or empty websocket messages (#42049)

* [filebeat][streaming] - Fix for streaming input handling of invalid or empty websocket messages (#42036)

* Fix for streaming input handling of invalid or empty websocket messages

(cherry picked from commit d508a40)

* Update CHANGELOG.next.asciidoc

---------

Co-authored-by: ShourieG <shourie.ganguly@elastic.co>
ShourieG pushed a commit that referenced this pull request Dec 18, 2024
…ut handling of invalid or empty websocket messages (#42047)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-8.x Automated backport to the 8.x branch with mergify backport-8.16 Automated backport with mergify backport-8.17 Automated backport with mergify bugfix input:streaming Team:Security-Service Integrations Security Service Integrations Team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants