diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java index 1190b59949..eb62ac6b4f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java @@ -63,6 +63,7 @@ import reactor.netty.transport.ClientTransport; import reactor.util.Metrics; import reactor.util.annotation.Nullable; +import reactor.util.retry.RetrySpec; /** * An HttpClient allows building in a safe immutable way an http client that is @@ -637,49 +638,39 @@ public final RequestSender delete() { } /** - * Option to disable {@code retry once} support for the outgoing requests that fail with - * {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}. - *

By default this is set to false in which case {@code retry once} is enabled. + * Option to disable {@code request retry} established by {@link #requestRetry(RetrySpec)}. + * See its doc for any default retry behavior! + *

By default this is set to false. * - * @param disableRetry true to disable {@code retry once}, false to enable it + * @param disableRetry true to disable {@code request retry}, false to enable it * * @return a new {@link HttpClient} * @since 0.9.6 */ public final HttpClient disableRetry(boolean disableRetry) { - if (RequestRetryConfig.DISABLED == configuration().retryConfig) { // yes instance comparison.. + if (disableRetry == configuration().retryDisabled) { return this; } HttpClient dup = duplicate(); - dup.configuration().retryConfig = RequestRetryConfig.DISABLED; + dup.configuration().retryDisabled = disableRetry; return dup; } - public final HttpClient retryConfig(final RequestRetryConfig retryConfig) { - Objects.requireNonNull(retryConfig, "retryConfig"); + /** + * Option to customize {@code request retry} behavior. If any HTTP request data + * (headers, body, etc.), the request will not be resubmitted regardless of this + * configuration. This can be disabled via {@link #disableRetry(boolean)}. + * + *

This defaults to {@code retry once} for outgoing requests that fail with + * {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}. + */ + public final HttpClient requestRetry(final RetrySpec requestRetry) { + Objects.requireNonNull(requestRetry, "requestRetry"); HttpClient dup = duplicate(); - dup.configuration().retryConfig = retryConfig; + dup.configuration().requestRetrySpec = requestRetry; return dup; } - public static class RequestRetryConfig { - - public final static RequestRetryConfig DEFAULT = new RequestRetryConfig(1, AbortedException::isConnectionReset); - public final static RequestRetryConfig DISABLED = new RequestRetryConfig(0, anyException -> false); - - final int maxRetries; - final Predicate isRetriable; - - public RequestRetryConfig(final int maxRetries, final Predicate isRetriable) { - this.maxRetries = maxRetries; - this.isRetriable = isRetriable; - } - - boolean isRetrieable(final IOException ioe) { - return isRetriable.test(ioe); - } - } - /** * Setup a callback called when {@link HttpClientRequest} has been sent * and {@link HttpClientState#REQUEST_SENT} has been emitted. diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index 997ca98950..515996372a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -68,6 +68,7 @@ import reactor.netty.NettyOutbound; import reactor.netty.NettyPipeline; import reactor.netty.ReactorNetty; +import reactor.netty.channel.AbortedException; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; import reactor.netty.http.Http2SettingsSpec; @@ -84,6 +85,7 @@ import reactor.util.annotation.Nullable; import reactor.util.context.Context; import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; import static reactor.netty.ReactorNetty.format; import static reactor.netty.http.client.Http2ConnectionProvider.OWNER; @@ -316,9 +318,8 @@ public WebsocketClientSpec websocketClientSpec() { Consumer redirectRequestConsumer; Duration responseTimeout; - HttpClient.RequestRetryConfig retryConfig; + RetrySpec requestRetrySpec; - // TODO consolidate this with config concept boolean retryDisabled; SslProvider sslProvider; @@ -338,7 +339,7 @@ public WebsocketClientSpec websocketClientSpec() { this.method = HttpMethod.GET; this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11}; this._protocols = h11; - this.retryConfig = HttpClient.RequestRetryConfig.DEFAULT; + this.requestRetrySpec = Retry.max(1).filter(AbortedException::isConnectionReset); } HttpClientConfig(HttpClientConfig parent) { @@ -367,7 +368,7 @@ public WebsocketClientSpec websocketClientSpec() { this.redirectRequestBiConsumer = parent.redirectRequestBiConsumer; this.redirectRequestConsumer = parent.redirectRequestConsumer; this.responseTimeout = parent.responseTimeout; - this.retryConfig = parent.retryConfig; + this.requestRetrySpec = parent.requestRetrySpec; this.retryDisabled = parent.retryDisabled; this.sslProvider = parent.sslProvider; this.uri = parent.uri; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java index df08542bc5..73f4b43147 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConnect.java @@ -15,7 +15,6 @@ */ package reactor.netty.http.client; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -67,6 +66,7 @@ import reactor.util.annotation.Nullable; import reactor.util.context.Context; import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; import static reactor.netty.ReactorNetty.format; import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; @@ -208,7 +208,7 @@ static final class MonoHttpConnect extends Mono { public void subscribe(CoreSubscriber actual) { HttpClientHandler handler = new HttpClientHandler(config); - Mono.create(sink -> { + final Mono baseMono = Mono.create(sink -> { HttpClientConfig _config = config; //append secure handler if needed @@ -269,7 +269,6 @@ public void subscribe(CoreSubscriber actual) { .acquire(_config, observer, handler, resolver) .subscribe(new ClientTransportSubscriber(sink)); - // TODO definitely not happy about spreading the retry logic even more }).retryWhen(Retry.indefinitely().filter(err -> { if (err instanceof RedirectClientException) { RedirectClientException re = (RedirectClientException)err; @@ -280,8 +279,13 @@ public void subscribe(CoreSubscriber actual) { return true; } return false; - })).retryWhen(Retry.max(config.retryConfig.maxRetries).filter(handler)) - .subscribe(actual); + })); + + // If request retry is enabled, the handler should guarantee no request data sent + (config.retryDisabled + ? baseMono + : baseMono.retryWhen(config.requestRetrySpec.modifyErrorFilter(handler::and)) + ).subscribe(actual); } private void removeIncompatibleProtocol(HttpClientConfig config, HttpProtocol protocol) { @@ -358,7 +362,7 @@ public void onUncaughtException(Connection connection, Throwable error) { handler.previousRequestHeaders = ops.requestHeaders; } } - else if (handler.canRetry(error)) { + else if (handler.requestRetrySpec.errorFilter.test(error)) { HttpClientOperations ops = connection.as(HttpClientOperations.class); if (ops != null && ops.hasSentHeaders()) { // In some cases the channel close event may be delayed and thus the connection to be @@ -369,8 +373,8 @@ else if (handler.canRetry(error)) { // Mark the connection as non-persistent here so that it is never returned to the pool and leave // the channel close event to invalidate it. ops.markPersistent(false); - // Disable retry if the headers or/and the body were sent - handler.shouldRetry = false; + // Signal to retry that headers or/and the body were sent + handler.requestDataSent = true; if (log.isWarnEnabled()) { log.warn(format(connection.channel(), "The connection observed an error, the request cannot be " + @@ -480,10 +484,14 @@ static final class HttpClientHandler extends SocketAddress volatile UriEndpoint fromURI; volatile Supplier[] redirectedFrom; - final RequestRetryConfig retryConfig; + /** + * A {@link RetrySpec} that is tied to request submission. The implementation + * that leverages this is supposed to guarantee that no retry will happen if + * any HTTP request data is sent over the wire. + */ + final RetrySpec requestRetrySpec; - // TODO not happy with name as it collides with config concept.. - volatile boolean shouldRetry = true; + volatile boolean requestDataSent; volatile HttpHeaders previousRequestHeaders; @@ -504,7 +512,7 @@ static final class HttpClientHandler extends SocketAddress new UriEndpointFactory(configuration.remoteAddress(), configuration.isSecure(), URI_ADDRESS_MAPPER); this.websocketClientSpec = configuration.websocketClientSpec; - this.retryConfig = configuration.retryConfig; + this.requestRetrySpec = configuration.requestRetrySpec; this.handler = configuration.body; if (configuration.uri == null) { @@ -690,22 +698,13 @@ void channel(HttpClientOperations ops) { @Override public boolean test(Throwable throwable) { - if (shouldRetry && canRetry(throwable)) { + if (!requestDataSent) { redirect(toURI.toString()); return true; } return false; } - /** - * Signals that the request can be retried. - */ - boolean canRetry(final Throwable err) { - return shouldRetry && - err instanceof IOException && - retryConfig.isRetrieable((IOException)err); - } - @Override public String toString() { return "{" + "uri=" + toURI + ", method=" + method + '}';