Skip to content

Commit

Permalink
closes #347, closes #348
Browse files Browse the repository at this point in the history
  • Loading branch information
jvmlet committed Apr 13, 2023
1 parent 4987616 commit 0e31911
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 18 deletions.
9 changes: 9 additions & 0 deletions grpc-spring-boot-starter-demo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ apply plugin: "nebula.facet"
facets {
pureNettyTest
kafkaStreamTest
reactiveTest
customSecurityTest
bothPureAndShadedNettyTest
noConsulDependenciesTest
Expand Down Expand Up @@ -77,6 +78,9 @@ dependencies {
implementation "org.springframework.security:spring-security-oauth2-jose"
implementation "org.springframework.security:spring-security-oauth2-resource-server"
implementation 'org.springframework.boot:spring-boot-starter-validation'
compileOnly 'org.springframework:spring-tx'



implementation("javax.annotation:javax.annotation-api:1.3.2")
implementation project(':grpc-spring-boot-starter')
Expand All @@ -93,6 +97,7 @@ dependencies {

testImplementation "com.playtika.testcontainers:embedded-keycloak:2.2.14"
testImplementation "com.playtika.testcontainers:embedded-consul:2.2.14"
testImplementation "com.playtika.testcontainers:embedded-postgresql:2.2.14"

testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'org.mockito:mockito-core:2.23.0'
Expand All @@ -106,6 +111,10 @@ dependencies {
kafkaStreamTestImplementation "com.playtika.testcontainers:embedded-kafka:2.2.14"
kafkaStreamTestImplementation "org.springframework.cloud:spring-cloud-starter-stream-kafka"

reactiveTestImplementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
reactiveTestImplementation 'org.postgresql:r2dbc-postgresql'
reactiveTestImplementation 'org.postgresql:postgresql'


}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.lognet.springboot.grpc.demo;

import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.examples.reactor.ReactiveHelloRequest;
import io.grpc.examples.reactor.ReactiveHelloResponse;
Expand All @@ -9,6 +11,9 @@
import org.lognet.springboot.grpc.recovery.GRpcExceptionHandler;
import org.lognet.springboot.grpc.recovery.GRpcExceptionScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -17,6 +22,7 @@

@GRpcService
@Slf4j
@ConditionalOnClass(Transactional.class)
public class ReactiveGreeterGrpcService extends ReactorReactiveGreeterGrpc.ReactiveGreeterImplBase {

private ReactiveGreeterService reactiveGreeterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

import io.grpc.examples.reactor.ReactiveHelloRequest;
import io.grpc.examples.reactor.ReactiveHelloResponse;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Mono;

@Service
@ConditionalOnClass(Transactional.class)
public class ReactiveGreeterService {
@Transactional(readOnly = true)
public Mono<ReactiveHelloResponse> greet(Mono<ReactiveHelloRequest> request) {
return Mono
.from(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,21 @@
import io.grpc.examples.reactor.ReactiveHelloResponse;
import io.grpc.examples.reactor.ReactorReactiveGreeterGrpc;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIn;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.lognet.springboot.grpc.GrpcServerTestBase;
import org.lognet.springboot.grpc.demo.DemoApp;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.hamcrest.Matchers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
Expand All @@ -35,7 +32,8 @@
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApp.class, webEnvironment = NONE)
@ActiveProfiles("disable-security")
@ActiveProfiles({"disable-security","r2dbc-test"})
@DirtiesContext
public class ReactiveDemoTest extends GrpcServerTestBase {
@Test
public void grpcGreetTest() {
Expand Down Expand Up @@ -66,7 +64,7 @@ public void reactorGreetFailureTest() {

ReactorReactiveGreeterGrpc.newReactorStub(channel)
.greet(simpleRequest(shrek))
.block(Duration.ofSeconds(10));
.block(Duration.ofMinutes(10));
});
assertThat(e.getMessage(), containsStringIgnoringCase("not welcome"));
assertThat(e.getStatus().getCode(), is(Status.INVALID_ARGUMENT.getCode()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
spring:
r2dbc:
url: "r2dbc:postgresql://${embedded.postgresql.host}:${embedded.postgresql.port}/${embedded.postgresql.schema}"
username: ${embedded.postgresql.user}
password: ${embedded.postgresql.password}
name: ${embedded.postgresql.schema}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
embedded:
postgresql:
enabled: true
database: test_db
init-script-path: db/init_db.sql
containers:
enabled: true
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE SCHEMA test_db;
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public GrpcSpringBootExtension(Project project) {
protocVersion.set("3.21.7");

reactiveProtocVersion = this.project.getObjects().property(String.class);
reactiveProtocVersion.set("1.2.3");
reactiveProtocVersion.set("1.2.4");

reactiveFeature = this.project.getObjects().property(ReactiveFeature.class);
reactiveFeature.set(ReactiveFeature.OFF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public FailureHandlingSupport(GRpcExceptionHandlerMethodResolver methodResolver)
}

public void closeCall(RuntimeException e, ServerCall<?, ?> call, Metadata headers) throws RuntimeException {
closeCall(e,call,headers,null);
closeCall(e, call, headers, null);
}

public void closeCall( RuntimeException e, ServerCall<?, ?> call, Metadata headers, Consumer<GRpcExceptionScope.GRpcExceptionScopeBuilder> customizer) throws RuntimeException {
EXCEPTION_HANDLED.get().set(true);
if(e == null) {
public void closeCall(RuntimeException e, ServerCall<?, ?> call, Metadata headers, Consumer<GRpcExceptionScope.GRpcExceptionScopeBuilder> customizer) throws RuntimeException {
Optional.ofNullable(EXCEPTION_HANDLED.get()).ifPresent(h -> h.set(true));
if (e == null) {
log.warn("Closing null exception with {}", Status.INTERNAL);
call.close(Status.INTERNAL, new Metadata());
} else {
Expand All @@ -56,7 +56,7 @@ private void handle(HandlerMethod handler, ServerCall<?, ?> call, Consumer<GRpcE
.methodDescriptor(call.getMethodDescriptor())
.hint(GRpcRuntimeExceptionWrapper.getHint(e));

if(customizer != null) {
if (customizer != null) {
customizer.accept(exceptionScopeBuilder);
}

Expand All @@ -70,10 +70,10 @@ private void handle(HandlerMethod handler, ServerCall<?, ?> call, Consumer<GRpcE
call.close(statusToSend, Optional.ofNullable(metadataToSend).orElseGet(Metadata::new));
} catch (Exception handlerException) {
log.error("Caught exception while handling exception {} using method {}, closing with {}.",
unwrapped.getClass().getSimpleName(),
handler.getMethod(),
Status.INTERNAL,
handlerException);
unwrapped.getClass().getSimpleName(),
handler.getMethod(),
Status.INTERNAL,
handlerException);
call.close(Status.INTERNAL, new Metadata());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public GRpcExceptionHandlerInterceptor(GRpcExceptionHandlerMethodResolver method
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

final AtomicBoolean callIsClosed = new AtomicBoolean(false);
final AtomicBoolean exceptionHandled = new AtomicBoolean(false);


if (!methodResolver.hasErrorHandlers()) {
Expand All @@ -42,7 +43,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re

@Override
public void close(Status status, Metadata trailers) {
if(null != status.getCause() && !EXCEPTION_HANDLED.get().get()){
// prevent close from being invoked twice
// (like from Reactive service from different thread , close method invoked directly )
Boolean handled = Optional.ofNullable(EXCEPTION_HANDLED.get())
.map(AtomicBoolean::get)
.orElse(!exceptionHandled.compareAndSet(false, true));
if(null != status.getCause() && !handled){
failureHandlingSupport.closeCall(new GRpcRuntimeExceptionWrapper(status.getCause()), this, trailers);
}

Expand Down

0 comments on commit 0e31911

Please sign in to comment.