Websocket stream failure on receive/send timeout (network lost detection) #4289

Merged
merged 7 commits into from
Aug 24, 2023

Conversation

gawronA
Contributor

@gawronA gawronA commented Jul 28, 2023

Hello all,

I would like to contribute to the Akka project by providing a feature that makes it possible to detect websocket disconnections caused by network unavailability.
I have introduced two configuration parameters for websocket client:

akka.http.client.websocket.receive-idle-timeout
akka.http.client.websocket.send-idle-timeout

and server:

akka.http.server.websocket.receive-idle-timeout
akka.http.server.websocket.send-idle-timeout

that can be configured together with akka.http.{client, server}.websocket.periodic-keep-alive-max-idle to detect a lost network connection.

The working principle is simple: if no elements are sent or received (including ping/pong frames) within the configured timeout, the streams are failed with
akka.stream.StreamIdleTimeoutException

The default config values are infinite which means that this feature is disabled.
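
As a rough illustration of how these settings might be combined, here is a minimal application.conf sketch for the client side. The durations are assumptions chosen for the example, not values from this PR; the idea is only that the keep-alive interval should be shorter than the idle timeouts, so that ping/pong traffic keeps a healthy connection from timing out.

```hocon
akka.http.client.websocket {
  # Send a ping whenever the connection has been idle this long (example value).
  periodic-keep-alive-mode = ping
  periodic-keep-alive-max-idle = 2s

  # Fail the stream if nothing (including pong frames) arrives for this long.
  receive-idle-timeout = 5s

  # Fail the stream if nothing (including ping frames) is sent for this long.
  send-idle-timeout = 5s
}
```

The server side would presumably use the same keys under akka.http.server.websocket.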

I have provided two new test modules: WebSocketServerReceiveIdleTimeoutSpec.scala and WebSocketServerSendIdleTimeoutSpec.scala, because I couldn't find out how config overrides can be injected upon server creation (they are always taken from the ActorSystem settings). If anyone has a better idea how this can be resolved, I'm open to proposals.

Thanks!

References #2419

* Add receiveIdleTimeout and sendIdleTimeout config parameters to websocket client and server
* Add idleTimeout stack layer
* Add integration tests
@lightbend-cla-validator

Hi @gawronA,

Thank you for your contribution! We really value the time you've taken to put this together.

Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement:

https://www.lightbend.com/contribute/cla

@gawronA
Contributor Author

gawronA commented Jul 28, 2023

CLA signed.

Member

@johanandren johanandren left a comment

Seems like a nice/useful feature, impl looks pretty good, I left some suggestions.

* Use BidiFlow.identity for default case instead of BidiFlow constructed from two identity Flows
* Remove redundant maps
* Add exception class matchers in tests
@gawronA
Contributor Author

gawronA commented Aug 15, 2023

@johanandren thank you for the review. I applied your suggestions.
Meanwhile I've found one issue in the case where akka.http.client.websocket.send-idle-timeout is set to a non-infinite value and the timeout fires. The exception class that we then get is akka.stream.scaladsl.TcpIdleTimeoutException instead of akka.stream.StreamIdleTimeoutException. This is caused by the Akka stream TCP stage, where any exception derived from java.util.concurrent.TimeoutException (including akka.stream.StreamIdleTimeoutException) is mapped to TcpIdleTimeoutException.
My proposals to resolve it:

  1. Intentionally disable idle-timeout handling by tcp layer and prepare implementation on http layer like in http2 server
  2. Leave it like that and properly document the case that when akka.http.client.websocket.send-idle-timeout is configured, the handler stream will be failed with akka.stream.scaladsl.TcpIdleTimeoutException
  3. Remove the akka.http.client.websocket.send-idle-timeout config parameter and related functionality
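
Under option 2, calling code would need to treat both exception classes as an idle timeout. A minimal sketch of one way to do that is to match on the common supertype java.util.concurrent.TimeoutException. The names IdleTimeoutClassifier, Outcome, and classify are hypothetical, and it is an assumption here that TcpIdleTimeoutException also extends TimeoutException; the sketch uses only the standard library so it does not depend on either Akka class.

```scala
import java.io.IOException
import java.util.concurrent.TimeoutException

// Hypothetical helper, not part of Akka or of this PR.
object IdleTimeoutClassifier {
  sealed trait Outcome
  case object IdleTimeout extends Outcome
  case object OtherFailure extends Outcome

  // Matches on the shared supertype so the caller does not care whether the
  // failure surfaced as StreamIdleTimeoutException or (assumed) as
  // TcpIdleTimeoutException after being remapped by the TCP stage.
  def classify(cause: Throwable): Outcome = cause match {
    case _: TimeoutException => IdleTimeout
    case _                   => OtherFailure
  }
}
```

Such a function could be applied in a recover or onComplete callback on the materialized future of the websocket flow. Note that matching this broadly would also catch unrelated timeouts.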

Also feel free to directly push whatever headers are needed to WebSocketServerReceiveIdleTimeoutSpec.scala and WebSocketServerSendIdleTimeoutSpec.scala.

Member

@johanandren johanandren left a comment

Run sbt "headerCreateAll; compile" to get the headers added and the code formatted, then check in the result.

I think the TCP stage turning any stream idle timeout to TCP timeout is unfortunate but something that we could possibly fix in Akka core instead of doing anything about here.

Thread.sleep(200)
}, Duration.apply(3, TimeUnit.SECONDS))
x
})
Member

Replace the in-stream-blocking map+sleep in the tests with .delay(200.millis)

@gawronA
Contributor Author

gawronA commented Aug 18, 2023

In the tests I want to simulate a constant flow of messages in one direction and ensure that we still get a timeout when no messages are sent in the opposite direction.
I managed to replace the in-stream-blocking map with .delay(200.millis) in the send-idle-timeout test, but when I made the same change in the receive-idle-timeout test, it failed because the stream did not time out. I didn't expect that, so I investigated and found that this piece of code:

val future = Source(1 to 10).map(_ => TextMessage("dummy"))
  .map(x => {
    Await.result(Future {
      Thread.sleep(200)
    }, 3.seconds)
    x
  })
  .via(Http().webSocketClientFlow(
    WebSocketRequest("ws://127.0.0.1:" + myPort),
    settings = ClientConnectionSettings("akka.http.client.websocket.receive-idle-timeout = 1s"))
  ).toMat(Sink.foreach(println))(Keep.right).run()

produces no output - expected. However, this piece of code (map replaced with delay):

val future = Source(1 to 10).map(_ => TextMessage("dummy"))
  .delay(300.millis, DelayOverflowStrategy.backpressure).addAttributes(Attributes.inputBuffer(1, 1))
  .via(Http().webSocketClientFlow(
    WebSocketRequest("ws://127.0.0.1:" + myPort),
    settings = ClientConnectionSettings("akka.http.client.websocket.receive-idle-timeout = 1s"))
  ).toMat(Sink.foreach(println))(Keep.right).run()

produces:

TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)
TextMessage.Strict(dummy)

which is quite unexpected, as the .delay operator somehow passes elements through the websocket flow (as if they were received from the server) and therefore the timeout doesn't happen.

I may be wrong, but in my opinion the .delay operator or the inputBuffer attribute somehow interferes with the websocket flow.


"fail the materialized future with akka.stream.StreamIdleTimeoutException if elements are not received within receive-idle-timeout" in {
val bindingFuture = Http().newServerAt("localhost", 0).bindSync({
_.attribute(webSocketUpgrade).get.handleMessages(Flow.apply, None)
Member

Flow.apply == Flow.identity, so this is an echo server. The test expects a server that does not emit anything; something like Flow.fromSinkAndSourceCoupled(Sink.ignore, Source.maybe[Message]) could work in a test (in a real app you'd need to deal with text/binary and also consume streaming messages, but here we know they will be strict).
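
The suggestion above can be sketched as a standalone handler value. The object and value names (SilentHandlerSketch, silentHandler) are made up for illustration; the flow expression itself is the one proposed in the comment. This is a test-only sketch: Sink.ignore does not drain the data streams of non-strict messages, so it would not be suitable for a real application.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.Message
import akka.stream.scaladsl.{ Flow, Sink, Source }

// Hypothetical wrapper object, used only to make the snippet compile on its own.
object SilentHandlerSketch {
  // Consumes every incoming message and never emits one back.
  // Source.maybe stays open until its materialized Promise is completed,
  // so the outgoing side neither emits nor completes on its own.
  val silentHandler: Flow[Message, Message, NotUsed] =
    Flow.fromSinkAndSourceCoupled(Sink.ignore, Source.maybe[Message])
}
```

In the test shown above, it would take the place of Flow.apply, as in handleMessages(SilentHandlerSketch.silentHandler, None).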

@johanandren
Member

johanandren commented Aug 21, 2023

In general:

With a Thread.sleep you are effectively blocking the entire thread and Akka stream that handles the tcp socket, while .delay just simulates async-busy-work by letting the stream do other things.

Other would be collecting those TextMessages in a buffer for example. There is however no async boundary between source and delay, so there is no buffer introduced and .addAttributes(Attributes.inputBuffer(1, 1)) does not actually do anything.

Blocking a stream without async boundaries should also mean that the idle timeout operators are blocked from doing anything until that blocking stops, that interferes with what we want to test here.

@gawronA
Contributor Author

gawronA commented Aug 21, 2023

Ahh of course Flow.apply was the cause. I completely overlooked that one. Thank you for pointing it out :)

Other would be collecting those TextMessages in a buffer for example. There is however no async boundary between source and delay, so there is no buffer introduced and .addAttributes(Attributes.inputBuffer(1, 1)) does not actually do anything.

I would like to leave .addAttributes(Attributes.inputBuffer(1, 1)) in, as this causes the source to emit each element with a delay of 200ms. Removing it results in all elements being emitted together after the delay. The first case better reflects a real-world scenario.

@johanandren
Member

Right, delay has an internal buffer, so it is not about the async boundary buffers. I didn't remember that off the top of my head. Fine to keep.
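
For reference, the non-blocking pacing discussed in this thread can be tried in isolation with a sketch like the following. DelaySketch and run are hypothetical names, the five-element source and the 10-second wait are arbitrary, and no particular timing is claimed. The Await here sits on the calling thread, outside the stream, so it does not block any stream stage.

```scala
import akka.actor.ActorSystem
import akka.stream.{ Attributes, DelayOverflowStrategy }
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical standalone sketch, not code from the PR.
object DelaySketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("delay-sketch")
    try {
      val result = Source(1 to 5)
        // Delay each element without blocking a thread inside the stream.
        .delay(200.millis, DelayOverflowStrategy.backpressure)
        // Per the discussion above, this limits the internal buffer of delay
        // so that elements are spaced out instead of released together.
        .addAttributes(Attributes.inputBuffer(1, 1))
        .runWith(Sink.seq)
      // Waiting happens outside the stream, on the caller's thread.
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```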

val myPort = binding.localAddress.getPort

Source(1 to 10).map(_ => {
Await.result(Future(Thread.sleep(200)), Duration(3, TimeUnit.SECONDS))
Member

A final blocking sleep to get rid of, then I think this PR is good to go

* replace stream-blocking Await with .delay operator in websocket server timeout tests
* add outgoing message stream from server in receive-idle-timeout test
* replace printing source with ignoring source in receive-idle-timeout server test
Member

@johanandren johanandren left a comment

LGTM, thanks, good work!

@johanandren johanandren merged commit 77c29a8 into akka:main Aug 24, 2023
16 checks passed