From 8c39d8051dd39fda9b3d3b637bab881100fab45a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Tue, 27 Jul 2021 10:11:07 +0200 Subject: [PATCH] gRPC blocking client calls - event loop detection fixes #17344 --- .../grpc/deployment/GrpcClientBuildItem.java | 14 ++++-- .../grpc/deployment/GrpcClientProcessor.java | 8 +++- .../quarkus/grpc/deployment/GrpcDotNames.java | 2 + .../BlockingClientCallOnEventLoopTest.java | 43 +++++++++++++++++++ .../lock/detection/CallBlockingService.java | 39 +++++++++++++++++ .../src/test/proto/call-hello.proto | 16 +++++++ .../EventLoopBlockingCheckInterceptor.java | 21 +++++++++ .../supports/GrpcClientConfigProvider.java | 4 ++ 8 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/BlockingClientCallOnEventLoopTest.java create mode 100644 extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/CallBlockingService.java create mode 100644 extensions/grpc/deployment/src/test/proto/call-hello.proto create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/EventLoopBlockingCheckInterceptor.java diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java index 17913e920c71e..bd40ff0ee654b 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java @@ -70,18 +70,24 @@ public boolean equals(Object obj) { public enum ClientType { - BLOCKING_STUB("newBlockingStub"), - MUTINY_STUB("newMutinyStub"), - MUTINY_CLIENT(null); + BLOCKING_STUB("newBlockingStub", true), + MUTINY_STUB("newMutinyStub", false), + MUTINY_CLIENT(null, false); private final String factoryMethodName; + private boolean blocking; - ClientType(String factoryMethodName) { + ClientType(String factoryMethodName, boolean blocking) { this.factoryMethodName = factoryMethodName; + this.blocking = blocking; } public String getFactoryMethodName() { return factoryMethodName; } + + public boolean isBlocking() { + return blocking; + } } } diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java index 7d529d1d25cbb..2ea5584d23946 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java @@ -1,6 +1,7 @@ package io.quarkus.grpc.deployment; import static io.quarkus.deployment.Feature.GRPC_CLIENT; +import static io.quarkus.grpc.deployment.GrpcDotNames.ADD_BLOCKING_CLIENT_INTERCEPTOR; import static io.quarkus.grpc.deployment.GrpcDotNames.CONFIGURE_STUB; import static io.quarkus.grpc.deployment.GrpcDotNames.CREATE_CHANNEL_METHOD; import static io.quarkus.grpc.deployment.GrpcDotNames.RETRIEVE_CHANNEL_METHOD; @@ -197,7 +198,7 @@ public void generateGrpcClientProducers(List clients, .addQualifier().annotation(GrpcDotNames.GRPC_CLIENT).addValue("value", client.getClientName()).done() .scope(Singleton.class) .unremovable() - .creator(new Consumer() { + .creator(new Consumer<>() { @Override public void accept(MethodCreator mc) { GrpcClientProcessor.this.generateChannelProducer(mc, client); @@ -210,7 +211,7 @@ public void accept(MethodCreator mc) { syntheticBeans.produce(SyntheticBeanBuildItem.configure(clientInfo.className) .addQualifier().annotation(GrpcDotNames.GRPC_CLIENT).addValue("value", svcName).done() .scope(Singleton.class) - .creator(new Consumer() { + .creator(new Consumer<>() { @Override public void accept(MethodCreator mc) { GrpcClientProcessor.this.generateClientProducer(mc, svcName, clientInfo); @@ -340,6 +341,9 @@ private void generateClientProducer(MethodCreator mc, String svcName, ClientInfo // If needed, modify the call options, e.g. stub = stub.withCompression("gzip") client = mc.invokeStaticMethod(CONFIGURE_STUB, serviceName, client); + if (clientInfo.type.isBlocking()) { + client = mc.invokeStaticMethod(ADD_BLOCKING_CLIENT_INTERCEPTOR, client); + } } mc.returnValue(client); mc.close(); diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java index a9c69da3eac72..444c0af34bd82 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java @@ -45,6 +45,8 @@ public class GrpcDotNames { static final MethodDescriptor CONFIGURE_STUB = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class, "configureStub", AbstractStub.class, String.class, AbstractStub.class); + static final MethodDescriptor ADD_BLOCKING_CLIENT_INTERCEPTOR = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class, + "addBlockingClientInterceptor", AbstractStub.class, AbstractStub.class); static final MethodDescriptor GET_STUB_CONFIGURATOR = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class, "getStubConfigurator", BiFunction.class); diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/BlockingClientCallOnEventLoopTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/BlockingClientCallOnEventLoopTest.java new file mode 100644 index 0000000000000..c46c3a509a645 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/BlockingClientCallOnEventLoopTest.java @@ -0,0 +1,43 @@ +package io.quarkus.grpc.lock.detection; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.grpc.examples.helloworld.GreeterGrpc; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.grpc.blocking.call.test.CallBlocking; +import io.quarkus.grpc.blocking.call.test.CallBlockingGrpc; +import io.quarkus.grpc.blocking.call.test.CallHello; +import io.quarkus.grpc.server.services.HelloService; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; + +public class BlockingClientCallOnEventLoopTest { + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setFlatClassPath(true) + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(HelloService.class, CallBlockingService.class, CallBlocking.class) + .addPackage(GreeterGrpc.class.getPackage()) + .addPackage(CallBlockingGrpc.class.getPackage())); + + @GrpcClient + CallBlocking callBlocking; + + @Test + void shouldThrowExceptionOnBlockingClientCall() { + Uni result = callBlocking.doBlockingCall(CallHello.Empty.getDefaultInstance()); + CallHello.SuccessOrFailureDescription response = result.await().atMost(Duration.ofSeconds(10)); + + assertThat(response.getSuccess()).isFalse(); + + assertThat(response.getErrorDescription()).contains(CallBlockingService.EXPECTED_ERROR_PREFIX); + } + +} diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/CallBlockingService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/CallBlockingService.java new file mode 100644 index 0000000000000..a686b35c0a664 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/lock/detection/CallBlockingService.java @@ -0,0 +1,39 @@ +package io.quarkus.grpc.lock.detection; + +import org.jboss.logging.Logger; + +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.blocking.call.test.CallBlocking; +import io.quarkus.grpc.blocking.call.test.CallHello; +import io.quarkus.grpc.blocking.call.test.CallHello.SuccessOrFailureDescription; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class CallBlockingService implements CallBlocking { + + public static String EXPECTED_ERROR_PREFIX = "Blocking gRPC client call made from the event loop"; + + private static final Logger log = Logger.getLogger(CallBlockingService.class); + + @GrpcClient + GreeterGrpc.GreeterBlockingStub blockingClient; + + @Override + public Uni doBlockingCall(CallHello.Empty request) { + try { + HelloReply reply = blockingClient.sayHello(HelloRequest.newBuilder().setName("Bob the Blocker").build()); + return Uni.createFrom().item(reply) + .map(helloReply -> SuccessOrFailureDescription.newBuilder().setSuccess(true).build()); + } catch (Exception e) { + if (!e.getMessage().contains(EXPECTED_ERROR_PREFIX)) { + log.error("Error ", e); + } + return Uni.createFrom().item(SuccessOrFailureDescription.newBuilder().setSuccess(false) + .setErrorDescription(e.getMessage()).build()); + } + } +} diff --git a/extensions/grpc/deployment/src/test/proto/call-hello.proto b/extensions/grpc/deployment/src/test/proto/call-hello.proto new file mode 100644 index 0000000000000..389cb3dc5fa7e --- /dev/null +++ b/extensions/grpc/deployment/src/test/proto/call-hello.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package blocking.call.test; + +option java_package = "io.quarkus.grpc.blocking.call.test"; + +service CallBlocking { + rpc doBlockingCall(Empty) returns (SuccessOrFailureDescription); +} + +message Empty {} + +message SuccessOrFailureDescription { + bool success = 1; + string error_description = 2; +} \ No newline at end of file diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/EventLoopBlockingCheckInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/EventLoopBlockingCheckInterceptor.java new file mode 100644 index 0000000000000..ce43d7c7490de --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/EventLoopBlockingCheckInterceptor.java @@ -0,0 +1,21 @@ +package io.quarkus.grpc.runtime.supports; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; +import io.vertx.core.Context; + +public class EventLoopBlockingCheckInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, + Channel next) { + if (Context.isOnEventLoopThread()) { + throw new IllegalStateException("Blocking gRPC client call made from the event loop. " + + "If the code is executed from a gRPC service or a RESTEasy Reactive resource, either annotate the method " + + " that makes the call with `@Blocking` or use the non-blocking client."); + } + return next.newCall(method, callOptions); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/GrpcClientConfigProvider.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/GrpcClientConfigProvider.java index 7a56949a97302..062b4d85dec9b 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/GrpcClientConfigProvider.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/GrpcClientConfigProvider.java @@ -54,6 +54,10 @@ public static AbstractStub configureStub(String serviceName, AbstractStub return Arc.container().instance(GrpcClientConfigProvider.class).get().adjustCallOptions(serviceName, stub); } + public static AbstractStub addBlockingClientInterceptor(AbstractStub stub) { + return stub.withInterceptors(new EventLoopBlockingCheckInterceptor()); + } + public static BiFunction, AbstractStub> getStubConfigurator() { return GrpcClientConfigProvider::configureStub; }