Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC blocking client calls - event loop detection #19015

Merged
merged 1 commit into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,24 @@ public boolean equals(Object obj) {

public enum ClientType {

BLOCKING_STUB("newBlockingStub"),
MUTINY_STUB("newMutinyStub"),
MUTINY_CLIENT(null);
BLOCKING_STUB("newBlockingStub", true),
MUTINY_STUB("newMutinyStub", false),
MUTINY_CLIENT(null, false);

private final String factoryMethodName;
private boolean blocking;

ClientType(String factoryMethodName) {
ClientType(String factoryMethodName, boolean blocking) {
this.factoryMethodName = factoryMethodName;
this.blocking = blocking;
}

public String getFactoryMethodName() {
return factoryMethodName;
}

public boolean isBlocking() {
return blocking;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.grpc.deployment;

import static io.quarkus.deployment.Feature.GRPC_CLIENT;
import static io.quarkus.grpc.deployment.GrpcDotNames.ADD_BLOCKING_CLIENT_INTERCEPTOR;
import static io.quarkus.grpc.deployment.GrpcDotNames.CONFIGURE_STUB;
import static io.quarkus.grpc.deployment.GrpcDotNames.CREATE_CHANNEL_METHOD;
import static io.quarkus.grpc.deployment.GrpcDotNames.RETRIEVE_CHANNEL_METHOD;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void generateGrpcClientProducers(List<GrpcClientBuildItem> clients,
.addQualifier().annotation(GrpcDotNames.GRPC_CLIENT).addValue("value", client.getClientName()).done()
.scope(Singleton.class)
.unremovable()
.creator(new Consumer<MethodCreator>() {
.creator(new Consumer<>() {
@Override
public void accept(MethodCreator mc) {
GrpcClientProcessor.this.generateChannelProducer(mc, client);
Expand All @@ -210,7 +211,7 @@ public void accept(MethodCreator mc) {
syntheticBeans.produce(SyntheticBeanBuildItem.configure(clientInfo.className)
.addQualifier().annotation(GrpcDotNames.GRPC_CLIENT).addValue("value", svcName).done()
.scope(Singleton.class)
.creator(new Consumer<MethodCreator>() {
.creator(new Consumer<>() {
@Override
public void accept(MethodCreator mc) {
GrpcClientProcessor.this.generateClientProducer(mc, svcName, clientInfo);
Expand Down Expand Up @@ -340,6 +341,9 @@ private void generateClientProducer(MethodCreator mc, String svcName, ClientInfo

// If needed, modify the call options, e.g. stub = stub.withCompression("gzip")
client = mc.invokeStaticMethod(CONFIGURE_STUB, serviceName, client);
if (clientInfo.type.isBlocking()) {
client = mc.invokeStaticMethod(ADD_BLOCKING_CLIENT_INTERCEPTOR, client);
}
}
mc.returnValue(client);
mc.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class GrpcDotNames {

static final MethodDescriptor CONFIGURE_STUB = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class,
"configureStub", AbstractStub.class, String.class, AbstractStub.class);
static final MethodDescriptor ADD_BLOCKING_CLIENT_INTERCEPTOR = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class,
"addBlockingClientInterceptor", AbstractStub.class, AbstractStub.class);
static final MethodDescriptor GET_STUB_CONFIGURATOR = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class,
"getStubConfigurator", BiFunction.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.grpc.lock.detection;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.blocking.call.test.CallBlocking;
import io.quarkus.grpc.blocking.call.test.CallBlockingGrpc;
import io.quarkus.grpc.blocking.call.test.CallHello;
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class BlockingClientCallOnEventLoopTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setFlatClassPath(true)
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloService.class, CallBlockingService.class, CallBlocking.class)
.addPackage(GreeterGrpc.class.getPackage())
.addPackage(CallBlockingGrpc.class.getPackage()));

@GrpcClient
CallBlocking callBlocking;

@Test
void shouldThrowExceptionOnBlockingClientCall() {
Uni<CallHello.SuccessOrFailureDescription> result = callBlocking.doBlockingCall(CallHello.Empty.getDefaultInstance());
CallHello.SuccessOrFailureDescription response = result.await().atMost(Duration.ofSeconds(10));

assertThat(response.getSuccess()).isFalse();

assertThat(response.getErrorDescription()).contains(CallBlockingService.EXPECTED_ERROR_PREFIX);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.grpc.lock.detection;

import org.jboss.logging.Logger;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcService;
import io.quarkus.grpc.blocking.call.test.CallBlocking;
import io.quarkus.grpc.blocking.call.test.CallHello;
import io.quarkus.grpc.blocking.call.test.CallHello.SuccessOrFailureDescription;
import io.smallrye.mutiny.Uni;

@GrpcService
public class CallBlockingService implements CallBlocking {

public static String EXPECTED_ERROR_PREFIX = "Blocking gRPC client call made from the event loop";

private static final Logger log = Logger.getLogger(CallBlockingService.class);

@GrpcClient
GreeterGrpc.GreeterBlockingStub blockingClient;

@Override
public Uni<SuccessOrFailureDescription> doBlockingCall(CallHello.Empty request) {
try {
HelloReply reply = blockingClient.sayHello(HelloRequest.newBuilder().setName("Bob the Blocker").build());
return Uni.createFrom().item(reply)
.map(helloReply -> SuccessOrFailureDescription.newBuilder().setSuccess(true).build());
} catch (Exception e) {
if (!e.getMessage().contains(EXPECTED_ERROR_PREFIX)) {
log.error("Error ", e);
}
return Uni.createFrom().item(SuccessOrFailureDescription.newBuilder().setSuccess(false)
.setErrorDescription(e.getMessage()).build());
}
}
}
16 changes: 16 additions & 0 deletions extensions/grpc/deployment/src/test/proto/call-hello.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package blocking.call.test;

option java_package = "io.quarkus.grpc.blocking.call.test";

service CallBlocking {
rpc doBlockingCall(Empty) returns (SuccessOrFailureDescription);
}

message Empty {}

message SuccessOrFailureDescription {
bool success = 1;
string error_description = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.grpc.runtime.supports;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import io.vertx.core.Context;

public class EventLoopBlockingCheckInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
Channel next) {
if (Context.isOnEventLoopThread()) {
throw new IllegalStateException("Blocking gRPC client call made from the event loop. " +
"If the code is executed from a gRPC service or a RESTEasy Reactive resource, either annotate the method " +
" that makes the call with `@Blocking` or use the non-blocking client.");
}
return next.newCall(method, callOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static AbstractStub<?> configureStub(String serviceName, AbstractStub<?>
return Arc.container().instance(GrpcClientConfigProvider.class).get().adjustCallOptions(serviceName, stub);
}

public static AbstractStub<?> addBlockingClientInterceptor(AbstractStub<?> stub) {
return stub.withInterceptors(new EventLoopBlockingCheckInterceptor());
}

public static BiFunction<String, AbstractStub<?>, AbstractStub<?>> getStubConfigurator() {
return GrpcClientConfigProvider::configureStub;
}
Expand Down