Skip to content

Commit

Permalink
Merge branch '6.1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
snicoll committed Apr 4, 2024
2 parents e3281a5 + d955549 commit 5253af1
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import kotlin.reflect.jvm.KCallablesJvm;
import kotlin.reflect.jvm.ReflectJvmMapping;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import org.springframework.core.CoroutinesUtils;
import org.springframework.core.DefaultParameterNameDiscoverer;
Expand All @@ -60,6 +61,12 @@
* Extension of {@link HandlerMethod} that invokes the underlying method with
* argument values resolved from the current HTTP request through a list of
* {@link HandlerMethodArgumentResolver}.
* <p>By default, the method invocation happens on the thread from which the
* {@code Mono} was subscribed to, or in some cases the thread that emitted one
* of the resolved arguments (e.g. when the request body needs to be decoded).
* To ensure a predictable thread for the underlying method's invocation,
* a {@link Scheduler} can optionally be provided via
* {@link #setInvocationScheduler(Scheduler)}.
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
Expand All @@ -86,6 +93,9 @@ public class InvocableHandlerMethod extends HandlerMethod {

private Class<?>[] validationGroups = EMPTY_GROUPS;

@Nullable
private Scheduler invocationScheduler;


/**
* Create an instance from a {@code HandlerMethod}.
Expand Down Expand Up @@ -154,6 +164,13 @@ public void setMethodValidator(@Nullable MethodValidator methodValidator) {
methodValidator.determineValidationGroups(getBean(), getBridgedMethod()) : EMPTY_GROUPS);
}

/**
* Set the {@link Scheduler} on which to perform the method invocation.
* @since 6.1.6
*/
public void setInvocationScheduler(@Nullable Scheduler invocationScheduler) {
this.invocationScheduler = invocationScheduler;
}

/**
* Invoke the method for the given exchange.
Expand All @@ -166,7 +183,7 @@ public void setMethodValidator(@Nullable MethodValidator methodValidator) {
public Mono<HandlerResult> invoke(
ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {

return getMethodArgumentValues(exchange, bindingContext, providedArgs).flatMap(args -> {
return getMethodArgumentValuesOnScheduler(exchange, bindingContext, providedArgs).flatMap(args -> {
if (shouldValidateArguments() && this.methodValidator != null) {
this.methodValidator.applyArgumentValidation(
getBean(), getBridgedMethod(), getMethodParameters(), args, this.validationGroups);
Expand Down Expand Up @@ -218,6 +235,12 @@ public Mono<HandlerResult> invoke(
});
}

private Mono<Object[]> getMethodArgumentValuesOnScheduler(
ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {
Mono<Object[]> argumentValuesMono = getMethodArgumentValues(exchange, bindingContext, providedArgs);
return this.invocationScheduler != null ? argumentValuesMono.publishOn(this.invocationScheduler) : argumentValuesMono;
}

private Mono<Object[]> getMethodArgumentValues(
ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.scheduler.Scheduler;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -103,6 +104,12 @@ class ControllerMethodResolver {

private final ReactiveAdapterRegistry reactiveAdapterRegistry;

@Nullable
private final Scheduler invocationScheduler;

@Nullable
private final Predicate<? super HandlerMethod> blockingMethodPredicate;

@Nullable
private final MethodValidator methodValidator;

Expand All @@ -125,7 +132,9 @@ class ControllerMethodResolver {
ControllerMethodResolver(
ArgumentResolverConfigurer customResolvers, ReactiveAdapterRegistry adapterRegistry,
ConfigurableApplicationContext context, List<HttpMessageReader<?>> readers,
@Nullable WebBindingInitializer webBindingInitializer) {
@Nullable WebBindingInitializer webBindingInitializer,
@Nullable Scheduler invocationScheduler,
@Nullable Predicate<? super HandlerMethod> blockingMethodPredicate) {

Assert.notNull(customResolvers, "ArgumentResolverConfigurer is required");
Assert.notNull(adapterRegistry, "ReactiveAdapterRegistry is required");
Expand All @@ -137,6 +146,8 @@ class ControllerMethodResolver {
this.requestMappingResolvers = requestMappingResolvers(customResolvers, adapterRegistry, context, readers);
this.exceptionHandlerResolvers = exceptionHandlerResolvers(customResolvers, adapterRegistry, context);
this.reactiveAdapterRegistry = adapterRegistry;
this.invocationScheduler = invocationScheduler;
this.blockingMethodPredicate = blockingMethodPredicate;

if (BEAN_VALIDATION_PRESENT) {
this.methodValidator = HandlerMethodValidator.from(webBindingInitializer, null,
Expand Down Expand Up @@ -287,6 +298,21 @@ private static Predicate<MethodParameter> methodParamPredicate(
};
}

/**
* Return a {@link Scheduler} for the given method if it is considered
* blocking by the underlying blocking method predicate, or null if no
* particular scheduler should be used for this method invocation.
*/
@Nullable
public Scheduler getSchedulerFor(HandlerMethod handlerMethod) {
if (this.invocationScheduler != null) {
Assert.state(this.blockingMethodPredicate != null, "Expected HandlerMethod Predicate");
if (this.blockingMethodPredicate.test(handlerMethod)) {
return this.invocationScheduler;
}
}
return null;
}

/**
* Return an {@link InvocableHandlerMethod} for the given
Expand All @@ -297,6 +323,7 @@ public InvocableHandlerMethod getRequestMappingMethod(HandlerMethod handlerMetho
invocable.setArgumentResolvers(this.requestMappingResolvers);
invocable.setReactiveAdapterRegistry(this.reactiveAdapterRegistry);
invocable.setMethodValidator(this.methodValidator);
invocable.setInvocationScheduler(getSchedulerFor(handlerMethod));
return invocable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public void afterPropertiesSet() throws Exception {

this.methodResolver = new ControllerMethodResolver(
this.argumentResolverConfigurer, this.reactiveAdapterRegistry, this.applicationContext,
this.messageReaders, this.webBindingInitializer);
this.messageReaders, this.webBindingInitializer,
this.scheduler, this.blockingMethodPredicate);

this.modelInitializer = new ModelInitializer(this.methodResolver, this.reactiveAdapterRegistry);
}
Expand Down Expand Up @@ -260,11 +261,9 @@ public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
.onErrorResume(ex -> exceptionHandler.handleError(exchange, ex));

if (this.scheduler != null) {
Assert.state(this.blockingMethodPredicate != null, "Expected HandlerMethod Predicate");
if (this.blockingMethodPredicate.test(handlerMethod)) {
resultMono = resultMono.subscribeOn(this.scheduler);
}
Scheduler optionalScheduler = this.methodResolver.getSchedulerFor(handlerMethod);
if (optionalScheduler != null) {
return resultMono.subscribeOn(optionalScheduler);
}

return resultMono;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import org.springframework.core.io.buffer.DataBuffer;
Expand Down Expand Up @@ -75,6 +77,15 @@ void resolveArg() {
assertHandlerResultValue(mono, "success:value1");
}

@Test
void resolveArgOnSchedulerThread() {
this.resolvers.add(stubResolver(Mono.<Object>just("success").publishOn(Schedulers.newSingle("wrong"))));
Method method = ResolvableMethod.on(TestController.class).mockCall(o -> o.singleArgThread(null)).method();
Mono<HandlerResult> mono = invokeOnScheduler(Schedulers.newSingle("good"), new TestController(), method);

assertHandlerResultValue(mono, "success on thread: good-", false);
}

@Test
void resolveNoArgValue() {
this.resolvers.add(stubResolver(Mono.empty()));
Expand All @@ -92,6 +103,14 @@ void resolveNoArgs() {
assertHandlerResultValue(mono, "success");
}

@Test
void resolveNoArgsOnSchedulerThread() {
Method method = ResolvableMethod.on(TestController.class).mockCall(TestController::noArgsThread).method();
Mono<HandlerResult> mono = invokeOnScheduler(Schedulers.newSingle("good"), new TestController(), method);

assertHandlerResultValue(mono, "on thread: good-", false);
}

@Test
void cannotResolveArg() {
Method method = ResolvableMethod.on(TestController.class).mockCall(o -> o.singleArg(null)).method();
Expand Down Expand Up @@ -229,6 +248,13 @@ private Mono<HandlerResult> invoke(Object handler, Method method, Object... prov
return invocable.invoke(this.exchange, new BindingContext(), providedArgs);
}

private Mono<HandlerResult> invokeOnScheduler(Scheduler scheduler, Object handler, Method method, Object... providedArgs) {
InvocableHandlerMethod invocable = new InvocableHandlerMethod(handler, method);
invocable.setArgumentResolvers(this.resolvers);
invocable.setInvocationScheduler(scheduler);
return invocable.invoke(this.exchange, new BindingContext(), providedArgs);
}

private HandlerMethodArgumentResolver stubResolver(Object stubValue) {
return stubResolver(Mono.just(stubValue));
}
Expand All @@ -241,8 +267,19 @@ private HandlerMethodArgumentResolver stubResolver(Mono<Object> stubValue) {
}

private void assertHandlerResultValue(Mono<HandlerResult> mono, String expected) {
this.assertHandlerResultValue(mono, expected, true);
}

private void assertHandlerResultValue(Mono<HandlerResult> mono, String expected, boolean strict) {
StepVerifier.create(mono)
.consumeNextWith(result -> assertThat(result.getReturnValue()).isEqualTo(expected))
.assertNext(result -> {
if (strict) {
assertThat(result.getReturnValue()).isEqualTo(expected);
}
else {
assertThat(String.valueOf(result.getReturnValue())).startsWith(expected);
}
})
.expectComplete()
.verify();
}
Expand All @@ -259,6 +296,14 @@ String noArgs() {
return "success";
}

String singleArgThread(String q) {
return q + " on thread: " + Thread.currentThread().getName();
}

String noArgsThread() {
return "on thread: " + Thread.currentThread().getName();
}

void exceptionMethod() {
throw new IllegalStateException("boo");
}
Expand Down
Loading

0 comments on commit 5253af1

Please sign in to comment.