Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebFlux: When using jetty-reactive-httpclient calling retry on WebClient.exchange throws CancellationException #23005

Closed
scottjohnson opened this issue May 20, 2019 · 8 comments
Assignees
Labels
for: external-project Needs a fix in external project in: web Issues in web modules (web, webmvc, webflux, websocket)

Comments

@scottjohnson
Copy link

When using the jetty-reactive-httpclient client connector with WebClient, whenever we call retry after WebClient.exchange (to retry the HTTP request), a CancellationException is thrown instead of the request being retried. When using reactor-netty client, the WebClient HTTP request is retried as expected.

It is unclear to us whether this is a Spring issue, or one with Reactor or Jetty. Please feel free to redirect us to the correct project.

Versions:
Spring up to 5.1.7.RELEASE
Spring Boot up to 2.1.4.RELEASE
Jetty 9.4.18.v20190429
Reactor 3.2.8.RELEASE

See this sample repository for a full demonstration of the problem: https://github.com/scottjohnson/webclient-retry-repro

In part:

webClient.get()
                .uri("https://postman-echo.com/status/404")
                .exchange()
                .flatMap(cr -> {
                    if (!cr.statusCode().is2xxSuccessful()) {
                        throw new ResponseStatusException(HttpStatus.BAD_GATEWAY, "Didn't get a 200, retrying...");
                    }
                    else return cr.bodyToMono(String.class);
                })
                .retry(2);

We would expect this code to retry the requested URI two more times. This happens as expected when using the reactor-netty client.

When using jetty-reactive-http-client, the original request is made, but on retry instead of retrying the following exception is thrown:

java.util.concurrent.CancellationException: null
	at org.eclipse.jetty.reactive.client.internal.AbstractSinglePublisher.subscribe(AbstractSinglePublisher.java:54) ~[jetty-reactive-httpclient-1.0.3.jar:na]
	at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:43) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3710) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:109) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:93) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:122) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at org.eclipse.jetty.reactive.client.internal.AbstractSingleProcessor.downStreamOnNext(AbstractSingleProcessor.java:110) ~[jetty-reactive-httpclient-1.0.3.jar:na]
	at org.eclipse.jetty.reactive.client.internal.ResponseListenerPublisher.onNext(ResponseListenerPublisher.java:130) ~[jetty-reactive-httpclient-1.0.3.jar:na]
	at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3710) ~[reactor-core-3.2.8.RELEASE.jar:3.2.8.RELEASE]
	at org.eclipse.jetty.reactive.client.internal.ResponseListenerPublisher.onHeaders(ResponseListenerPublisher.java:72) ~[jetty-reactive-httpclient-1.0.3.jar:na]
	at org.eclipse.jetty.client.ResponseNotifier.notifyHeaders(ResponseNotifier.java:98) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.client.ResponseNotifier.notifyHeaders(ResponseNotifier.java:90) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.client.HttpReceiver.responseHeaders(HttpReceiver.java:267) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.headerComplete(HttpReceiverOverHTTP.java:256) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.http.HttpParser.parseFields(HttpParser.java:1218) ~[jetty-http-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:1502) ~[jetty-http-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.parse(HttpReceiverOverHTTP.java:172) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.process(HttpReceiverOverHTTP.java:135) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:73) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.client.http.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:133) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.client.http.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:155) ~[jetty-client-9.4.18.v20190429.jar:9.4.18.v20190429]
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:427) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:321) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:159) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117) ~[jetty-io-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
	at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683) ~[jetty-util-9.4.15.v20190215.jar:9.4.15.v20190215]
	at java.base/java.lang.Thread.run(Thread.java:844) ~[na:na]
@scottjohnson scottjohnson changed the title WebFlux: When using jetty-reactive-httpclient calling retry on Webclient.exchange throws CancellationException WebFlux: When using jetty-reactive-httpclient calling retry on WebClient.exchange throws CancellationException May 20, 2019
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label May 20, 2019
@bclozel bclozel added the in: web Issues in web modules (web, webmvc, webflux, websocket) label May 21, 2019
@sdeleuze sdeleuze self-assigned this May 28, 2019
@sdeleuze
Copy link
Contributor

It looks like a potential https://github.com/jetty-project/jetty-reactive-httpclient issue.

@scottjohnson Could you please try to reproduce using only jetty-reactive-httpclient API + Reactor retry operator?

@sbordet
Copy link

sbordet commented Jul 11, 2019

@sdeleuze what I see analyzing this on the jetty-reactive-httpclient side is that the exception causes cancel() to be invoked on a subscription, and I don't see how a canceled subscription can be reused (I could not find any explicit behavior described by the reactivestream spec).

Isn't the current behavior violating Rule 2.12?

In this particular case, you want to send another request, so I'd say that you have to run again through the whole process of creating a request, which would re-create new publishers, subscribers and subscriptions so that they all are different objects than the previous request (and therefore you won't see the CancellationException).

In other words, what is the semantic of retry() in this case:

AtomicInteger unique = new AtomicInteger();
webClient.get()
    .uri("http://localhost/" + unique.incrementAndGet())
    .exchange()
    .flatMap(r -> {
        throw new ResponseStatusException(HttpStatus.OK);
    })
    .retry(1)

Is there a request to /1 and then a request to /2 for the retry?

If not, what if the first request is a POST that consumes a content that is not re-consumable (e.g. reads from a InputStream that provides the request content only once)?
The retry won't be able to send the content a second time.

In general, a call to Jetty's ReactiveRequest.response(...) it's a once-only operation (per request instance) and whatever is passed to it or returned by it cannot be reused.

Thanks!

@scottjohnson
Copy link
Author

Thanks @sbordet for weighing in. This analysis matches with our observations that the cancelled subscription was being re-used after being put in a cancelled state.

@sdeleuze, we weren't able to effectively create a test on our end that reproduces using only the jetty-reactive-httpclient API; do you have any examples of component tests at a low level that I could adapt for this purpose?

@sbordet
Copy link

sbordet commented Jul 12, 2019

@scottjohnson I have a reproducer, below instructions.

Be sure you have the dependency on spring-webflux:

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webflux</artifactId>
  <version>5.1.8.RELEASE</version>
  <scope>test</scope>
</dependency>

Then the test class with this test:

@Test
public void testRetry() throws Exception {
    // Setup the server here.

    AtomicInteger unique = new AtomicInteger();
    WebClient client = WebClient.builder().clientConnector(new JettyClientHttpConnector(httpClient())).build();
    client.get()
        .uri(uri() + "/" + unique.incrementAndGet())
        .exchange()
        .flatMap(r -> {
            throw new ResponseStatusException(HttpStatus.OK);
        })
        .retry(1)
        .block();
}

@scottjohnson
Copy link
Author

Hi, is there any progress on this issue?

@scottjohnson
Copy link
Author

Hi, just checking in if there is any progress on this issue? Thanks!

@rstoyanchev rstoyanchev assigned rstoyanchev and unassigned sdeleuze Jan 28, 2020
@rstoyanchev
Copy link
Contributor

It looks like this was fixed as a side effect of #22375. As a result of wrapping the exchange with Mono#defer now after cancellation, JettyClientHttpConnector is invoked again causing a new request to be created.

I confirmed that starting with 5.2 this is no longer an issue. Nevertheless I do want to follow up on the following:

@sdeleuze what I see analyzing this on the jetty-reactive-httpclient side is that the exception causes cancel() to be invoked on a subscription, and I don't see how a canceled subscription can be reused (I could not find any explicit behavior described by the reactivestream spec).

Isn't the current behavior violating Rule 2.12?

The subscription for the original subscriber is indeed cancelled. However on the retry AbstractSinglePublisher#subscribe is called with a new Subscriber and that can happen many times as per Rule 2.10.

For comparison, in Reactor Netty, a cancellation is interpreted as a request to close the request, while each additional Subscriber is a request for a new connection.

@rstoyanchev rstoyanchev added for: external-project Needs a fix in external project and removed status: waiting-for-triage An issue we've not yet triaged or decided on labels Jan 28, 2020
@scottjohnson
Copy link
Author

Thanks for all your efforts @rstoyanchev. Much appreciated!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for: external-project Needs a fix in external project in: web Issues in web modules (web, webmvc, webflux, websocket)
Projects
None yet
Development

No branches or pull requests

6 participants