Skip to content

Commit

Permalink
WebSockets Next: provide strategies to process unhandled failures
Browse files Browse the repository at this point in the history
- resolves quarkusio#40648
- also add WebSocketConnection#closeReason() and
  WebSocketClientConnection#closeReason()
  • Loading branch information
mkouba committed May 15, 2024
1 parent faf6c83 commit 2880cb8
Show file tree
Hide file tree
Showing 24 changed files with 603 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.websockets.next.test.client;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocketClient;

@WebSocketClient(path = "/endpoint")
public class ClientMessageErrorEndpoint {

static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1);

static final List<String> MESSAGES = new CopyOnWriteArrayList<>();

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnTextMessage
void message(String message) {
if ("foo".equals(message)) {
throw new IllegalStateException("I cannot do it!");
} else {
MESSAGES.add(message);
}
MESSAGE_LATCH.countDown();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.websockets.next.test.client;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocketClient;

@WebSocketClient(path = "/endpoint")
public class ClientOpenErrorEndpoint {

static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1);

static final List<String> MESSAGES = new CopyOnWriteArrayList<>();

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnOpen
void open() {
throw new IllegalStateException("I cannot do it!");
}

@OnTextMessage
void message(String message) {
MESSAGES.add(message);
MESSAGE_LATCH.countDown();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.websockets.next.test.client;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/endpoint")
public class ServerEndpoint {

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnTextMessage
String echo(String message) {
return message;
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledMessageFailureDefaultStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientMessageErrorEndpoint.class);
});

@Inject
WebSocketConnector<ClientMessageErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(ClientMessageErrorEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(connection.isClosed());
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), connection.closeReason().getCode());
assertTrue(ClientMessageErrorEndpoint.MESSAGES.isEmpty());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledMessageFailureLogStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientMessageErrorEndpoint.class);
}).overrideConfigKey("quarkus.websockets-next.client.unhandled-failure-strategy", "log");

@Inject
WebSocketConnector<ClientMessageErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertFalse(connection.isClosed());
connection.sendText("bar");
assertTrue(ClientMessageErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
assertEquals("bar", ClientMessageErrorEndpoint.MESSAGES.get(0));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledOpenFailureDefaultStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientOpenErrorEndpoint.class);
});

@Inject
WebSocketConnector<ClientOpenErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(ClientOpenErrorEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(connection.isClosed());
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), connection.closeReason().getCode());
assertTrue(ClientOpenErrorEndpoint.MESSAGES.isEmpty());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.websockets.next.test.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;

public class UnhandledOpenFailureLogStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(ServerEndpoint.class, ClientOpenErrorEndpoint.class);
}).overrideConfigKey("quarkus.websockets-next.client.unhandled-failure-strategy", "log");

@Inject
WebSocketConnector<ClientOpenErrorEndpoint> connector;

@TestHTTPResource("/")
URI testUri;

@Test
void testError() throws InterruptedException {
WebSocketClientConnection connection = connector
.baseUri(testUri)
.connectAndAwait();
connection.sendTextAndAwait("foo");
assertFalse(connection.isClosed());
assertNull(connection.closeReason());
assertTrue(ClientOpenErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
assertEquals("foo", ClientOpenErrorEndpoint.MESSAGES.get(0));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.websockets.next.test.errors;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/echo")
public class EchoMessageError {

static final CountDownLatch MESSAGE_FAILURE_CALLED = new CountDownLatch(1);

@OnTextMessage
String echo(String message) {
if ("foo".equals(message)) {
MESSAGE_FAILURE_CALLED.countDown();
throw new IllegalStateException("I cannot do it!");
} else {
return message;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.websockets.next.test.errors;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/echo")
public class EchoOpenError {

static final CountDownLatch OPEN_CALLED = new CountDownLatch(1);

@OnOpen
void open() {
OPEN_CALLED.countDown();
throw new IllegalStateException("I cannot do it!");
}

@OnTextMessage
String echo(String message) {
return message;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class UnhandledMessageFailureDefaultStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(EchoMessageError.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() throws InterruptedException {
try (WSClient client = WSClient.create(vertx).connect(testUri)) {
client.sendAndAwait("foo");
assertTrue(EchoMessageError.MESSAGE_FAILURE_CALLED.await(5, TimeUnit.SECONDS));
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed());
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode());
}
}

}
Loading

0 comments on commit 2880cb8

Please sign in to comment.