Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSockets Next: add rule for Transactional annotation #44498

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,23 @@ Method receiving messages from the client are annotated with `@OnTextMessage` or

==== Invocation rules

When invoking these annotated methods, the _session_ scope linked to the WebSocket connection remains active.
When invoking the callback methods, the _session_ scope linked to the WebSocket connection remains active.
In addition, the request scope is active until the completion of the method (or until it produces its result for async and reactive methods).

Quarkus WebSocket Next supports _blocking_ and _non-blocking_ logic, akin to Quarkus REST, determined by the method signature and additional annotations such as `@Blocking` and `@NonBlocking`.
WebSocket Next supports _blocking_ and _non-blocking_ logic, akin to Quarkus REST, determined from the return type of the method and additional annotations such as `@Blocking` and `@NonBlocking`.

Here are the rules governing execution:

* Non-blocking methods must execute on the connection's event loop.
* Methods annotated with `@RunOnVirtualThread` are considered blocking and should execute on a virtual thread.
* Blocking methods must execute on a worker thread if not annotated with `@RunOnVirtualThread`.
* When `@RunOnVirtualThread` is employed, each invocation spawns a new virtual thread.
* Methods returning `CompletionStage`, `Uni` and `Multi` are considered non-blocking.
* Methods returning `void` or plain objects are considered blocking.
* Kotlin `suspend` functions are considered non-blocking.
* Methods annotated with `@RunOnVirtualThread`, `@Blocking` or `@Transactional` are considered blocking.
* Methods annotated with `@NonBlocking` are considered non-blocking.
* Methods declared on a class annotated with `@Transactional` are considered blocking unless annotated with `@NonBlocking`.
* If the method does not declare any of the annotations listed above the execution model is derived from the return type:
** Methods returning `Uni` and `Multi` are considered non-blocking.
** Methods returning `void` or any other type are considered blocking.
* Kotlin `suspend` functions are always considered non-blocking and may not be annotated with `@Blocking`, `@NonBlocking` or `@RunOnVirtualThread`.
* Non-blocking methods must execute on the connection's event loop thread.
* Blocking methods must execute on a worker thread unless annotated with `@RunOnVirtualThread`.
* Methods annotated with `@RunOnVirtualThread` must execute on a virtual thread, each invocation spawns a new virtual thread.

==== Method parameters

Expand Down
5 changes: 5 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@
<artifactId>opentelemetry-semconv</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ final class WebSocketDotNames {
static final DotName HANDSHAKE_REQUEST = DotName.createSimple(HandshakeRequest.class);
static final DotName THROWABLE = DotName.createSimple(Throwable.class);
static final DotName CLOSE_REASON = DotName.createSimple(CloseReason.class);
static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");

static final List<DotName> CALLBACK_ANNOTATIONS = List.of(ON_OPEN, ON_CLOSE, ON_BINARY_MESSAGE, ON_TEXT_MESSAGE,
ON_PONG_MESSAGE, ON_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1578,13 +1578,16 @@ private static ExecutionModel executionModel(MethodInfo method, TransformedAnnot
throw new WebSocketException("Kotlin `suspend` functions in WebSockets Next endpoints may not be "
+ "annotated @Blocking, @NonBlocking or @RunOnVirtualThread: " + method);
}

if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.RUN_ON_VIRTUAL_THREAD)) {
return ExecutionModel.VIRTUAL_THREAD;
} else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.BLOCKING)) {
return ExecutionModel.WORKER_THREAD;
} else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.NON_BLOCKING)) {
return ExecutionModel.EVENT_LOOP;
} else if (transformedAnnotations.hasAnnotation(method, WebSocketDotNames.TRANSACTIONAL)
|| transformedAnnotations.hasAnnotation(method.declaringClass(), WebSocketDotNames.TRANSACTIONAL)) {
// Method annotated with @Transactional or declared on a class annotated @Transactional is also treated as a blocking method
return ExecutionModel.WORKER_THREAD;
} else {
return hasBlockingSignature(method) ? ExecutionModel.WORKER_THREAD : ExecutionModel.EVENT_LOOP;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class BlockingAnnotationTest {

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx).connect(endUri)) {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class NonBlockingAnnotationTest {

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx).connect(endUri)) {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:true,worker:false", client.sendAndAwaitReply("foo").toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.websockets.next.test.executionmodel;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;

import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class TransactionalClassTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("endpoint")
URI endUri;

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString());
}
}

@Transactional
@WebSocket(path = "/endpoint")
public static class Endpoint {

@OnTextMessage
Uni<String> message(String ignored) {
return Uni.createFrom().item("evenloop:" + Context.isOnEventLoopThread() + ",worker:" + Context.isOnWorkerThread());
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.quarkus.websockets.next.test.executionmodel;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;

import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class TransactionalMethodTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("endpoint")
URI endUri;

@Test
void testEndoint() {
try (WSClient client = new WSClient(vertx)) {
client.connect(endUri);
assertEquals("evenloop:false,worker:true", client.sendAndAwaitReply("foo").toString());
}
}

@WebSocket(path = "/endpoint")
public static class Endpoint {

@Transactional
@OnTextMessage
Uni<String> message(String ignored) {
return Uni.createFrom().item("evenloop:" + Context.isOnEventLoopThread() + ",worker:" + Context.isOnWorkerThread());
}

}

}
Loading