Skip to content

Commit

Permalink
WebSockets Next: provide strategies to process unhandled failures
Browse files Browse the repository at this point in the history
- resolves quarkusio#40648
  • Loading branch information
mkouba committed May 15, 2024
1 parent 9726d32 commit 4479a98
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.websockets.next.test.errors;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/echo")
public class EchoMessageError {

static final CountDownLatch MESSAGE_CALLED = new CountDownLatch(1);

@OnTextMessage
String echo(String message) {
if ("foo".equals(message)) {
MESSAGE_CALLED.countDown();
throw new IllegalStateException("I cannot do it!");
} else {
return message;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.websockets.next.test.errors;

import java.util.concurrent.CountDownLatch;

import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/echo")
public class EchoOpenError {

static final CountDownLatch OPEN_CALLED = new CountDownLatch(1);

@OnOpen
void open() {
OPEN_CALLED.countDown();
throw new IllegalStateException("I cannot do it!");
}

@OnTextMessage
String echo(String message) {
return message;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class UnhandledMessageFailureDefaultStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(EchoMessageError.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() throws InterruptedException {
try (WSClient client = WSClient.create(vertx).connect(testUri)) {
client.sendAndAwait("foo");
assertTrue(EchoMessageError.MESSAGE_CALLED.await(5, TimeUnit.SECONDS));
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed());
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class UnhandledMessageFailureLogStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(EchoMessageError.class, WSClient.class);
}).overrideConfigKey("quarkus.websockets-next.server.unhandled-failure-strategy", "log");

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testErrorDoesNotCloseConnection() throws InterruptedException {
try (WSClient client = WSClient.create(vertx).connect(testUri)) {
client.sendAndAwait("foo");
assertTrue(EchoMessageError.MESSAGE_CALLED.await(5, TimeUnit.SECONDS));
client.sendAndAwait("bar");
client.waitForMessages(1);
assertEquals("bar", client.getLastMessage().toString());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class UnhandledOpenFailureDefaultStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(EchoOpenError.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testError() throws InterruptedException {
try (WSClient client = WSClient.create(vertx).connect(testUri)) {
assertTrue(EchoOpenError.OPEN_CALLED.await(5, TimeUnit.SECONDS));
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed());
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.websockets.next.test.errors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class UnhandledOpenFailureLogStrategyTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(EchoOpenError.class, WSClient.class);
}).overrideConfigKey("quarkus.websockets-next.server.unhandled-failure-strategy", "log");

@Inject
Vertx vertx;

@TestHTTPResource("echo")
URI testUri;

@Test
void testErrorDoesNotCloseConnection() throws InterruptedException {
try (WSClient client = WSClient.create(vertx).connect(testUri)) {
assertTrue(EchoOpenError.OPEN_CALLED.await(5, TimeUnit.SECONDS));
client.sendAndAwait("foo");
client.waitForMessages(1);
assertEquals("foo", client.getLastMessage().toString());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public boolean isClosed() {
return socket.get().isClosed();
}

public int closeStatusCode() {
return socket.get().closeStatusCode();
}

@Override
public void close() {
disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class CloseReason {

public static final CloseReason NORMAL = new CloseReason(WebSocketCloseStatus.NORMAL_CLOSURE.code());

public static final CloseReason INTERNAL_SERVER_ERROR = new CloseReason(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code());

private final int code;

private final String message;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.websockets.next;

/**
* The strategy used when an error occurs but no error handler can handle the failure.
*/
public enum UnhandledFailureStrategy {
/**
* Close the connection.
*/
CLOSE,
/**
* Log an error message.
*/
LOG,
/**
* No operation.
*/
NOOP;

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,12 @@ public interface WebSocketsClientRuntimeConfig {
*/
Optional<Duration> autoPingInterval();

/**
* The strategy used when an error occurs but no error handler can handle the failure.
* <p>
* By default, the connection is closed when an unhandled failure occurs.
*/
@WithDefault("close")
UnhandledFailureStrategy unhandledFailureStrategy();

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,12 @@ public interface WebSocketsServerRuntimeConfig {
*/
Optional<Duration> autoPingInterval();

/**
* The strategy used when an error occurs but no error handler can handle the failure.
* <p>
* By default, the connection is closed when an unhandled failure occurs.
*/
@WithDefault("close")
UnhandledFailureStrategy unhandledFailureStrategy();

}
Loading

0 comments on commit 4479a98

Please sign in to comment.