From 03a5ea7431b8e6a600ff9af73cdd7a3546423e50 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Mon, 18 Nov 2024 09:24:06 +0100 Subject: [PATCH] Extend websocket next documentation to explain when and when not to subscribe to a Uni or Multi Fix https://github.com/quarkusio/quarkus/issues/42566 --- .../asciidoc/websockets-next-reference.adoc | 96 +++++++++++++++++-- 1 file changed, 90 insertions(+), 6 deletions(-) diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index b343654c44e07..937079b1e99b1 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -267,7 +267,29 @@ However, it may also accept the following parameters: The message object represents the data sent and can be accessed as either raw content (`String`, `JsonObject`, `JsonArray`, `Buffer` or `byte[]`) or deserialized high-level objects, which is the recommended approach. When receiving a `Multi`, the method is invoked once per connection, and the provided `Multi` receives the items transmitted by this connection. -The method must subscribe to the `Multi` to receive these items (or return a Multi). +If the method returns a `Multi` (constructed from the received one), Quarkus will automatically subscribe to it and write the emitted items until completion, failure, or cancellation. +However, if your method does not return a `Multi`, you must subscribe to the incoming `Multi` to consume the data. + +Here are two examples: + +[source,java] +---- +// No need to subscribe to the incoming Multi as the method returns a Multi derived from the incoming one +@OnTextMessage +public Multi stream(Multi incoming) { + return incoming.log(); +} + +// ... + +// Must subscribe to the incoming Multi as the method does not return a Multi, otherwise no data will be consumed +@OnTextMessage +public void stream(Multi incoming) { + incoming.subscribe().with(item -> log(item)); +} +---- + +See <> to learn more about subscribing to the incoming `Multi`. ==== Supported return types @@ -319,7 +341,9 @@ Multi stream(Message m) { } ---- -When returning a Multi, Quarkus subscribes to the returned Multi automatically and writes the emitted items until completion, failure, or cancellation. Failure or cancellation terminates the connection. +Methods returning `Uni` and `Multi` are considered non-blocking. +In addition, Quarkus automatically subscribes to the returned `Multi` / `Uni` and writes the emitted items until completion, failure, or cancellation. +Failure or cancellation terminates the connection. ==== Streams @@ -340,7 +364,8 @@ public Multi stream(Multi incoming) { This approach allows bi-directional streaming. -When the method returns `void`, it must subscribe to the incoming `Multi`: +When the method returns `void`, and so does not return a `Multi`, the code must subscribe to the incoming `Multi`. +Otherwise, no data will be consumed, and the connection will not be closed: [source, java] ---- @@ -350,6 +375,10 @@ public void stream(Multi incoming) { } ---- +Also note that the `stream` method will complete before the `Multi` completes. + +See <> to learn more about subscribing to the incoming `Multi`. + ==== Skipping reply When a method is intended to produce a message written to the client, it can emit `null`. @@ -636,7 +665,19 @@ String param = connection.pathParam("foo"); The `WebSocketConnection` provides both a blocking and a non-blocking method variants to send messages: - `sendTextAndAwait(String message)`: Sends a text message to the client and waits for the message to be sent. It's blocking and should only be called from an executor thread. -- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking, but you must subscribe to it. +- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking. Make sure you or Quarkus subscribes to the returned `Uni` to send the message. +If you return the `Uni` from a method invoked by Quarkus (like with Quarkus REST, Quarkus WebSocket Next or Quarkus Messaging), it will subscribe to it and send the message. +For example: + +[source,java] +---- +@POST +public Uni send() { + return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message. +} +---- + +See <> to learn more about subscribing to the `Uni`. [[list-open-connections]] ==== List open connections @@ -655,7 +696,7 @@ class MyBean { OpenConnections connections; void logAllOpenConnections() { - Log.infof("Open connections: %s", connections.listAll()); <1> + Log.infof("Open connections: %s", connections.listAll()); // <1> } } ---- @@ -1078,7 +1119,17 @@ String param = connection.pathParam("foo"); The `WebSocketClientConnection` provides both a blocking and a non-blocking method variants to send messages: - `sendTextAndAwait(String message)`: Sends a text message to the client and waits for the message to be sent. It's blocking and should only be called from an executor thread. -- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking, but you must subscribe to it. +- `sendText(String message)`: Sends a text message to the client. It returns a `Uni`. It's non-blocking. Make sure you or Quarkus subscribes to the returned `Uni` to send the message. +If you return the `Uni` from a method invoked by Quarkus (like with Quarkus REST, Quarkus WebSocket Next or Quarkus Messaging), it will subscribe to it and send the message. +For example: + +[source,java] +---- +@POST +public Uni send() { + return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message. +} +---- [[list-open-client-connections]] ==== List open client connections @@ -1203,6 +1254,39 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3> <2> Set the number of characters of a text message payload which will be logged. <3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`. +[[subscribe-or-not-subscribe]] +== When to subscribe to a `Uni` or `Multi` + +`Uni` and `Multi` are lazy types, which means that they do not start processing until they are subscribed to. + +When you get (from a parameter or from a method you called) a `Uni` or a `Multi`, whether you should subscribe to it depends on the context: + +- if you return the `Uni` or `Multi` in a method invoked by Quarkus (like with Quarkus REST, Quarkus WebSocket Next or Quarkus Messaging), Quarkus subscribes to it and processes the items emitted by the `Multi` or the item emitted by the `Uni`: + +[source, java] +---- +@Incoming("...") +@Outgoing("...") +public Multi process(Multi input) { + // No need to subscribe to the input Multi, the `process` method is called by Quarkus (Messaging). + return input.map(String::toUpperCase); +} +---- + +When a `Uni` or `Multi` is returned from a method annotated with `@OnOpen`, `@OnTextMessage`, `@OnBinaryMessage`, or `@OnClose`, Quarkus subscribes to it automatically. + +- if you do not return the `Uni` or `Multi` in a method invoked by Quarkus, you should subscribe to it: + +[source, java] +---- +@Incoming("...") +@Outgoing("...") +public void process(Multi input) { + input.map(String::toUpperCase) + .subscribe().with(s -> log(s)); +} +---- + [[telemetry]] == Telemetry