Skip to content

Commit

Permalink
[2425] Some PoC of having a more configurable retry
Browse files Browse the repository at this point in the history
The main issue we can't use generic reactor-core retry concept is due to
the inability to detect if HTTP-level data has been sent across the
wire.

 reactor#2425
  • Loading branch information
Samuel Cox authored and crankydillo committed Aug 27, 2022
1 parent 1510518 commit 1f2f0cb
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.http.client;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
Expand All @@ -25,6 +26,7 @@
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import io.netty.buffer.ByteBuf;
Expand All @@ -49,6 +51,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.Http2SettingsSpec;
import reactor.netty.http.HttpProtocol;
Expand Down Expand Up @@ -644,14 +647,39 @@ public final RequestSender delete() {
* @since 0.9.6
*/
public final HttpClient disableRetry(boolean disableRetry) {
if (disableRetry == configuration().retryDisabled) {
if (RequestRetryConfig.DISABLED == configuration().retryConfig) { // yes instance comparison..
return this;
}
HttpClient dup = duplicate();
dup.configuration().retryDisabled = disableRetry;
dup.configuration().retryConfig = RequestRetryConfig.DISABLED;
return dup;
}

public final HttpClient retryConfig(final RequestRetryConfig retryConfig) {
Objects.requireNonNull(retryConfig, "retryConfig");
HttpClient dup = duplicate();
dup.configuration().retryConfig = retryConfig;
return dup;
}

public static class RequestRetryConfig {

public final static RequestRetryConfig DEFAULT = new RequestRetryConfig(1, AbortedException::isConnectionReset);
public final static RequestRetryConfig DISABLED = new RequestRetryConfig(0, anyException -> false);

final int maxRetries;
final Predicate<IOException> isRetriable;

public RequestRetryConfig(final int maxRetries, final Predicate<IOException> isRetriable) {
this.maxRetries = maxRetries;
this.isRetriable = isRetriable;
}

boolean isRetrieable(final IOException ioe) {
return isRetriable.test(ioe);
}
}

/**
* Setup a callback called when {@link HttpClientRequest} has been sent
* and {@link HttpClientState#REQUEST_SENT} has been emitted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.retry.Retry;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.Http2ConnectionProvider.OWNER;
Expand Down Expand Up @@ -314,7 +315,12 @@ public WebsocketClientSpec websocketClientSpec() {
BiConsumer<HttpHeaders, HttpClientRequest> redirectRequestBiConsumer;
Consumer<HttpClientRequest> redirectRequestConsumer;
Duration responseTimeout;

HttpClient.RequestRetryConfig retryConfig;

// TODO consolidate this with config concept
boolean retryDisabled;

SslProvider sslProvider;
URI uri;
String uriStr;
Expand All @@ -332,7 +338,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.method = HttpMethod.GET;
this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11};
this._protocols = h11;
this.retryDisabled = false;
this.retryConfig = HttpClient.RequestRetryConfig.DEFAULT;
}

HttpClientConfig(HttpClientConfig parent) {
Expand Down Expand Up @@ -361,6 +367,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.redirectRequestBiConsumer = parent.redirectRequestBiConsumer;
this.redirectRequestConsumer = parent.redirectRequestConsumer;
this.responseTimeout = parent.responseTimeout;
this.retryConfig = parent.retryConfig;
this.retryDisabled = parent.retryDisabled;
this.sslProvider = parent.sslProvider;
this.uri = parent.uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.http.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
Expand Down Expand Up @@ -269,7 +270,7 @@ public void subscribe(CoreSubscriber<? super Connection> actual) {
.acquire(_config, observer, handler, resolver)
.subscribe(new ClientTransportSubscriber(sink));

}).retryWhen(Retry.indefinitely().filter(handler))
}).retryWhen(Retry.max(config.retryConfig.maxRetries).filter(handler))
.subscribe(actual);
}

Expand Down Expand Up @@ -347,7 +348,7 @@ public void onUncaughtException(Connection connection, Throwable error) {
handler.previousRequestHeaders = ops.requestHeaders;
}
}
else if (handler.shouldRetry && AbortedException.isConnectionReset(error)) {
else if (handler.canRetry(error)) {
HttpClientOperations ops = connection.as(HttpClientOperations.class);
if (ops != null && ops.hasSentHeaders()) {
// In some cases the channel close event may be delayed and thus the connection to be
Expand Down Expand Up @@ -468,7 +469,12 @@ static final class HttpClientHandler extends SocketAddress
volatile String resourceUrl;
volatile UriEndpoint fromURI;
volatile Supplier<String>[] redirectedFrom;
volatile boolean shouldRetry;

final RequestRetryConfig retryConfig;

// TODO not happy with name as it collides with config concept..
volatile boolean shouldRetry = true;

volatile HttpHeaders previousRequestHeaders;

HttpClientHandler(HttpClientConfig configuration) {
Expand All @@ -488,7 +494,7 @@ static final class HttpClientHandler extends SocketAddress
new UriEndpointFactory(configuration.remoteAddress(), configuration.isSecure(), URI_ADDRESS_MAPPER);

this.websocketClientSpec = configuration.websocketClientSpec;
this.shouldRetry = !configuration.retryDisabled;
this.retryConfig = configuration.retryConfig;
this.handler = configuration.body;

if (configuration.uri == null) {
Expand Down Expand Up @@ -682,14 +688,22 @@ public boolean test(Throwable throwable) {
redirect(re.location);
return true;
}
if (shouldRetry && AbortedException.isConnectionReset(throwable)) {
shouldRetry = false;
if (shouldRetry && canRetry(throwable)) {
redirect(toURI.toString());
return true;
}
return false;
}

/**
* Signals that the request <i>can</i> be retried.
*/
boolean canRetry(final Throwable err) {
return shouldRetry &&
err instanceof IOException &&
retryConfig.isRetrieable((IOException)err);
}

@Override
public String toString() {
return "{" + "uri=" + toURI + ", method=" + method + '}';
Expand Down

0 comments on commit 1f2f0cb

Please sign in to comment.