Skip to content

Commit

Permalink
Merge pull request quarkusio#41348 from mkouba/issue-41333
Browse files Browse the repository at this point in the history
WebSockets Next: fire CDI events for client connections added/removed
  • Loading branch information
mkouba authored Jun 21, 2024
2 parents fab0ec5 + 30e3561 commit 2f41fac
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 2 deletions.
22 changes: 22 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ class MyBean {
There are also other convenient methods.
For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint.

[[server-cdi-events]]
==== CDI events

Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketConnection` with qualifier `@io.quarkus.websockets.next.Open` asynchronously when a new connection is opened.
Expand Down Expand Up @@ -938,6 +939,27 @@ class MyBean {
There are also other convenient methods.
For example, `OpenClientConnections#findByClientId(String)` makes it easy to find connections for a specific endpoint.

[[client-cdi-events]]
==== CDI events

Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketClientConnection` with qualifier `@io.quarkus.websockets.next.Open` asynchronously when a new connection is opened.
Moreover, a CDI event of type `WebSocketClientConnection` with qualifier `@io.quarkus.websockets.next.Closed` is fired asynchronously when a connection is closed.

[source, java]
----
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketClientConnection;
class MyBean {
void connectionOpened(@ObservesAsync @Open WebSocketClientConnection connection) { <1>
// This observer method is called when a connection is opened...
}
}
----
<1> An asynchronous observer method is executed using the default blocking executor service.

=== Configuring SSL/TLS

To establish a TLS connection, you need to configure a _named_ configuration using the xref:./tls-registry-reference.adoc[TLS registry]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.event.ObservesAsync;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.Closed;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketClient;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;
import io.quarkus.websockets.next.test.utils.WSClient;

public class ClientConnectionEventsTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, ObservingBean.class, WSClient.class);
});

@TestHTTPResource("/")
URI baseUri;

@Inject
WebSocketConnector<EndpointClient> connector;

@Test
void testEvents() throws Exception {
// Open connection, EndpointClient sends a message with client connection id
WebSocketClientConnection connection = connector
.baseUri(baseUri)
.connectAndAwait();
// Wait for the message
assertTrue(Endpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
// Assert the @Open event was fired
assertTrue(ObservingBean.OPEN_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.OPEN_CONN.get());
assertEquals(connection.id(), ObservingBean.OPEN_CONN.get().id());
assertEquals(connection.id(), Endpoint.MESSAGE.get());
// Close the connection
connection.closeAndAwait();
assertTrue(EndpointClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
// Assert the @Closed event was fired
assertTrue(ObservingBean.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.CLOSED_CONN.get());
assertEquals(connection.id(), ObservingBean.CLOSED_CONN.get().id());
}

@WebSocket(path = "/endpoint")
public static class Endpoint {

static final AtomicReference<String> MESSAGE = new AtomicReference<>();

static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1);

@OnTextMessage
void message(String message) {
MESSAGE.set(message);
MESSAGE_LATCH.countDown();
}

}

@WebSocketClient(path = "/endpoint")
public static class EndpointClient {

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnOpen
String open(WebSocketClientConnection connection) {
return connection.id();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}

@Singleton
public static class ObservingBean {

static final CountDownLatch OPEN_LATCH = new CountDownLatch(1);
static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

static final AtomicReference<WebSocketClientConnection> OPEN_CONN = new AtomicReference<>();
static final AtomicReference<WebSocketClientConnection> CLOSED_CONN = new AtomicReference<>();

void onOpen(@ObservesAsync @Open WebSocketClientConnection connection) {
OPEN_CONN.set(connection);
OPEN_LATCH.countDown();
}

void onClose(@ObservesAsync @Closed WebSocketClientConnection connection) {
CLOSED_CONN.set(connection);
CLOSED_LATCH.countDown();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a connection is closed.
* This qualifier is used for CDI events fired asynchronously when a WebSocket connection is closed.
* <p>
* The payload is {@link WebSocketConnection} for server connections and {@link WebSocketClientConnection} for client
* connections.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a new connection is opened.
* This qualifier is used for CDI events fired asynchronously when a new WebSocket connection is opened.
* <p>
* The payload is {@link WebSocketConnection} for server connections and {@link WebSocketClientConnection} for client
* connections.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
import java.util.stream.Stream;

import jakarta.annotation.PreDestroy;
import jakarta.enterprise.event.Event;
import jakarta.inject.Singleton;

import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.websockets.next.Closed;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.OpenClientConnections;
import io.quarkus.websockets.next.WebSocketClientConnection;

Expand All @@ -25,6 +30,19 @@ public class ClientConnectionManager implements OpenClientConnections {

private final List<ClientConnectionListener> listeners = new CopyOnWriteArrayList<>();

private final Event<WebSocketClientConnection> openEvent;
private final Event<WebSocketClientConnection> closedEvent;

ClientConnectionManager(@Open Event<WebSocketClientConnection> openEvent,
@Closed Event<WebSocketClientConnection> closedEvent) {
ArcContainer container = Arc.container();
this.openEvent = container.resolveObserverMethods(WebSocketClientConnection.class, Open.Literal.INSTANCE).isEmpty()
? null
: openEvent;
this.closedEvent = container.resolveObserverMethods(WebSocketClientConnection.class, Closed.Literal.INSTANCE)
.isEmpty() ? null : closedEvent;
}

@Override
public Iterator<WebSocketClientConnection> iterator() {
return stream().iterator();
Expand All @@ -38,6 +56,9 @@ public Stream<WebSocketClientConnection> stream() {
void add(String endpoint, WebSocketClientConnection connection) {
LOG.debugf("Add client connection: %s", connection);
if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) {
if (openEvent != null) {
openEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ClientConnectionListener listener : listeners) {
try {
Expand All @@ -56,6 +77,9 @@ void remove(String endpoint, WebSocketClientConnection connection) {
Set<WebSocketClientConnection> connections = endpointToConnections.get(endpoint);
if (connections != null) {
if (connections.remove(connection)) {
if (closedEvent != null) {
closedEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ClientConnectionListener listener : listeners) {
try {
Expand Down

0 comments on commit 2f41fac

Please sign in to comment.