WebFlux Client Hangs indefinitely on Mono.block() #23363

Closed
bfrisoni opened this issue Jul 26, 2019 · 9 comments
Labels
status: invalid An issue that we don't feel is valid

Comments

@bfrisoni

Versions:
org.springframework.boot:spring-boot-starter:2.1.1.RELEASE
org.springframework.boot:spring-boot-starter-actuator:2.1.1.RELEASE
org.springframework.boot:spring-boot-starter-data-mongodb:2.1.1.RELEASE
org.springframework.boot:spring-boot-starter-data-jpa:2.1.1.RELEASE
org.springframework.boot:spring-boot-starter-log4j2:2.1.1.RELEASE
org.springframework.boot:spring-boot-starter-web:2.1.1.RELEASE
org.springframework.boot:spring-boot-starter-webflux:2.1.1.RELEASE
org.springframework.boot:spring-boot-starter-test:2.1.1.RELEASE
org.springframework.cloud:spring-cloud-starter-sleuth:2.1.1.RELEASE
org.springframework.security:spring-security-test:5.1.2.RELEASE

So I am having a very strange issue where Mono.block() hangs indefinitely. It only seems to occur when spring-boot-starter-actuator is included as a dependency and the service being called returns an error status. I am not sure what the relation is, or whether I am doing something wrong.

Below is the code being used:
Mono<A> aMono = webClient.get().uri().retrieve().bodyToMono(A.class).cache();
aMono.subscribe();
aMono.block();

If the service returns a successful response, aMono.block() returns without issue.
If the spring-boot-starter-actuator:2.1.1.RELEASE dependency is removed, aMono.block() returns without issue for both successful and error responses.
Below is where the thread hangs:

"http-nio-8080-exec-1" #42 daemon prio=5 os_prio=0 tid=0x0000000035c5c800 nid=0x4bb4 waiting on condition [0x000000003e679000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076ec549d0> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:81)
at reactor.core.publisher.Mono.block(Mono.java:1493)
..
..
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:791)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1417)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
- locked <0x000000075bd66538> (a org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x000000075769f570> (a java.util.concurrent.ThreadPoolExecutor$Worker)

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Jul 26, 2019
@rstoyanchev
Contributor

Is it intentional that this calls both subscribe and block? Both of them subscribe; one just does it without blocking.

@rstoyanchev rstoyanchev added the status: waiting-for-feedback We need additional information before we can continue label Jul 26, 2019
@bfrisoni
Author

Yes, it is intentional. The use case is that some synchronous validation needs to be done using data from 4-5 different external services. So these requests are all fired and subscribed to in order to initiate the calls asynchronously; once the data is needed down the pipe, aMono.block() is called to return the cached response. Let me know if this is clear. I am doing some additional troubleshooting, and it seems that when the subscription consumes the error this hanging does not occur. For example:
aMono.doOnError(throwable -> System.out.println(throwable))

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Jul 26, 2019
@bfrisoni
Author

Looks like my last comment stating that if the error is consumed the aMono.block() does not hang is incorrect. It still hangs. Although if I step into the aMono.block() when my debugger is enabled, the thread does not hang.

I am doing some additional troubleshooting, it seems when the subscription consumes the error this hanging does not occur. For example:
aMono.doOnError(throwable -> System.out.println(throwable))

@rstoyanchev
Contributor

The use case is that some synchronous validation needs to be done using data from 4-5 different external services. So these requests are all fired and subscribed to in order to initiate the calls asynchronously; once the data is needed down the pipe, aMono.block() is called to return the cached response.

The snippet shown is too basic perhaps, but I still don't understand the reason for caching + subscribing and blocking. If you need to call multiple services and validate when all have returned I would expect that to be composed into a single chain. I have too little information to work with but based on the given description it could be something like this:

Mono<A> aMono = webClient.get().uri().retrieve().bodyToMono(A.class);
Mono<B> bMono = webClient.get().uri().retrieve().bodyToMono(B.class);
Mono<C> cMono = webClient.get().uri().retrieve().bodyToMono(C.class);
Mono<D> dMono = webClient.get().uri().retrieve().bodyToMono(D.class);

// zip emits a single Tuple4 once all four sources have emitted
Mono.zip(aMono, bMono, cMono, dMono)
    .doOnNext(tuple -> {
        // tuple.getT1() .. tuple.getT4() hold the A, B, C and D results
        // ...
    })
    .block();

@rstoyanchev rstoyanchev added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Jul 31, 2019
@bfrisoni
Author

bfrisoni commented Aug 1, 2019

Thanks @rstoyanchev for the reply. Let me expand a little bit on the current use case and the limitations with your suggestion.
1. Mono.zip(..) maxes out, I believe, at 8 parameters.
2. With your current implementation, the application would completely block until all requests have responded, which is what I was trying to avoid.

Instead, with my current solution I am able to:
1. Fire all 5 requests (could be more than 5 as the application grows) and subscribe asynchronously.
2. Begin validating a number of other rules that need to be satisfied, which may also require synchronous RDBMS lookups for validation data owned locally by the application itself.
3. While number 2 is computing, all my requests are processed asynchronously in the background. The application blocks only once I get to the rules that actually require the external web service data, by which point most responses have been received and cached, allowing Mono.block() to return immediately.
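
The three steps above can be sketched with JDK types only. This is a minimal illustration, not the code from this issue: it swaps WebClient and Mono for `CompletableFuture`, and `callService`, `localRulesPass` and `validate` are hypothetical names. It also assumes a bounded wait in step 3 is acceptable, so that a signal that never arrives surfaces as a timeout instead of a parked thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FireEarlyBlockLate {

    // Hypothetical stand-in for one external service call; fails on request.
    static CompletableFuture<String> callService(String name, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException(name + " returned an error status");
            }
            return name + "-data";
        });
    }

    // Hypothetical stand-in for the synchronous local rules (e.g. RDBMS lookups).
    static boolean localRulesPass() {
        return true;
    }

    static String validate(boolean failB) {
        // Step 1: start all remote calls up front; nothing blocks here.
        CompletableFuture<String> a = callService("A", false);
        CompletableFuture<String> b = callService("B", failB);

        // Step 2: run the local rules on the caller thread while the calls are in flight.
        if (!localRulesPass()) {
            return "rejected: local rules";
        }

        // Step 3: block only when the remote data is needed, with an upper bound on the wait.
        try {
            String aData = a.get(2, TimeUnit.SECONDS);
            String bData = b.get(2, TimeUnit.SECONDS);
            return "ok: " + aData + ", " + bData;
        } catch (ExecutionException e) {
            // A failed call is reported to the waiter instead of leaving it parked.
            return "rejected: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "rejected: timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "rejected: interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(validate(false)); // prints "ok: A-data, B-data"
        System.out.println(validate(true));  // prints "rejected: B returned an error status"
    }
}
```

In Reactor itself, the closest equivalent of the bounded wait is the `Mono.block(Duration)` overload, which raises an error on timeout instead of waiting indefinitely.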

Again, the thread only hangs if the actuator module is enabled AND one of the responses returns an error. If I remove the actuator module OR all responses return without error, Mono.block() does not hang. I have yet to test whether this also occurs with a Flux (blockFirst()/blockLast()) under the same conditions.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Aug 1, 2019
@rstoyanchev
Contributor

rstoyanchev commented Aug 1, 2019

@bfrisoni

  1. Mono.zip(..) maxes out I believe at 8 parameters

It doesn't max out. Please check the variant that takes Iterable.

  2. With your current implementation, the application would completely block until all requests

It's not an implementation of anything. It's a sketch based on a very incomplete description.

  1. Fire all 5 ...
  2. Begin computations ...
  3. While number 2 is computing, ...

All of this sounds quite feasible with 1) and 2) modeled as independent streams to be joined when complete, and 1) itself consisting of 5 more sub-streams joined with zip. I ask you to change your assumption this can only be done with cache, async, and block, but this is now turning into a discussion better suited for StackOverflow.
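
As a rough sketch of that shape, using only JDK types: the example below swaps Reactor for `CompletableFuture`, so it illustrates the composition and is not Reactor code. `fetch`, `localRules` and `validate` are hypothetical names. The remote calls are joined from a list of any size, the local rules run as an independent stage, and there is a single wait at the very end.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class JoinIndependentStreams {

    // Hypothetical stand-in for one external service call.
    static CompletableFuture<String> fetch(String service) {
        return CompletableFuture.supplyAsync(() -> service + "-data");
    }

    // Hypothetical stand-in for the local rules, run as their own async stage.
    static CompletableFuture<Boolean> localRules() {
        return CompletableFuture.supplyAsync(() -> true);
    }

    static CompletableFuture<String> validate(List<String> services) {
        // Stream 1: any number of remote calls, started together.
        List<CompletableFuture<String>> calls = services.stream()
                .map(JoinIndependentStreams::fetch)
                .collect(Collectors.toList());

        // Join the remote calls once all of them have completed.
        CompletableFuture<List<String>> remote = CompletableFuture
                .allOf(calls.toArray(new CompletableFuture<?>[0]))
                .thenApply(done -> calls.stream()
                        .map(CompletableFuture::join) // already complete, does not wait
                        .collect(Collectors.toList()));

        // Stream 2 (local rules) is combined with stream 1 when both are done.
        return remote.thenCombine(localRules(),
                (data, localOk) -> (localOk ? "valid" : "invalid") + " " + data);
    }

    public static void main(String[] args) {
        // The only blocking call sits at the edge of the program.
        System.out.println(validate(Arrays.asList("A", "B", "C", "D", "E")).join());
        // prints "valid [A-data, B-data, C-data, D-data, E-data]"
    }
}
```

If any stage fails, the failure propagates through the combined future and reaches the final `join()`, so the caller sees it in one place.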

I'm afraid there isn't anything I can do without a sample that demonstrates the issue. I can't tell you why it blocks based on a loose description of a complex scenario. In addition, if Actuator is the cause of anything, that can't be fixed here in the Spring Framework.

@rstoyanchev rstoyanchev added status: invalid An issue that we don't feel is valid and removed status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged or decided on labels Aug 1, 2019
@bfrisoni
Author

bfrisoni commented Aug 2, 2019

@rstoyanchev
Thanks for the feedback. I will create a sample project demonstrating the issue.

@rstoyanchev
Contributor

Okay, if you get to that, feel free to comment here for consideration.

@bfrisoni
Author

bfrisoni commented Dec 2, 2020

I never circled back around, but I was able to get this to work by registering an error consumer. Without this consumer, it seems that if the first subscription (aMono.subscribe()) generates an error, the error is not "cached" and replayed to the second subscription (aMono.block()), leaving the second subscription hanging while it waits for a signal that never arrives. I am not sure if this is expected behavior, but I am posting this in case anyone else has a similar use case.

Mono<A> aMono = webClient.get().uri().retrieve().bodyToMono(A.class).cache()
    .doOnError(throwable -> logger.debug("Error", throwable));
aMono.subscribe();
aMono.block();
