Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APP-2821: Implement a configurable retry mechanism #232

Merged
merged 21 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.symphony.bdk.core;

import com.symphony.bdk.http.api.ApiClient;
import com.symphony.bdk.core.auth.AuthSession;
import com.symphony.bdk.core.client.ApiClientFactory;
import com.symphony.bdk.core.config.model.BdkConfig;
import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder;
import com.symphony.bdk.core.service.MessageService;
import com.symphony.bdk.core.service.SessionService;
import com.symphony.bdk.core.service.datafeed.DatafeedService;
Expand All @@ -23,6 +23,9 @@
import com.symphony.bdk.gen.api.StreamsApi;
import com.symphony.bdk.gen.api.UserApi;
import com.symphony.bdk.gen.api.UsersApi;
import com.symphony.bdk.http.api.ApiClient;

import com.symphony.bdk.http.api.ApiException;

import org.apiguardian.api.API;

Expand All @@ -44,12 +47,16 @@ class ServiceFactory {
private final ApiClient agentClient;
private final AuthSession authSession;
private final BdkConfig config;
private final RetryWithRecoveryBuilder retryBuilder;

public ServiceFactory(ApiClientFactory apiClientFactory, AuthSession authSession, BdkConfig config) {
this.podClient = apiClientFactory.getPodClient();
this.agentClient = apiClientFactory.getAgentClient();
this.authSession = authSession;
this.config = config;
this.retryBuilder = new RetryWithRecoveryBuilder<>()
.retryConfig(config.getRetry())
.recoveryStrategy(ApiException::isUnauthorized, authSession::refresh);
}

/**
Expand All @@ -58,7 +65,7 @@ public ServiceFactory(ApiClientFactory apiClientFactory, AuthSession authSession
* @return an new {@link UserService} instance.
*/
public UserService getUserService() {
return new UserService(new UserApi(podClient), new UsersApi(podClient), authSession );
return new UserService(new UserApi(podClient), new UsersApi(podClient), authSession, retryBuilder);
}

/**
Expand All @@ -67,7 +74,7 @@ public UserService getUserService() {
* @return an new {@link StreamService} instance.
*/
public StreamService getStreamService() {
return new StreamService(new StreamsApi(podClient), authSession);
return new StreamService(new StreamsApi(podClient), authSession, retryBuilder);
}

/**
Expand All @@ -76,7 +83,7 @@ public StreamService getStreamService() {
* @return an new {@link SessionService} instance.
*/
public SessionService getSessionService() {
return new SessionService(new SessionApi(podClient));
return new SessionService(new SessionApi(podClient), new RetryWithRecoveryBuilder<>().retryConfig(config.getRetry()));
}

/**
Expand All @@ -97,8 +104,8 @@ public DatafeedService getDatafeedService() {
* @return an new {@link MessageService} instance.
*/
public MessageService getMessageService() {
return new MessageService(new MessagesApi(this.agentClient), new MessageApi(this.podClient),
new MessageSuppressionApi(this.podClient), new StreamsApi(this.podClient), new PodApi(this.podClient),
new AttachmentsApi(this.agentClient), new DefaultApi(this.podClient), this.authSession);
return new MessageService(new MessagesApi(agentClient), new MessageApi(podClient),
new MessageSuppressionApi(podClient), new StreamsApi(podClient), new PodApi(podClient),
new AttachmentsApi(agentClient), new DefaultApi(podClient), authSession, retryBuilder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ protected SymphonyBdk(BdkConfig config, ApiClientFactory apiClientFactory)
AuthSession botSession = authenticatorFactory.getBotAuthenticator().authenticateBot();
this.oboAuthenticator = config.isOboConfigured() ? authenticatorFactory.getOboAuthenticator() : null;
this.extensionAppAuthenticator = config.isOboConfigured() ? authenticatorFactory.getExtensionAppAuthenticator() : null;
ServiceFactory serviceFactory =
new ServiceFactory(apiClientFactory, botSession, config);
ServiceFactory serviceFactory = new ServiceFactory(apiClientFactory, botSession, config);
// service init
this.sessionService = serviceFactory.getSessionService();
this.userService = serviceFactory.getUserService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,48 @@
import lombok.Getter;
import lombok.Setter;

/**
* Class holding the whole BDK configuration
*/
@Getter
@Setter
public class BdkConfig {

private static final String DEFAULT_SCHEME = "https";
private static final int DEFAULT_HTTPS_PORT = 443;

private String scheme = DEFAULT_SCHEME;
private String host;
private Integer port = DEFAULT_HTTPS_PORT;
private String context = "";

private BdkClientConfig agent = new BdkClientConfig();
private BdkClientConfig pod = new BdkClientConfig();
private BdkClientConfig keyManager = new BdkClientConfig();
private BdkClientConfig sessionAuth = new BdkClientConfig();

private BdkBotConfig bot = new BdkBotConfig();
private BdkExtAppConfig app = new BdkExtAppConfig();
private BdkSslConfig ssl = new BdkSslConfig();

private BdkRetryConfig retry = new BdkRetryConfig();
private BdkDatafeedConfig datafeed = new BdkDatafeedConfig();

public boolean isOboConfigured() {
return app.isConfigured();
}
private static final String DEFAULT_SCHEME = "https";
private static final int DEFAULT_HTTPS_PORT = 443;

private String scheme = DEFAULT_SCHEME;
private String host;
private Integer port = DEFAULT_HTTPS_PORT;
private String context = "";

private BdkClientConfig agent = new BdkClientConfig();
private BdkClientConfig pod = new BdkClientConfig();
private BdkClientConfig keyManager = new BdkClientConfig();
private BdkClientConfig sessionAuth = new BdkClientConfig();

private BdkBotConfig bot = new BdkBotConfig();
private BdkExtAppConfig app = new BdkExtAppConfig();
private BdkSslConfig ssl = new BdkSslConfig();

private BdkRetryConfig retry = new BdkRetryConfig();
private BdkDatafeedConfig datafeed = new BdkDatafeedConfig();

/**
* Check if OBO is configured. Checks {@link BdkExtAppConfig#isConfigured()} on field {@link #app}.
*
* @return true if OBO is configured.
*/
public boolean isOboConfigured() {
return app.isConfigured();
}

/**
* Returns the retry configuration used for DataFeed services.
*
* @return {@link BdkDatafeedConfig#getRetry()} from {@link #datafeed} if not null, {@link #retry} otherwise
*/
public BdkRetryConfig getDatafeedRetryConfig() {
return datafeed.getRetry() == null ? retry : datafeed.getRetry();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.symphony.bdk.core.retry;


import com.symphony.bdk.core.util.function.ConsumerWithThrowable;
import com.symphony.bdk.core.util.function.SupplierWithApiException;
import com.symphony.bdk.http.api.ApiException;

import com.symphony.bdk.http.api.ApiRuntimeException;

import lombok.extern.slf4j.Slf4j;
import org.apiguardian.api.API;

import java.util.Map;
import java.util.function.Predicate;

/**
* Abstract class to implement a retry mechanism with recovery strategies,
* e.g. refresh a session in a case of session expiration.
* @param <T> the type of the object to be eventually returned by the {@link #supplier}
*/
@Slf4j
@API(status = API.Status.INTERNAL)
public abstract class RetryWithRecovery<T> {
private SupplierWithApiException<T> supplier;
private Predicate<ApiException> ignoreApiException;
private Map<Predicate<ApiException>, ConsumerWithThrowable> recoveryStrategies;

/**
* This is a helper function designed to cover most of the retry cases.
* It retries on the conditions defined by {@link RetryWithRecoveryBuilder#isNetworkOrMinorError}
* and refreshes the authSession if we get an unauthorized error.
*
* @param baseRetryBuilder the {@link RetryWithRecoveryBuilder} containing the base settings for the retry mechanism.
* @param name the name of the retry, can be any string but should specific to the function neing retried.
* @param supplier the supplier returning the desired object which may fail with an exception.
* @param <T> the type of the object to be returned by the supplier.
* @return the object returned by the supplier
* @throws ApiRuntimeException if a non-handled {@link ApiException} thrown or if the max number of retries has been reached.
* @throws RuntimeException if any other exception thrown.
*/
public static <T> T executeAndRetry(RetryWithRecoveryBuilder baseRetryBuilder, String name, SupplierWithApiException<T> supplier) {
RetryWithRecovery<T> retry = RetryWithRecoveryBuilder.<T>from(baseRetryBuilder)
.name(name)
.supplier(supplier)
.build();

try {
return retry.execute();
} catch (ApiException e) {
throw new ApiRuntimeException(e);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}

public RetryWithRecovery(SupplierWithApiException<T> supplier, Predicate<ApiException> ignoreApiException,
Map<Predicate<ApiException>, ConsumerWithThrowable> recoveryStrategies) {
this.supplier = supplier;
this.ignoreApiException = ignoreApiException;
this.recoveryStrategies = recoveryStrategies;
}

/**
* Method called by client which should implement the retry.
* This should call {@link #executeOnce()} which executes one actual call to the supplier, runs potential recovery
* actions and potentially throws an exception.
*
* @return the object returned by the supplier.
* @throws Throwable in case the max number of retries exhausted
* or if any other exception thrown by the supplier or the recovery functions.
*/
public abstract T execute() throws Throwable;

/**
* This implements the logic corresponding to one retry:
* calls the {@link #supplier}, catches the potential {@link ApiException},
* return null if it satisfies {@link #ignoreApiException}
* and runs the recovery functions if it matches its corresponding condition.
* This should be called by any implementation of {@link #execute()}.
*
* @return the object returned by the {@link #supplier}.
* @throws Throwable in case an exception has been thrown by the {@link #supplier} or by the recovery functions.
*/
protected T executeOnce() throws Throwable {
try {
return supplier.get();
} catch (ApiException e) {
if (ignoreApiException.test(e)) {
log.debug("Exception ignored: {}", e);
return null;
}

handleRecovery(e);
throw e;
}
}

private void handleRecovery(ApiException e) throws Throwable {
boolean recoveryTriggered = false;

for (Map.Entry<Predicate<ApiException>, ConsumerWithThrowable> entry : recoveryStrategies.entrySet()) {
if (entry.getKey().test(e)) {
log.debug("Exception recovered: {}", e);
recoveryTriggered = true;
entry.getValue().consume();
}
}

if (!recoveryTriggered) {
log.error("Error {}: {}", e.getCode(), e.getMessage());
}
}
}
Loading