Skip to content

Commit

Permalink
Merge pull request #19015 from michalszynkiewicz/grpc-client-lock-det…
Browse files Browse the repository at this point in the history
…ection

gRPC blocking client calls - event loop detection
  • Loading branch information
mkouba authored Jul 27, 2021
2 parents 8e09481 + 8c39d80 commit db143c5
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,24 @@ public boolean equals(Object obj) {

public enum ClientType {

BLOCKING_STUB("newBlockingStub"),
MUTINY_STUB("newMutinyStub"),
MUTINY_CLIENT(null);
BLOCKING_STUB("newBlockingStub", true),
MUTINY_STUB("newMutinyStub", false),
MUTINY_CLIENT(null, false);

private final String factoryMethodName;
private boolean blocking;

ClientType(String factoryMethodName) {
ClientType(String factoryMethodName, boolean blocking) {
this.factoryMethodName = factoryMethodName;
this.blocking = blocking;
}

public String getFactoryMethodName() {
return factoryMethodName;
}

public boolean isBlocking() {
return blocking;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.grpc.deployment;

import static io.quarkus.deployment.Feature.GRPC_CLIENT;
import static io.quarkus.grpc.deployment.GrpcDotNames.ADD_BLOCKING_CLIENT_INTERCEPTOR;
import static io.quarkus.grpc.deployment.GrpcDotNames.CONFIGURE_STUB;
import static io.quarkus.grpc.deployment.GrpcDotNames.CREATE_CHANNEL_METHOD;
import static io.quarkus.grpc.deployment.GrpcDotNames.RETRIEVE_CHANNEL_METHOD;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void generateGrpcClientProducers(List<GrpcClientBuildItem> clients,
.addQualifier().annotation(GrpcDotNames.GRPC_CLIENT).addValue("value", client.getClientName()).done()
.scope(Singleton.class)
.unremovable()
.creator(new Consumer<MethodCreator>() {
.creator(new Consumer<>() {
@Override
public void accept(MethodCreator mc) {
GrpcClientProcessor.this.generateChannelProducer(mc, client);
Expand All @@ -210,7 +211,7 @@ public void accept(MethodCreator mc) {
syntheticBeans.produce(SyntheticBeanBuildItem.configure(clientInfo.className)
.addQualifier().annotation(GrpcDotNames.GRPC_CLIENT).addValue("value", svcName).done()
.scope(Singleton.class)
.creator(new Consumer<MethodCreator>() {
.creator(new Consumer<>() {
@Override
public void accept(MethodCreator mc) {
GrpcClientProcessor.this.generateClientProducer(mc, svcName, clientInfo);
Expand Down Expand Up @@ -340,6 +341,9 @@ private void generateClientProducer(MethodCreator mc, String svcName, ClientInfo

// If needed, modify the call options, e.g. stub = stub.withCompression("gzip")
client = mc.invokeStaticMethod(CONFIGURE_STUB, serviceName, client);
if (clientInfo.type.isBlocking()) {
client = mc.invokeStaticMethod(ADD_BLOCKING_CLIENT_INTERCEPTOR, client);
}
}
mc.returnValue(client);
mc.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class GrpcDotNames {

static final MethodDescriptor CONFIGURE_STUB = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class,
"configureStub", AbstractStub.class, String.class, AbstractStub.class);
static final MethodDescriptor ADD_BLOCKING_CLIENT_INTERCEPTOR = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class,
"addBlockingClientInterceptor", AbstractStub.class, AbstractStub.class);
static final MethodDescriptor GET_STUB_CONFIGURATOR = MethodDescriptor.ofMethod(GrpcClientConfigProvider.class,
"getStubConfigurator", BiFunction.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.grpc.lock.detection;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.blocking.call.test.CallBlocking;
import io.quarkus.grpc.blocking.call.test.CallBlockingGrpc;
import io.quarkus.grpc.blocking.call.test.CallHello;
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class BlockingClientCallOnEventLoopTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setFlatClassPath(true)
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HelloService.class, CallBlockingService.class, CallBlocking.class)
.addPackage(GreeterGrpc.class.getPackage())
.addPackage(CallBlockingGrpc.class.getPackage()));

@GrpcClient
CallBlocking callBlocking;

@Test
void shouldThrowExceptionOnBlockingClientCall() {
Uni<CallHello.SuccessOrFailureDescription> result = callBlocking.doBlockingCall(CallHello.Empty.getDefaultInstance());
CallHello.SuccessOrFailureDescription response = result.await().atMost(Duration.ofSeconds(10));

assertThat(response.getSuccess()).isFalse();

assertThat(response.getErrorDescription()).contains(CallBlockingService.EXPECTED_ERROR_PREFIX);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.grpc.lock.detection;

import org.jboss.logging.Logger;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcService;
import io.quarkus.grpc.blocking.call.test.CallBlocking;
import io.quarkus.grpc.blocking.call.test.CallHello;
import io.quarkus.grpc.blocking.call.test.CallHello.SuccessOrFailureDescription;
import io.smallrye.mutiny.Uni;

@GrpcService
public class CallBlockingService implements CallBlocking {

public static String EXPECTED_ERROR_PREFIX = "Blocking gRPC client call made from the event loop";

private static final Logger log = Logger.getLogger(CallBlockingService.class);

@GrpcClient
GreeterGrpc.GreeterBlockingStub blockingClient;

@Override
public Uni<SuccessOrFailureDescription> doBlockingCall(CallHello.Empty request) {
try {
HelloReply reply = blockingClient.sayHello(HelloRequest.newBuilder().setName("Bob the Blocker").build());
return Uni.createFrom().item(reply)
.map(helloReply -> SuccessOrFailureDescription.newBuilder().setSuccess(true).build());
} catch (Exception e) {
if (!e.getMessage().contains(EXPECTED_ERROR_PREFIX)) {
log.error("Error ", e);
}
return Uni.createFrom().item(SuccessOrFailureDescription.newBuilder().setSuccess(false)
.setErrorDescription(e.getMessage()).build());
}
}
}
16 changes: 16 additions & 0 deletions extensions/grpc/deployment/src/test/proto/call-hello.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package blocking.call.test;

option java_package = "io.quarkus.grpc.blocking.call.test";

service CallBlocking {
rpc doBlockingCall(Empty) returns (SuccessOrFailureDescription);
}

message Empty {}

message SuccessOrFailureDescription {
bool success = 1;
string error_description = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.grpc.runtime.supports;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import io.vertx.core.Context;

public class EventLoopBlockingCheckInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
Channel next) {
if (Context.isOnEventLoopThread()) {
throw new IllegalStateException("Blocking gRPC client call made from the event loop. " +
"If the code is executed from a gRPC service or a RESTEasy Reactive resource, either annotate the method " +
" that makes the call with `@Blocking` or use the non-blocking client.");
}
return next.newCall(method, callOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static AbstractStub<?> configureStub(String serviceName, AbstractStub<?>
return Arc.container().instance(GrpcClientConfigProvider.class).get().adjustCallOptions(serviceName, stub);
}

public static AbstractStub<?> addBlockingClientInterceptor(AbstractStub<?> stub) {
return stub.withInterceptors(new EventLoopBlockingCheckInterceptor());
}

public static BiFunction<String, AbstractStub<?>, AbstractStub<?>> getStubConfigurator() {
return GrpcClientConfigProvider::configureStub;
}
Expand Down

0 comments on commit db143c5

Please sign in to comment.