Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max Retries #337

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: https://remote-function-host
maxNumBatchRequests: 10000
maxRetries: 5
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: https://remote-function-host:8000
maxNumBatchRequests: 10000
maxRetries: 5
transport:
type: io.statefun.transports.v1/async
trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
client_key_password: /opt/statefun/modules/statefun-smoke-e2e/certs/key_password.txt
client_key_password: /opt/statefun/modules/statefun-smoke-e2e/certs/key_password.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: http://remote-function-host:8000
maxNumBatchRequests: 10000
maxNumBatchRequests: 10000
maxRetries: 5
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: http://localhost:8000
maxNumBatchRequests: 10000
maxNumBatchRequests: 10000
maxRetries: 5
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ final class DefaultHttpRequestReplyClient implements RequestReplyClient {
public CompletableFuture<FromFunction> call(
ToFunctionRequestSummary requestSummary,
RemoteInvocationMetrics metrics,
ToFunction toFunction) {
ToFunction toFunction,
int maxRetries) {
Request request =
new Request.Builder()
.url(url)
Expand All @@ -65,13 +66,17 @@ public CompletableFuture<FromFunction> call(

Call newCall = client.newCall(request);
RetryingCallback callback =
new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown);
new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown, maxRetries);
callback.attachToCall(newCall);
return callback.future().thenApply(DefaultHttpRequestReplyClient::parseResponse);
}

private static FromFunction parseResponse(Response response) {
final InputStream httpResponseBody = responseBody(response);
if (response == null) {
return null;
}

final InputStream httpResponseBody = responseBody(response);
try {
return parseProtobufOrThrow(FromFunction.parser(), httpResponseBody);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public final class HttpFunctionEndpointSpec implements Serializable {
private static final long serialVersionUID = 1;

private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;

private static final Integer DEFAULT_MAX_RETRIES = -1;

private static final TransportClientSpec DEFAULT_TRANSPORT_CLIENT_SPEC =
new TransportClientSpec(
TransportClientConstants.ASYNC_CLIENT_FACTORY_TYPE,
Expand All @@ -47,6 +50,7 @@ public final class HttpFunctionEndpointSpec implements Serializable {
private final TargetFunctions targetFunctions;
private final UrlPathTemplate urlPathTemplate;
private final int maxNumBatchRequests;
private final int maxRetries;

// ============================================================
// HTTP transport related properties
Expand All @@ -63,11 +67,13 @@ private HttpFunctionEndpointSpec(
TargetFunctions targetFunctions,
UrlPathTemplate urlPathTemplate,
int maxNumBatchRequests,
int maxRetries,
TypeName transportClientFactoryType,
ObjectNode transportClientProps) {
this.targetFunctions = targetFunctions;
this.urlPathTemplate = urlPathTemplate;
this.maxNumBatchRequests = maxNumBatchRequests;
this.maxRetries = maxRetries;
this.transportClientFactoryType = transportClientFactoryType;
this.transportClientProps = transportClientProps;
}
Expand All @@ -84,6 +90,10 @@ public int maxNumBatchRequests() {
return maxNumBatchRequests;
}

/**
 * Returns the max-retries value this endpoint spec was constructed with.
 *
 * @return the stored {@code maxRetries} value
 */
public int maxRetries() {
return maxRetries;
}

public TypeName transportClientFactoryType() {
return transportClientFactoryType;
}
Expand All @@ -99,6 +109,7 @@ public static final class Builder {
private final UrlPathTemplate urlPathTemplate;

private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
private int maxRetries = DEFAULT_MAX_RETRIES;
private TransportClientSpec transportClientSpec = DEFAULT_TRANSPORT_CLIENT_SPEC;

@JsonCreator
Expand All @@ -118,6 +129,12 @@ public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
return this;
}

/**
 * Sets the max-retries value that will be passed to the built spec. Bound to the {@code
 * maxRetries} JSON property; if never called, the builder keeps its initial value of {@code
 * DEFAULT_MAX_RETRIES}.
 *
 * @param maxRetries the value to store on this builder
 * @return this builder, to allow call chaining
 */
@JsonProperty("maxRetries")
public Builder withMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
}

/**
* This is marked with @JsonProperty specifically to tell Jackson to use this method when
* deserializing from Json.
Expand All @@ -138,6 +155,7 @@ public HttpFunctionEndpointSpec build() {
targetFunctions,
urlPathTemplate,
maxNumBatchRequests,
maxRetries,
transportClientSpec.factoryKind(),
transportClientSpec.specNode());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public StatefulFunction functionOfType(FunctionType functionType) {
return new RequestReplyFunction(
functionType,
endpointSpec.maxNumBatchRequests(),
endpointSpec.maxRetries(),
requestReplyClientFactory.createTransportClient(
endpointSpec.transportClientProperties(), endpointUrl));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.Timeout;
import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
Expand All @@ -42,8 +43,7 @@
final class RetryingCallback implements Callback {
private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10);

private static final Set<Integer> RETRYABLE_HTTP_CODES =
new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500));
private static final Set<Integer> RETRYABLE_HTTP_CODES = new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500));

private static final Logger LOG = LoggerFactory.getLogger(RetryingCallback.class);

Expand All @@ -52,19 +52,25 @@ final class RetryingCallback implements Callback {
private final ToFunctionRequestSummary requestSummary;
private final RemoteInvocationMetrics metrics;
private final BooleanSupplier isShutdown;
private final int maxRetries;

private long requestStarted;

private int retryAttempts;

RetryingCallback(
ToFunctionRequestSummary requestSummary,
RemoteInvocationMetrics metrics,
Timeout timeout,
BooleanSupplier isShutdown) {
BooleanSupplier isShutdown,
int maxRetries) {
this.resultFuture = new CompletableFuture<>();
this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
this.requestSummary = requestSummary;
this.metrics = metrics;
this.isShutdown = Objects.requireNonNull(isShutdown);
this.maxRetries = maxRetries;
this.retryAttempts = 0;
}

CompletableFuture<Response> future() {
Expand Down Expand Up @@ -105,6 +111,18 @@ private void onResponseUnsafe(Call call, Response response) {
resultFuture.complete(response);
return;
}

// Optional retry cap: only applies when maxRetries is non-negative and the response is an
// HTTP 500. A negative maxRetries skips this check entirely.
// NOTE(review): other retryable status codes are not counted against the cap -- confirm
// that this is intended.
if ((maxRetries >= 0) && (response.code() == 500)) {
  if (retryAttempts < maxRetries) {
    // Count the failure before logging so the first failed attempt is reported as 1, not 0.
    retryAttempts++;
    LOG.warn("Failed attempt {} of {}. Retrying.", retryAttempts, maxRetries);
  } else {
    LOG.warn("Maximum number of attempts ({}) exceeded. Dropping message.", maxRetries);
    // The future is completed with null, so no downstream consumer ever receives this
    // response; close it here so its body and connection are released.
    response.close();
    resultFuture.complete(null);
    return;
  }
}

if (!RETRYABLE_HTTP_CODES.contains(response.code()) && response.code() < 500) {
throw new IllegalStateException("Non successful HTTP response code " + response.code());
}
Expand All @@ -130,7 +148,8 @@ private boolean retryAfterApplyingBackoff(Call call) {
}

/**
* Executes the runnable, and completes {@link #resultFuture} with any exceptions thrown, during
* Executes the runnable, and completes {@link #resultFuture} with any
* exceptions thrown, during
* its execution.
*/
private void tryWithFuture(RunnableWithException runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* functions: com.foo.bar/* (typename)
* urlPathTemplate: https://bar.foo.com:8080/{function.name} (string)
* maxNumBatchRequests: 10000 (int, optional)
* maxRetries: 5 (int, optional)
* timeouts: (object, optional)
* call: 1minute (duration, optional)
* connect: 20seconds (duration, optional)
Expand All @@ -68,6 +69,7 @@ public final class HttpEndpointBinderV1 implements ComponentBinder {
private static final JsonPointer URL_PATH_TEMPLATE = JsonPointer.compile("/urlPathTemplate");
private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
JsonPointer.compile("/maxNumBatchRequests");
private static final JsonPointer MAX_RETRIES = JsonPointer.compile("/maxRetries");

private HttpEndpointBinderV1() {}

Expand Down Expand Up @@ -105,6 +107,8 @@ private static HttpFunctionEndpointSpec parseSpec(ComponentJsonObject component)
optionalMaxNumBatchRequests(httpEndpointSpecNode)
.ifPresent(specBuilder::withMaxNumBatchRequests);

optionalMaxRetries(httpEndpointSpecNode).ifPresent(specBuilder::withMaxRetries);

final TransportClientSpec transportClientSpec =
new TransportClientSpec(
TransportClientConstants.OKHTTP_CLIENT_FACTORY_TYPE, (ObjectNode) httpEndpointSpecNode);
Expand All @@ -126,4 +130,8 @@ private static UrlPathTemplate urlPathTemplate(JsonNode functionEndpointSpecNode
/**
 * Looks up the integer at the {@code /maxNumBatchRequests} JSON pointer of the given node.
 *
 * @param functionNode the endpoint spec node to read from
 * @return the result of {@code Selectors.optionalIntegerAt} for that pointer (presumably empty
 *     when the field is not set -- confirm against {@code Selectors})
 */
private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) {
return Selectors.optionalIntegerAt(functionNode, MAX_NUM_BATCH_REQUESTS);
}

/**
 * Looks up the integer at the {@code /maxRetries} JSON pointer of the given node.
 *
 * @param functionNode the endpoint spec node to read from
 * @return the result of {@code Selectors.optionalIntegerAt} for that pointer (presumably empty
 *     when the field is not set -- confirm against {@code Selectors})
 */
private static OptionalInt optionalMaxRetries(JsonNode functionNode) {
return Selectors.optionalIntegerAt(functionNode, MAX_RETRIES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ private NettyClient(
public CompletableFuture<FromFunction> call(
ToFunctionRequestSummary requestSummary,
RemoteInvocationMetrics metrics,
ToFunction toFunction) {
ToFunction toFunction,
int maxRetries) {
NettyRequest request = new NettyRequest(this, metrics, requestSummary, toFunction);
return request.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ public ClassLoaderSafeRequestReplyClient(RequestReplyClient delegate) {
public CompletableFuture<FromFunction> call(
ToFunctionRequestSummary requestSummary,
RemoteInvocationMetrics metrics,
ToFunction toFunction) {
ToFunction toFunction,
int maxRetries) {
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

try {
Thread.currentThread().setContextClassLoader(delegateClassLoader);
return delegate.call(requestSummary, metrics, toFunction);
return delegate.call(requestSummary, metrics, toFunction, maxRetries);
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ public interface RequestReplyClient {
CompletableFuture<FromFunction> call(
ToFunctionRequestSummary requestSummary,
RemoteInvocationMetrics metrics,
ToFunction toFunction);
ToFunction toFunction,
int maxRetries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public final class RequestReplyFunction implements StatefulFunction {
private final FunctionType functionType;
private final RequestReplyClient client;
private final int maxNumBatchRequests;
private final int maxRetries;

/**
* This flag indicates whether or not at least one request has already been sent to the remote
Expand Down Expand Up @@ -93,20 +94,31 @@ public final class RequestReplyFunction implements StatefulFunction {
@Persisted private final PersistedRemoteFunctionValues managedStates;

public RequestReplyFunction(
FunctionType functionType, int maxNumBatchRequests, RequestReplyClient client) {
this(functionType, new PersistedRemoteFunctionValues(), maxNumBatchRequests, client, false);
FunctionType functionType,
int maxNumBatchRequests,
int maxRetries,
RequestReplyClient client) {
this(
functionType,
new PersistedRemoteFunctionValues(),
maxNumBatchRequests,
maxRetries,
client,
false);
}

@VisibleForTesting
RequestReplyFunction(
FunctionType functionType,
PersistedRemoteFunctionValues states,
int maxNumBatchRequests,
int maxRetries,
RequestReplyClient client,
boolean isFirstRequestSent) {
this.functionType = Objects.requireNonNull(functionType);
this.managedStates = Objects.requireNonNull(states);
this.maxNumBatchRequests = maxNumBatchRequests;
this.maxRetries = maxRetries;
this.client = Objects.requireNonNull(client);
this.isFirstRequestSent = isFirstRequestSent;
}
Expand Down Expand Up @@ -181,6 +193,9 @@ private void onAsyncResult(

private static Either<InvocationResponse, IncompleteInvocationContext> unpackResponse(
FromFunction fromFunction) {
if (fromFunction == null) {
return Either.Left(InvocationResponse.getDefaultInstance());
}
if (fromFunction.hasIncompleteInvocationContext()) {
return Either.Right(fromFunction.getIncompleteInvocationContext());
}
Expand Down Expand Up @@ -339,7 +354,7 @@ private void sendToFunction(InternalContext context, ToFunction toFunction) {
toFunction.getInvocation().getInvocationsCount());
RemoteInvocationMetrics metrics = context.functionTypeMetrics();
CompletableFuture<FromFunction> responseFuture =
client.call(requestSummary, metrics, toFunction);
client.call(requestSummary, metrics, toFunction, maxRetries);

if (isFirstRequestSent) {
context.registerAsyncOperation(toFunction, responseFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public class RequestReplyFunctionTest {

private final RequestReplyFunction functionUnderTest =
new RequestReplyFunction(
FN_TYPE, testInitialRegisteredState("session", "com.foo.bar/myType"), 10, client, true);
FN_TYPE,
testInitialRegisteredState("session", "com.foo.bar/myType"),
10,
5,
client,
true);

@Test
public void example() {
Expand Down Expand Up @@ -121,7 +126,7 @@ public void batchIsAccumulatedWhileARequestIsInFlight() {

@Test
public void reachingABatchLimitTriggersBackpressure() {
RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client);
RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, 5, client);

// send one message
functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
Expand All @@ -137,7 +142,7 @@ public void reachingABatchLimitTriggersBackpressure() {

@Test
public void returnedMessageReleaseBackpressure() {
RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client);
RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, 5, client);

// the following invocations should cause backpressure
functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
Expand Down Expand Up @@ -342,7 +347,7 @@ public void retryBatchOnUnkownAsyncResponseAfterRestore() {
ToFunction originalRequest = client.wasSentToFunction;

RequestReplyFunction restoredFunction =
new RequestReplyFunction(FN_TYPE, new PersistedRemoteFunctionValues(), 2, client, true);
new RequestReplyFunction(FN_TYPE, new PersistedRemoteFunctionValues(), 2, 5, client, true);
restoredFunction.invoke(context, unknownAsyncOperation(originalRequest));

// retry batch after a restore on an unknown async operation should start with empty state specs
Expand Down Expand Up @@ -393,7 +398,8 @@ private static final class FakeClient implements RequestReplyClient {
public CompletableFuture<FromFunction> call(
ToFunctionRequestSummary requestSummary,
RemoteInvocationMetrics metrics,
ToFunction toFunction) {
ToFunction toFunction,
int maxRetries) {
this.wasSentToFunction = toFunction;
try {
return CompletableFuture.completedFuture(this.fromFunction.get());
Expand Down
Loading