WebSockets Support: Tyrus Integration
This document describes a prototype integration of Tyrus and Helidon.
Tyrus is the reference implementation of JSR 356: Java API for WebSocket. It provides both a client and a server API for WebSocket communication in Java. In addition, the JSR defines an SPI to integrate implementations with HTTP containers/connectors. This document focuses on the server API part of JSR 356.
A WebSocket connection starts as a normal HTTP connection that is subsequently upgraded to use WebSockets, essentially turning an HTTP connection into a plain TCP connection. JSR 356 provides an API not only to send and receive messages over a WebSocket connection but also to intercept an HTTP upgrade request and potentially alter its outcome. All JSR 356 implementations provide a default upgrade handler.
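To make the upgrade step more concrete, the sketch below shows one thing a default upgrade handler has to compute when it accepts a handshake, as specified by RFC 6455: the Sec-WebSocket-Accept response header derived from the client's Sec-WebSocket-Key. The class and method names are illustrative assumptions and are not part of Tyrus or Helidon.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Illustrative helper; not part of Tyrus or Helidon.
final class HandshakeSketch {
    // Fixed GUID defined by RFC 6455 for the opening handshake.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    private HandshakeSketch() {
    }

    // Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key:
    // base64(SHA-1(key + GUID)).
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 must be available on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }
}
```

An application-level handler that intercepts the upgrade can still reject or alter the handshake before this value is ever sent back to the client.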
Netty has its own support for WebSockets. However, that support overlaps with what Tyrus provides. For example, both Netty's handlers and Tyrus handle protocol upgrades, and both handle codecs for different types of messages.
In order to support JSR 356 in Helidon, we need to limit Netty's involvement in handling connections to a minimum and let Tyrus do its work. In particular, we need protocol upgrades to be handled by Tyrus (or the application handler) and for Netty to simply (and opaquely!) relay WebSocket messages to connection peers.
A connection upgrade starts as a regular HTTP request with some special headers. As stated above, this request must be forwarded to Tyrus so that it can participate in the upgrade process: this is accomplished by setting up the regular Netty pipeline without any special handlers for WebSockets at this time.
The response to this initial upgrade request needs to be intercepted by Helidon in order to detect a successful upgrade, in which case, Netty's pipeline needs to be modified so that WebSocket messages can be relayed over the now upgraded connection. In particular, HTTP handlers in Netty's pipeline need to be replaced by handlers capable of delivering WebSocket messages.
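As a rough illustration of the detection step, the check below decides whether a response completes a WebSocket upgrade: status 101, an Upgrade header naming websocket, and a Connection header containing the upgrade token. The class name, method name, and the header map shape are assumptions made for this sketch; the actual prototype would inspect Netty's response objects instead.

```java
import java.util.List;
import java.util.Map;

// Illustrative check; the class, method, and parameter shapes are assumptions.
final class UpgradeDetector {
    private UpgradeDetector() {
    }

    // Returns true if the response signals a successful WebSocket upgrade:
    // status 101, "Upgrade: websocket", and a "Connection" header containing "upgrade".
    static boolean isWebSocketUpgrade(int status, Map<String, List<String>> headers) {
        if (status != 101) {
            return false;
        }
        return hasToken(headers, "upgrade", "websocket")
                && hasToken(headers, "connection", "upgrade");
    }

    // Header names and tokens are compared case-insensitively;
    // a single header value may carry several comma-separated tokens.
    private static boolean hasToken(Map<String, List<String>> headers,
                                    String name, String token) {
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            if (!entry.getKey().equalsIgnoreCase(name)) {
                continue;
            }
            for (String value : entry.getValue()) {
                for (String part : value.split(",")) {
                    if (part.trim().equalsIgnoreCase(token)) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
}
```

Only when such a check succeeds would the HTTP handlers in the pipeline be swapped for handlers that relay WebSocket messages; any other response leaves the pipeline untouched.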
After a connection is upgraded, WebSocket messages are allowed to flow over a full-duplex channel. These messages are intended to be processed by Tyrus and must not be interpreted by Netty. After updating the Netty pipeline accordingly, Helidon's implementation of Tyrus' SPI can register a Flow.Publisher for server-to-client messages and a Flow.Subscriber for client-to-server ones. This is the responsibility of the TyrusSupport class, akin to JerseySupport.
Any messages received by the registered Flow.Subscriber are forwarded to the SPI's ReadHandler for processing; conversely, any messages written to the SPI's Writer are forwarded to the registered Flow.Publisher for delivery.
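The two directions of this relay can be sketched with the JDK's own java.util.concurrent.Flow types. This is a simplified illustration under stated assumptions, not the prototype's code: ByteBuffer stands in for Helidon's DataChunk, a plain Consumer stands in for the SPI's ReadHandler, and the class names RelaySketch, InboundSubscriber, and OutboundWriter are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

// Illustrative relay; none of these names come from Helidon or Tyrus.
final class RelaySketch {
    private RelaySketch() {
    }

    // Client-to-server direction: each received buffer goes to the read handler.
    static final class InboundSubscriber implements Flow.Subscriber<ByteBuffer> {
        private final Consumer<ByteBuffer> readHandler; // stand-in for the SPI's ReadHandler
        private Flow.Subscription subscription;

        InboundSubscriber(Consumer<ByteBuffer> readHandler) {
            this.readHandler = readHandler;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first message
        }

        @Override
        public void onNext(ByteBuffer item) {
            readHandler.accept(item);
            subscription.request(1); // ask for the next one after this is handled
        }

        @Override
        public void onError(Throwable throwable) {
            // A real integration would presumably close the WebSocket session here.
        }

        @Override
        public void onComplete() {
            // The peer closed the connection; nothing more to read.
        }
    }

    // Server-to-client direction: anything written is published for delivery.
    static final class OutboundWriter implements AutoCloseable {
        private final SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>();

        Flow.Publisher<ByteBuffer> publisher() {
            return publisher;
        }

        void write(ByteBuffer data) {
            publisher.submit(data); // buffered and delivered to current subscribers
        }

        @Override
        public void close() {
            publisher.close();
        }
    }
}
```

Requesting one item at a time keeps the sketch simple and applies backpressure per message; a real integration might request larger batches.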
Publishers and subscribers in Helidon use the type DataChunk. In order to support JSR 356's CompletionHandler, DataChunk needs to be extended with a write listener. This listener must invoke the completion handler when the data is successfully written to the connection (or an error occurs). Each message sent from the server provides an instance of CompletionHandler; in fact, only when the handler is called is Tyrus allowed to reuse its byte buffers.
Outline of changes to DataChunk:
new DataChunk() {
    private boolean isReleased = false;
    // Write listener: completed once this chunk has been written to the
    // connection, or completed exceptionally if the write fails.
    private CompletableFuture<DataChunk> writeFuture;

    // ...

    @Override
    public void writeFuture(CompletableFuture<DataChunk> writeFuture) {
        this.writeFuture = writeFuture;
    }

    @Override
    public Optional<CompletableFuture<DataChunk>> writeFuture() {
        return Optional.ofNullable(writeFuture);
    }
};
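One way such a write future could be tied to a completion handler is sketched below. This is an assumption-laden illustration rather than the prototype's code: WriteCallback is a local stand-in for the SPI's CompletionHandler, the generic payload type stands in for DataChunk, and the names WriteListenerSketch and writeFutureFor are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only; not part of Helidon or Tyrus.
final class WriteListenerSketch {

    // Local stand-in for the SPI's CompletionHandler.
    interface WriteCallback<T> {
        void completed(T result);

        void failed(Throwable error);
    }

    private WriteListenerSketch() {
    }

    // Creates a write future that notifies the callback once, either with the
    // written payload or with the failure reported by the transport.
    static <T> CompletableFuture<T> writeFutureFor(WriteCallback<T> callback) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.whenComplete((written, error) -> {
            if (error != null) {
                callback.failed(error);
            } else {
                callback.completed(written);
            }
        });
        return future;
    }
}
```

In this sketch the transport side would call complete(chunk) after the data has been written to the connection, or completeExceptionally(error) on failure; only after that notification would the sender be free to reuse its buffer.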
// Register TyrusSupport, with its endpoint classes, under the /tyrus path
webServer = WebServer.create(builder.build(),
        Routing.builder().register("/tyrus",
                TyrusSupport.builder().register(EchoEndpoint.class).build()));
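import java.io.IOException;
import java.util.logging.Logger;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

// UppercaseCodec and ServerConfigurator are application classes not shown here.
@ServerEndpoint(
        value = "/echo",
        encoders = { UppercaseCodec.class },
        decoders = { UppercaseCodec.class },
        configurator = ServerConfigurator.class
)
public class EchoEndpoint {
    private static final Logger LOGGER = Logger.getLogger(
            EchoEndpoint.class.getName());

    @OnOpen
    public void onOpen(Session session) throws IOException {
        LOGGER.info("OnOpen called");
    }

    // Sends each received text message back to the client.
    @OnMessage
    public void echo(Session session, String message) throws IOException {
        LOGGER.info("OnMessage called with '" + message + "'");
        session.getBasicRemote().sendText(message);
    }

    @OnError
    public void onError(Throwable t) {
        LOGGER.info("OnError called");
    }

    @OnClose
    public void onClose() {
        LOGGER.info("OnClose called");
    }
}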
- Sessions
- WebSocket client API/SPI
- Tracing
- Security and TLS