diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/ServiceFactory.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/ServiceFactory.java index 6449cb127..3c1d28e32 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/ServiceFactory.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/ServiceFactory.java @@ -1,9 +1,9 @@ package com.symphony.bdk.core; -import com.symphony.bdk.http.api.ApiClient; import com.symphony.bdk.core.auth.AuthSession; import com.symphony.bdk.core.client.ApiClientFactory; import com.symphony.bdk.core.config.model.BdkConfig; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.MessageService; import com.symphony.bdk.core.service.SessionService; import com.symphony.bdk.core.service.datafeed.DatafeedService; @@ -23,6 +23,9 @@ import com.symphony.bdk.gen.api.StreamsApi; import com.symphony.bdk.gen.api.UserApi; import com.symphony.bdk.gen.api.UsersApi; +import com.symphony.bdk.http.api.ApiClient; + +import com.symphony.bdk.http.api.ApiException; import org.apiguardian.api.API; @@ -44,12 +47,16 @@ class ServiceFactory { private final ApiClient agentClient; private final AuthSession authSession; private final BdkConfig config; + private final RetryWithRecoveryBuilder retryBuilder; public ServiceFactory(ApiClientFactory apiClientFactory, AuthSession authSession, BdkConfig config) { this.podClient = apiClientFactory.getPodClient(); this.agentClient = apiClientFactory.getAgentClient(); this.authSession = authSession; this.config = config; + this.retryBuilder = new RetryWithRecoveryBuilder<>() + .retryConfig(config.getRetry()) + .recoveryStrategy(ApiException::isUnauthorized, authSession::refresh); } /** @@ -58,7 +65,7 @@ public ServiceFactory(ApiClientFactory apiClientFactory, AuthSession authSession * @return an new {@link UserService} instance. */ public UserService getUserService() { - return new UserService(new UserApi(podClient), new UsersApi(podClient), authSession ); + return new UserService(new UserApi(podClient), new UsersApi(podClient), authSession, retryBuilder); } /** @@ -67,7 +74,7 @@ public UserService getUserService() { * @return an new {@link StreamService} instance. */ public StreamService getStreamService() { - return new StreamService(new StreamsApi(podClient), authSession); + return new StreamService(new StreamsApi(podClient), authSession, retryBuilder); } /** @@ -76,7 +83,7 @@ public StreamService getStreamService() { * @return an new {@link SessionService} instance. */ public SessionService getSessionService() { - return new SessionService(new SessionApi(podClient)); + return new SessionService(new SessionApi(podClient), new RetryWithRecoveryBuilder<>().retryConfig(config.getRetry())); } /** @@ -97,8 +104,8 @@ public DatafeedService getDatafeedService() { * @return an new {@link MessageService} instance. */ public MessageService getMessageService() { - return new MessageService(new MessagesApi(this.agentClient), new MessageApi(this.podClient), - new MessageSuppressionApi(this.podClient), new StreamsApi(this.podClient), new PodApi(this.podClient), - new AttachmentsApi(this.agentClient), new DefaultApi(this.podClient), this.authSession); + return new MessageService(new MessagesApi(agentClient), new MessageApi(podClient), + new MessageSuppressionApi(podClient), new StreamsApi(podClient), new PodApi(podClient), + new AttachmentsApi(agentClient), new DefaultApi(podClient), authSession, retryBuilder); } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/SymphonyBdk.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/SymphonyBdk.java index 77a5136b1..df7953e1a 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/SymphonyBdk.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/SymphonyBdk.java @@ -52,8 +52,7 @@ protected SymphonyBdk(BdkConfig config, ApiClientFactory apiClientFactory) AuthSession botSession = authenticatorFactory.getBotAuthenticator().authenticateBot(); this.oboAuthenticator = config.isOboConfigured() ? authenticatorFactory.getOboAuthenticator() : null; this.extensionAppAuthenticator = config.isOboConfigured() ? authenticatorFactory.getExtensionAppAuthenticator() : null; - ServiceFactory serviceFactory = - new ServiceFactory(apiClientFactory, botSession, config); + ServiceFactory serviceFactory = new ServiceFactory(apiClientFactory, botSession, config); // service init this.sessionService = serviceFactory.getSessionService(); this.userService = serviceFactory.getUserService(); diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/config/model/BdkConfig.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/config/model/BdkConfig.java index 4acbbd8ff..116a508c1 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/config/model/BdkConfig.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/config/model/BdkConfig.java @@ -3,31 +3,48 @@ import lombok.Getter; import lombok.Setter; +/** + * Class holding the whole BDK configuration + */ @Getter @Setter public class BdkConfig { - private static final String DEFAULT_SCHEME = "https"; - private static final int DEFAULT_HTTPS_PORT = 443; - - private String scheme = DEFAULT_SCHEME; - private String host; - private Integer port = DEFAULT_HTTPS_PORT; - private String context = ""; - - private BdkClientConfig agent = new BdkClientConfig(); - private BdkClientConfig pod = new BdkClientConfig(); - private BdkClientConfig keyManager = new BdkClientConfig(); - private BdkClientConfig sessionAuth = new BdkClientConfig(); - - private BdkBotConfig bot = new BdkBotConfig(); - private BdkExtAppConfig app = new BdkExtAppConfig(); - private BdkSslConfig ssl = new BdkSslConfig(); - - private BdkRetryConfig retry = new BdkRetryConfig(); - private BdkDatafeedConfig datafeed = new BdkDatafeedConfig(); - - public boolean isOboConfigured() { - return app.isConfigured(); - } + private static final String DEFAULT_SCHEME = "https"; + private static final int DEFAULT_HTTPS_PORT = 443; + + private String scheme = DEFAULT_SCHEME; + private String host; + private Integer port = DEFAULT_HTTPS_PORT; + private String context = ""; + + private BdkClientConfig agent = new BdkClientConfig(); + private BdkClientConfig pod = new BdkClientConfig(); + private BdkClientConfig keyManager = new BdkClientConfig(); + private BdkClientConfig sessionAuth = new BdkClientConfig(); + + private BdkBotConfig bot = new BdkBotConfig(); + private BdkExtAppConfig app = new BdkExtAppConfig(); + private BdkSslConfig ssl = new BdkSslConfig(); + + private BdkRetryConfig retry = new BdkRetryConfig(); + private BdkDatafeedConfig datafeed = new BdkDatafeedConfig(); + + /** + * Check if OBO is configured. Checks {@link BdkExtAppConfig#isConfigured()} on field {@link #app}. + * + * @return true if OBO is configured. + */ + public boolean isOboConfigured() { + return app.isConfigured(); + } + + /** + * Returns the retry configuration used for DataFeed services. + * + * @return {@link BdkDatafeedConfig#getRetry()} from {@link #datafeed} if not null, {@link #retry} otherwise + */ + public BdkRetryConfig getDatafeedRetryConfig() { + return datafeed.getRetry() == null ? retry : datafeed.getRetry(); + } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/RetryWithRecovery.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/RetryWithRecovery.java new file mode 100644 index 000000000..b60a10ebf --- /dev/null +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/RetryWithRecovery.java @@ -0,0 +1,113 @@ +package com.symphony.bdk.core.retry; + + +import com.symphony.bdk.core.util.function.ConsumerWithThrowable; +import com.symphony.bdk.core.util.function.SupplierWithApiException; +import com.symphony.bdk.http.api.ApiException; + +import com.symphony.bdk.http.api.ApiRuntimeException; + +import lombok.extern.slf4j.Slf4j; +import org.apiguardian.api.API; + +import java.util.Map; +import java.util.function.Predicate; + +/** + * Abstract class to implement a retry mechanism with recovery strategies, + * e.g. refresh a session in a case of session expiration. + * @param the type of the object to be eventually returned by the {@link #supplier} + */ +@Slf4j +@API(status = API.Status.INTERNAL) +public abstract class RetryWithRecovery { + private SupplierWithApiException supplier; + private Predicate ignoreApiException; + private Map, ConsumerWithThrowable> recoveryStrategies; + + /** + * This is a helper function designed to cover most of the retry cases. + * It retries on the conditions defined by {@link RetryWithRecoveryBuilder#isNetworkOrMinorError} + * and refreshes the authSession if we get an unauthorized error. + * + * @param baseRetryBuilder the {@link RetryWithRecoveryBuilder} containing the base settings for the retry mechanism. + * @param name the name of the retry, can be any string but should specific to the function neing retried. + * @param supplier the supplier returning the desired object which may fail with an exception. + * @param the type of the object to be returned by the supplier. + * @return the object returned by the supplier + * @throws ApiRuntimeException if a non-handled {@link ApiException} thrown or if the max number of retries has been reached. + * @throws RuntimeException if any other exception thrown. + */ + public static T executeAndRetry(RetryWithRecoveryBuilder baseRetryBuilder, String name, SupplierWithApiException supplier) { + RetryWithRecovery retry = RetryWithRecoveryBuilder.from(baseRetryBuilder) + .name(name) + .supplier(supplier) + .build(); + + try { + return retry.execute(); + } catch (ApiException e) { + throw new ApiRuntimeException(e); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + public RetryWithRecovery(SupplierWithApiException supplier, Predicate ignoreApiException, + Map, ConsumerWithThrowable> recoveryStrategies) { + this.supplier = supplier; + this.ignoreApiException = ignoreApiException; + this.recoveryStrategies = recoveryStrategies; + } + + /** + * Method called by client which should implement the retry. + * This should call {@link #executeOnce()} which executes one actual call to the supplier, runs potential recovery + * actions and potentially throws an exception. + * + * @return the object returned by the supplier. + * @throws Throwable in case the max number of retries exhausted + * or if any other exception thrown by the supplier or the recovery functions. + */ + public abstract T execute() throws Throwable; + + /** + * This implements the logic corresponding to one retry: + * calls the {@link #supplier}, catches the potential {@link ApiException}, + * return null if it satisfies {@link #ignoreApiException} + * and runs the recovery functions if it matches its corresponding condition. + * This should be called by any implementation of {@link #execute()}. + * + * @return the object returned by the {@link #supplier}. + * @throws Throwable in case an exception has been thrown by the {@link #supplier} or by the recovery functions. + */ + protected T executeOnce() throws Throwable { + try { + return supplier.get(); + } catch (ApiException e) { + if (ignoreApiException.test(e)) { + log.debug("Exception ignored: {}", e); + return null; + } + + handleRecovery(e); + throw e; + } + } + + private void handleRecovery(ApiException e) throws Throwable { + boolean recoveryTriggered = false; + + for (Map.Entry, ConsumerWithThrowable> entry : recoveryStrategies.entrySet()) { + if (entry.getKey().test(e)) { + log.debug("Exception recovered: {}", e); + recoveryTriggered = true; + entry.getValue().consume(); + } + } + + if (!recoveryTriggered) { + log.error("Error {}: {}", e.getCode(), e.getMessage()); + } + } +} diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/RetryWithRecoveryBuilder.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/RetryWithRecoveryBuilder.java new file mode 100644 index 000000000..b441c876a --- /dev/null +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/RetryWithRecoveryBuilder.java @@ -0,0 +1,177 @@ +package com.symphony.bdk.core.retry; + +import com.symphony.bdk.core.config.model.BdkRetryConfig; +import com.symphony.bdk.core.retry.resilience4j.Resilience4jRetryWithRecovery; +import com.symphony.bdk.core.util.function.ConsumerWithThrowable; +import com.symphony.bdk.core.util.function.SupplierWithApiException; + +import com.symphony.bdk.http.api.ApiException; + +import org.apiguardian.api.API; + +import javax.ws.rs.ProcessingException; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Builder class to facilitate the instantiation of a {@link RetryWithRecovery}. + * + * @param the type to be returned by {@link RetryWithRecovery#execute()}. + */ +@API(status = API.Status.INTERNAL) +public class RetryWithRecoveryBuilder { + private String name; + private BdkRetryConfig retryConfig; + private SupplierWithApiException supplier; + private Predicate retryOnExceptionPredicate; + private Predicate ignoreException; + private Map, ConsumerWithThrowable> recoveryStrategies; + + /** + * Copies all fields of an existing builder except the {@link #supplier}. + * + * @param from the {@link RetryWithRecovery} to be copied. + * @param the target parametrized type. + * @return a copy of the builder passed as parameter. + */ + public static RetryWithRecoveryBuilder from(RetryWithRecoveryBuilder from) { + RetryWithRecoveryBuilder copy = new RetryWithRecoveryBuilder(); + copy.name = from.name; + copy.retryConfig = from.retryConfig; + copy.retryOnExceptionPredicate = from.retryOnExceptionPredicate; + copy.ignoreException = from.ignoreException; + copy.recoveryStrategies = new HashMap<>(from.recoveryStrategies); + + return copy; + } + + /** + * Checks if a throwable is a {@link ProcessingException} or a {@link ApiException} minor error. + * This is the default function used in {@link RetryWithRecovery} + * to check if a given exception thrown should lead to a retry. + * + * @param t the throwable to be checked. + * @return true if passed throwable is a {@link ProcessingException} (e.g. in case of a temporary network exception) + * or if it is a {@link ApiException} which {@link ApiException#isMinorError()}. + */ + public static boolean isNetworkOrMinorError(Throwable t) { + if (t instanceof ApiException) { + return ((ApiException) t).isMinorError(); + } + return t instanceof ProcessingException; + } + + /** + * Checks if a throwable is a {@link ProcessingException} or a {@link ApiException} minor error or client error. + * + * @param t the throwable to be checked. + * @return true if passed throwable is a {@link ProcessingException} (e.g. in case of a temporary network exception) + * or if it is a {@link ApiException} which {@link ApiException#isMinorError()} or {@link ApiException#isClientError()}. + */ + public static boolean isNetworkOrMinorErrorOrClientError(Throwable t) { + if (t instanceof ApiException) { + ApiException apiException = (ApiException) t; + return apiException.isMinorError() || apiException.isClientError(); + } + return t instanceof ProcessingException; + } + + /** + * Default constructor which ignores no exception + * and retries exceptions fulfilling {@link RetryWithRecoveryBuilder#isNetworkOrMinorError}. + */ + public RetryWithRecoveryBuilder() { + this.recoveryStrategies = new HashMap<>(); + this.ignoreException = e -> false; + this.retryOnExceptionPredicate = RetryWithRecoveryBuilder::isNetworkOrMinorError; + this.retryConfig = new BdkRetryConfig(); + } + + /** + * Sets the name and returns the modified builder. + * + * @param name the name of the {@link RetryWithRecovery} + * @return the modified builder instance. + */ + public RetryWithRecoveryBuilder name(String name) { + this.name = name; + return this; + } + + /** + * Sets the retry configuration and returns the modified builder. + * + * @param retryConfig the retry configuration to be used. + * @return the modified builder instance. + */ + public RetryWithRecoveryBuilder retryConfig(BdkRetryConfig retryConfig) { + this.retryConfig = retryConfig; + return this; + } + + /** + * Sets the retry configuration and returns the modified builder. + * + * @param supplier the function to be called by the {@link RetryWithRecovery} + * which returns the desired object and which may fail. + * @return the modified builder instance. + */ + public RetryWithRecoveryBuilder supplier(SupplierWithApiException supplier) { + this.supplier = supplier; + return this; + } + + /** + * Sets the conditions on which we should retry the call to the provided {@link #supplier}. + * + * @param retryOnExceptionPredicate the condition when we should retry the call + * when the {@link #supplier} throws an exception. + * @return the modified builder instance. + */ + public RetryWithRecoveryBuilder retryOnException(Predicate retryOnExceptionPredicate) { + this.retryOnExceptionPredicate = retryOnExceptionPredicate; + return this; + } + + /** + * Sets the condition on which we should ignore an exception thrown by the {@link #supplier} + * and return null in {@link RetryWithRecovery#execute()}. + * + * @param ignoreException the condition when we should ignore a given exception + * @return the modified builder instance. + */ + public RetryWithRecoveryBuilder ignoreException(Predicate ignoreException) { + this.ignoreException = ignoreException; + return this; + } + + /** + * Sets one recovery strategy which consists in a predicate on the thrown {@link ApiException} + * and in a corresponding recovery function to be executed when condition is met. + * + * @param condition the predicate to check if the exception should lead to the execution of the recovery function. + * @param recovery the recovery function to be executed when condition is fulfilled. + * @return + */ + public RetryWithRecoveryBuilder recoveryStrategy(Predicate condition, ConsumerWithThrowable recovery) { + this.recoveryStrategies.put(condition, recovery); + return this; + } + + public RetryWithRecoveryBuilder clearRecoveryStrategies() { + this.recoveryStrategies.clear(); + return this; + } + + /** + * Builds a {@link RetryWithRecovery} based on provided fields. + * + * @return a new instance of {@link RetryWithRecovery} based on the provided fields. + */ + public RetryWithRecovery build() { + return new Resilience4jRetryWithRecovery<>(name, retryConfig, supplier, retryOnExceptionPredicate, ignoreException, + recoveryStrategies); + } +} diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/resilience4j/Resilience4jRetryWithRecovery.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/resilience4j/Resilience4jRetryWithRecovery.java new file mode 100644 index 000000000..31bf57f00 --- /dev/null +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/retry/resilience4j/Resilience4jRetryWithRecovery.java @@ -0,0 +1,92 @@ +package com.symphony.bdk.core.retry.resilience4j; + +import com.symphony.bdk.core.config.model.BdkRetryConfig; + +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.util.BdkExponentialFunction; + +import com.symphony.bdk.core.util.function.ConsumerWithThrowable; +import com.symphony.bdk.core.util.function.SupplierWithApiException; + +import com.symphony.bdk.http.api.ApiException; + +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import lombok.extern.slf4j.Slf4j; +import org.apiguardian.api.API; + +import java.util.Map; +import java.util.function.Predicate; + +/** + * This class aims to implement a retry mechanism (on top of a{@link Retry}) + * with different recovery strategies based on predicates. + * @param the type of the object returned by {@link #execute()} + */ +@Slf4j +@API(status = API.Status.INTERNAL) +public class Resilience4jRetryWithRecovery extends RetryWithRecovery { + private final Retry retry; + + /** + * Constructor with no predicate on when to ignore an {@link ApiException}, + * i.e. ApiExceptions will never be ignored. + * @param name the name of the {@link Retry} service. + * @param bdkRetryConfig the retry configuration to be used. + * @param supplier the supplier responsible to provide the object of param type T and which may throw an {@link ApiException}. + * @param retryOnExceptionPredicate predicate on a thrown {@link ApiException} to know if call should be retried. + * @param recoveryStrategies mapping between {@link Predicate} and the corresponding recovery functions to be executed before retrying. + * If several predicates match, all corresponding consumers will be executed. + */ + public Resilience4jRetryWithRecovery(String name, BdkRetryConfig bdkRetryConfig, SupplierWithApiException supplier, + Predicate retryOnExceptionPredicate, + Map, ConsumerWithThrowable> recoveryStrategies) { + this(name, bdkRetryConfig, supplier, retryOnExceptionPredicate, (e) -> false, recoveryStrategies); + } + + /** + * Constructor with a predicate on when to ignore an {@link ApiException}, + * i.e. ApiExceptions will be ignored if the predicate matches. + * @param name the name of the {@link Retry} service. + * @param bdkRetryConfig the retry configuration to be used. + * @param supplier the supplier responsible to provide the object of param type T and which may throw an {@link ApiException}. + * @param retryOnExceptionPredicate predicate on a thrown {@link ApiException} to know if call should be retried. + * @param ignoreApiException predicate on a thrown {@link ApiException} to know if exception should be ignored, + * which means no subsequent retry will be made and null value will be returned. + * @param recoveryStrategies mapping between {@link Predicate} and the corresponding recovery functions to be executed before retrying. + * If several predicates match, all corresponding consumers will be executed. + */ + public Resilience4jRetryWithRecovery(String name, BdkRetryConfig bdkRetryConfig, SupplierWithApiException supplier, + Predicate retryOnExceptionPredicate, Predicate ignoreApiException, + Map, ConsumerWithThrowable> recoveryStrategies) { + super(supplier, ignoreApiException, recoveryStrategies); + this.retry = createRetry(name, bdkRetryConfig, retryOnExceptionPredicate); + } + + /** + * {@inheritDoc} + */ + public T execute() throws Throwable { + return this.retry.executeCheckedSupplier(this::executeOnce); + } + + private Retry createRetry(String name, BdkRetryConfig bdkRetryConfig, + Predicate retryOnExceptionPredicate) { + RetryConfig retryConfig = RetryConfig.custom() + .maxAttempts(bdkRetryConfig.getMaxAttempts()) + .intervalFunction(BdkExponentialFunction.ofExponentialBackoff(bdkRetryConfig)) + .retryOnException(retryOnExceptionPredicate) + .build(); + + Retry retry = Retry.of(name, retryConfig); + retry.getEventPublisher().onRetry(event -> { + double interval = event.getWaitInterval().toMillis() / 1000.0; + if (event.getLastThrowable() != null) { + log.debug("{} service failed due to {}", name, event.getLastThrowable().getMessage()); + } + log.info("Retry in {}s...", interval); + }); + + return retry; + } +} diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/MessageService.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/MessageService.java index 235770219..18bc8f89c 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/MessageService.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/MessageService.java @@ -1,13 +1,13 @@ package com.symphony.bdk.core.service; -import static com.symphony.bdk.core.util.SupplierWithApiException.callAndCatchApiException; - -import com.symphony.bdk.http.api.util.ApiUtils; +import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.pagination.PaginatedApi; import com.symphony.bdk.core.service.pagination.PaginatedService; -import com.symphony.bdk.core.auth.AuthSession; import com.symphony.bdk.core.service.stream.constant.AttachmentSort; +import com.symphony.bdk.core.util.function.SupplierWithApiException; import com.symphony.bdk.gen.api.AttachmentsApi; import com.symphony.bdk.gen.api.DefaultApi; import com.symphony.bdk.gen.api.MessageApi; @@ -25,6 +25,7 @@ import com.symphony.bdk.gen.api.model.V4ImportedMessage; import com.symphony.bdk.gen.api.model.V4Message; import com.symphony.bdk.gen.api.model.V4Stream; +import com.symphony.bdk.http.api.util.ApiUtils; import com.symphony.bdk.template.api.TemplateEngine; import com.symphony.bdk.template.api.TemplateException; import com.symphony.bdk.template.api.TemplateResolver; @@ -55,23 +56,25 @@ public class MessageService { private final DefaultApi defaultApi; private final AuthSession authSession; private final TemplateResolver templateResolver; + private final RetryWithRecoveryBuilder retryBuilder; public MessageService(MessagesApi messagesApi, MessageApi messageApi, MessageSuppressionApi messageSuppressionApi, - StreamsApi streamsApi, PodApi podApi, AttachmentsApi attachmentsApi, DefaultApi defaultApi, AuthSession authSession) { + StreamsApi streamsApi, PodApi podApi, AttachmentsApi attachmentsApi, DefaultApi defaultApi, + AuthSession authSession, RetryWithRecoveryBuilder retryBuilder) { this(messagesApi, messageApi, messageSuppressionApi, streamsApi, podApi, attachmentsApi, defaultApi, authSession, - new TemplateResolver()); + new TemplateResolver(), retryBuilder); } public MessageService(MessagesApi messagesApi, MessageApi messageApi, MessageSuppressionApi messageSuppressionApi, - StreamsApi streamsApi, PodApi podApi, AttachmentsApi attachmentsApi, DefaultApi defaultApi, AuthSession authSession, - TemplateEngine templateEngine) { + StreamsApi streamsApi, PodApi podApi, AttachmentsApi attachmentsApi, DefaultApi defaultApi, + AuthSession authSession, TemplateEngine templateEngine, RetryWithRecoveryBuilder retryBuilder) { this(messagesApi, messageApi, messageSuppressionApi, streamsApi, podApi, attachmentsApi, defaultApi, authSession, - new TemplateResolver(templateEngine)); + new TemplateResolver(templateEngine), retryBuilder); } private MessageService(MessagesApi messagesApi, MessageApi messageApi, MessageSuppressionApi messageSuppressionApi, - StreamsApi streamsApi, PodApi podApi, AttachmentsApi attachmentsApi, DefaultApi defaultApi, AuthSession authSession, - TemplateResolver templateResolver) { + StreamsApi streamsApi, PodApi podApi, AttachmentsApi attachmentsApi, DefaultApi defaultApi, + AuthSession authSession, TemplateResolver templateResolver, RetryWithRecoveryBuilder retryBuilder) { this.messagesApi = messagesApi; this.messageApi = messageApi; this.messageSuppressionApi = messageSuppressionApi; @@ -81,15 +84,16 @@ private MessageService(MessagesApi messagesApi, MessageApi messageApi, MessageSu this.authSession = authSession; this.templateResolver = templateResolver; this.defaultApi = defaultApi; + this.retryBuilder = retryBuilder; } /** * Get messages from an existing stream. Additionally returns any attachments associated with the message. * * @param stream the stream where to look for messages - * @param since instant of the earliest possible date of the first message returned. - * @param skip number of messages to skip. Optional and defaults to 0. - * @param limit maximum number of messages to return. Optional and defaults to 50. + * @param since instant of the earliest possible date of the first message returned. + * @param skip number of messages to skip. Optional and defaults to 0. + * @param limit maximum number of messages to return. Optional and defaults to 50. * @return the list of matching messages in the stream. * @see Messages */ @@ -101,44 +105,45 @@ public List getMessages(@Nonnull V4Stream stream, @Nonnull Instant si * Get messages from an existing stream. Additionally returns any attachments associated with the message. * * @param streamId the streamID where to look for messages - * @param since instant of the earliest possible date of the first message returned. - * @param skip number of messages to skip. Optional and defaults to 0. - * @param limit maximum number of messages to return. Optional and defaults to 50. + * @param since instant of the earliest possible date of the first message returned. + * @param skip number of messages to skip. Optional and defaults to 0. + * @param limit maximum number of messages to return. Optional and defaults to 50. * @return the list of matching messages in the stream. * @see Messages */ public List getMessages(@Nonnull String streamId, @Nonnull Instant since, Integer skip, Integer limit) { - return callAndCatchApiException(() -> messagesApi.v4StreamSidMessageGet(streamId, getEpochMillis(since), + return executeAndRetry("getMessages", () -> messagesApi.v4StreamSidMessageGet(streamId, getEpochMillis(since), authSession.getSessionToken(), authSession.getKeyManagerToken(), skip, limit)); } /** * Get messages from an existing stream. Additionally returns any attachments associated with the message. * - * @param stream the stream where to look for messages - * @param since instant of the earliest possible date of the first message returned. + * @param stream the stream where to look for messages + * @param since instant of the earliest possible date of the first message returned. * @param chunkSize size of elements to retrieve in one call. Optional and defaults to 50. * @param totalSize total maximum number of messages to return. Optional and defaults to 50. * @return a {@link Stream} of matching messages in the stream. * @see Messages */ - public Stream getMessagesStream(@Nonnull V4Stream stream, @Nonnull Instant since, Integer chunkSize, Integer totalSize) { + public Stream getMessagesStream(@Nonnull V4Stream stream, @Nonnull Instant since, Integer chunkSize, + Integer totalSize) { return getMessagesStream(stream.getStreamId(), since, chunkSize, totalSize); } /** * Get messages from an existing stream. Additionally returns any attachments associated with the message. * - * @param streamId the streamID where to look for messages - * @param since instant of the earliest possible date of the first message returned. + * @param streamId the streamID where to look for messages + * @param since instant of the earliest possible date of the first message returned. * @param chunkSize size of elements to retrieve in one call. Optional and defaults to 50. * @param totalSize total maximum number of messages to return. Optional and defaults to 50. * @return a {@link Stream} of matching messages in the stream. * @see Messages */ - public Stream getMessagesStream(@Nonnull String streamId, @Nonnull Instant since, Integer chunkSize, Integer totalSize) { - PaginatedApi api = ((offset, limit) -> messagesApi.v4StreamSidMessageGet(streamId, getEpochMillis(since), - authSession.getSessionToken(), authSession.getKeyManagerToken(), offset, limit)); + public Stream getMessagesStream(@Nonnull String streamId, @Nonnull Instant since, Integer chunkSize, + Integer totalSize) { + PaginatedApi api = ((offset, limit) -> getMessages(streamId, since, offset, limit)); final int actualChunkSize = chunkSize == null ? 50 : chunkSize.intValue(); final int actualTotalSize = totalSize == null ? 50 : totalSize.intValue(); @@ -167,7 +172,8 @@ public V4Message send(@Nonnull V4Stream stream, @Nonnull String message) { * @return a {@link V4Message} object containing the details of the sent message * @see Create Message v4 */ - public V4Message send(@Nonnull V4Stream stream, @Nonnull String template, Object parameters) throws TemplateException { + public V4Message send(@Nonnull V4Stream stream, @Nonnull String template, Object parameters) + throws TemplateException { return send(stream.getStreamId(), templateResolver.resolve(template).process(parameters)); } @@ -180,7 +186,8 @@ public V4Message send(@Nonnull V4Stream stream, @Nonnull String template, Object * @return a {@link V4Message} object containing the details of the sent message * @throws TemplateException */ - public V4Message send(@Nonnull String streamId, @Nonnull String template, Object parameters) throws TemplateException { + public V4Message send(@Nonnull String streamId, @Nonnull String template, Object parameters) + throws TemplateException { return send(streamId, templateResolver.resolve(template).process(parameters)); } @@ -193,7 +200,7 @@ public V4Message send(@Nonnull String streamId, @Nonnull String template, Object * @see Create Message v4 */ public V4Message send(@Nonnull String streamId, @Nonnull String message) { - return callAndCatchApiException(() -> messagesApi.v4StreamSidMessageCreatePost( + return executeAndRetry("send", () -> messagesApi.v4StreamSidMessageCreatePost( streamId, authSession.getSessionToken(), authSession.getKeyManagerToken(), @@ -208,15 +215,16 @@ public V4Message send(@Nonnull String streamId, @Nonnull String message) { /** * Downloads the attachment body by the stream ID, message ID and attachment ID. * - * @param streamId the stream ID where to look for the attachment - * @param messageId the ID of the message containing the attachment + * @param streamId the stream ID where to look for the attachment + * @param messageId the ID of the message containing the attachment * @param attachmentId the ID of the attachment * @return a byte array of attachment encoded in base 64 * @see Attachment */ public byte[] getAttachment(@Nonnull String streamId, @Nonnull String messageId, @Nonnull String attachmentId) { - return callAndCatchApiException(() -> attachmentsApi.v1StreamSidAttachmentGet(streamId, attachmentId, messageId, - authSession.getSessionToken(), authSession.getKeyManagerToken())); + return executeAndRetry("getAttachment", + () -> attachmentsApi.v1StreamSidAttachmentGet(streamId, attachmentId, messageId, + authSession.getSessionToken(), authSession.getKeyManagerToken())); } /** @@ -227,7 +235,7 @@ public byte[] getAttachment(@Nonnull String streamId, @Nonnull String messageId, * @see Import Message */ public List importMessages(List messages) { - return callAndCatchApiException(() -> messagesApi.v4MessageImportPost(authSession.getSessionToken(), + return executeAndRetry("importMessages", () -> messagesApi.v4MessageImportPost(authSession.getSessionToken(), authSession.getKeyManagerToken(), messages)); } @@ -239,7 +247,7 @@ public List importMessages(List messages) { * @see Suppress Message */ public MessageSuppressionResponse suppressMessage(@Nonnull String messageId) { - return callAndCatchApiException(() -> + return executeAndRetry("suppressMessage", () -> messageSuppressionApi.v1AdminMessagesuppressionIdSuppressPost(messageId, authSession.getSessionToken())); } @@ -252,7 +260,8 @@ public MessageSuppressionResponse suppressMessage(@Nonnull String messageId) { * @see Message Status */ public MessageStatus getMessageStatus(@Nonnull String messageId) { - return callAndCatchApiException(() -> messageApi.v1MessageMidStatusGet(messageId, authSession.getSessionToken())); + return executeAndRetry("getMessageStatus", + () -> messageApi.v1MessageMidStatusGet(messageId, authSession.getSessionToken())); } /** @@ -262,7 +271,7 @@ public MessageStatus getMessageStatus(@Nonnull String messageId) { * @see Attachment Types */ public List getAttachmentTypes() { - return callAndCatchApiException(() -> podApi.v1FilesAllowedTypesGet(authSession.getSessionToken())); + return executeAndRetry("getAttachmentTypes", () -> podApi.v1FilesAllowedTypesGet(authSession.getSessionToken())); } /** @@ -273,7 +282,7 @@ public List getAttachmentTypes() { * @see Get Message v1 */ public V4Message getMessage(@Nonnull String messageId) { - return callAndCatchApiException(() -> messagesApi.v1MessageIdGet(authSession.getSessionToken(), + return executeAndRetry("getMessage", () -> messagesApi.v1MessageIdGet(authSession.getSessionToken(), authSession.getKeyManagerToken(), messageId)); } @@ -281,10 +290,10 @@ public V4Message getMessage(@Nonnull String messageId) { * List attachments in a particular stream. * * @param streamId the stream ID where to look for the attachments - * @param since optional instant of the first required attachment. - * @param to optional instant of the last required attachment. - * @param limit maximum number of attachments to return. This optional value defaults to 50 and should be between 0 and 100. - * @param sort Attachment date sort direction : ASC or DESC (default to ASC) + * @param since optional instant of the first required attachment. + * @param to optional instant of the last required attachment. + * @param limit maximum number of attachments to return. This optional value defaults to 50 and should be between 0 and 100. + * @param sort Attachment date sort direction : ASC or DESC (default to ASC) * @return the list of attachments in the stream. * @see List Attachments */ @@ -292,41 +301,44 @@ public List listAttachments(@Nonnull String streamId, Inst AttachmentSort sort) { final String sortDir = sort == null ? AttachmentSort.ASC.name() : sort.name(); - return callAndCatchApiException(() -> - streamsApi.v1StreamsSidAttachmentsGet(streamId, authSession.getSessionToken(), getEpochMillis(since), getEpochMillis(to), limit, sortDir)); + return executeAndRetry("listAttachments", () -> + streamsApi.v1StreamsSidAttachmentsGet(streamId, authSession.getSessionToken(), getEpochMillis(since), + getEpochMillis(to), limit, sortDir)); } /** * Fetches message ids using timestamp. * * @param streamId the ID of the stream where to fetch messages. - * @param since optional instant of the first required messageId. - * @param to optional instant of the last required messageId. - * @param limit optional maximum number of messageIds to return. - * @param skip optional number of messageIds to skip. + * @param since optional instant of the first required messageId. + * @param to optional instant of the last required messageId. + * @param limit optional maximum number of messageIds to return. + * @param skip optional number of messageIds to skip. * @return a {@link MessageIdsFromStream} object containing the list of messageIds. * @see Get Message IDs by Timestamp */ - public MessageIdsFromStream getMessageIdsByTimestamp(@Nonnull String streamId, Instant since, Instant to, Integer limit, Integer skip) { - return callAndCatchApiException(() -> - defaultApi.v2AdminStreamsStreamIdMessageIdsGet(authSession.getSessionToken(), streamId, getEpochMillis(since), getEpochMillis(to), limit, skip)); + public MessageIdsFromStream getMessageIdsByTimestamp(@Nonnull String streamId, Instant since, Instant to, + Integer limit, Integer skip) { + return executeAndRetry("getMessageIdsByTimestamp", () -> + defaultApi.v2AdminStreamsStreamIdMessageIdsGet(authSession.getSessionToken(), streamId, getEpochMillis(since), + getEpochMillis(to), limit, skip)); } /** * Fetches message ids using timestamp. * - * @param streamId the ID of the stream where to fetch messages. - * @param since optional instant of the first required messageId. - * @param to optional instant of the last required messageId. + * @param streamId the ID of the stream where to fetch messages. + * @param since optional instant of the first required messageId. + * @param to optional instant of the last required messageId. * @param chunkSize size of elements to retrieve in one call. Optional and defaults to 50. * @param totalSize total maximum number of messages to return. Optional and defaults to 50. * @return a {@link Stream} containing the messageIds. * @see Get Message IDs by Timestamp */ - public Stream getMessageIdsByTimestampStream(@Nonnull String streamId, Instant since, Instant to, Integer chunkSize, Integer totalSize) { + public Stream getMessageIdsByTimestampStream(@Nonnull String streamId, Instant since, Instant to, + Integer chunkSize, Integer totalSize) { PaginatedApi api = ((offset, limit) -> - defaultApi.v2AdminStreamsStreamIdMessageIdsGet(authSession.getSessionToken(), streamId, getEpochMillis(since), getEpochMillis(to), limit, offset) - .getData()); + getMessageIdsByTimestamp(streamId, since, to, limit, offset).getData()); final int actualChunkSize = chunkSize == null ? 50 : chunkSize.intValue(); final int actualTotalSize = totalSize == null ? 50 : totalSize.intValue(); @@ -343,7 +355,7 @@ public Stream getMessageIdsByTimestampStream(@Nonnull String streamId, I * @see List Message Receipts */ public MessageReceiptDetailResponse listMessageReceipts(@Nonnull String messageId) { - return callAndCatchApiException(() -> + return executeAndRetry("listMessageReceipts", () -> defaultApi.v1AdminMessagesMessageIdReceiptsGet(authSession.getSessionToken(), messageId, null, null)); } @@ -357,11 +369,15 @@ public MessageReceiptDetailResponse listMessageReceipts(@Nonnull String messageI * @see Message Metadata */ public MessageMetadataResponse getMessageRelationships(@Nonnull String messageId) { - return callAndCatchApiException(() -> defaultApi.v1AdminMessagesMessageIdMetadataRelationshipsGet( + return executeAndRetry("getMessageRelationships", () -> defaultApi.v1AdminMessagesMessageIdMetadataRelationshipsGet( authSession.getSessionToken(), ApiUtils.getUserAgent(), messageId)); } private static Long getEpochMillis(Instant instant) { return instant == null ? null : instant.toEpochMilli(); } + + private T executeAndRetry(String name, SupplierWithApiException supplier) { + return RetryWithRecovery.executeAndRetry(retryBuilder, name, supplier); + } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/SessionService.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/SessionService.java index bf67d4553..b80d39c7e 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/SessionService.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/SessionService.java @@ -1,10 +1,12 @@ package com.symphony.bdk.core.service; -import com.symphony.bdk.http.api.ApiException; -import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; +import com.symphony.bdk.core.util.function.SupplierWithApiException; import com.symphony.bdk.gen.api.SessionApi; import com.symphony.bdk.gen.api.model.UserV2; +import com.symphony.bdk.http.api.ApiException; import lombok.RequiredArgsConstructor; import org.apiguardian.api.API; @@ -17,6 +19,7 @@ public class SessionService { private final SessionApi sessionApi; + private final RetryWithRecoveryBuilder retryBuilder; /** * Retrieves the {@link UserV2} session from the pod using an {@link AuthSession} holder. @@ -25,10 +28,13 @@ public class SessionService { * @return Bot session info. */ public UserV2 getSession(AuthSession authSession) { - try { - return this.sessionApi.v2SessioninfoGet(authSession.getSessionToken()) ; - } catch (ApiException ex) { - throw new ApiRuntimeException(ex); - } + return executeAndRetry("getSession", + () -> sessionApi.v2SessioninfoGet(authSession.getSessionToken()), authSession); + } + + protected T executeAndRetry(String name, SupplierWithApiException supplier, AuthSession authSession) { + final RetryWithRecoveryBuilder retryBuilderWithAuthSession = RetryWithRecoveryBuilder.from(retryBuilder) + .recoveryStrategy(ApiException::isUnauthorized, authSession::refresh); + return RetryWithRecovery.executeAndRetry(retryBuilderWithAuthSession, name, supplier); } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/exception/NestedRetryException.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/exception/NestedRetryException.java new file mode 100644 index 000000000..6db499474 --- /dev/null +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/exception/NestedRetryException.java @@ -0,0 +1,13 @@ +package com.symphony.bdk.core.service.datafeed.exception; + +import com.symphony.bdk.core.retry.RetryWithRecovery; + +/** + * Exception thrown when recovery strategy in a {@link RetryWithRecovery} failed. + * Especially used in {@link com.symphony.bdk.core.service.datafeed.DatafeedService} implementations. + */ +public class NestedRetryException extends RuntimeException { + public NestedRetryException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/AbstractDatafeedService.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/AbstractDatafeedService.java index 913e0d94a..1a440e9c0 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/AbstractDatafeedService.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/AbstractDatafeedService.java @@ -2,23 +2,19 @@ import com.symphony.bdk.http.api.ApiException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.auth.exception.AuthUnauthorizedException; import com.symphony.bdk.core.config.model.BdkConfig; -import com.symphony.bdk.core.config.model.BdkRetryConfig; import com.symphony.bdk.core.service.datafeed.DatafeedService; import com.symphony.bdk.core.service.datafeed.RealTimeEventListener; -import com.symphony.bdk.core.util.BdkExponentialFunction; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.gen.api.DatafeedApi; import com.symphony.bdk.gen.api.model.V4Event; -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; -import javax.ws.rs.ProcessingException; - /** * Base class for implementing the datafeed services. A datafeed services can help a bot subscribe or unsubscribe * a {@link RealTimeEventListener} and handle the received event by the subscribed listeners. @@ -26,97 +22,74 @@ @Slf4j abstract class AbstractDatafeedService implements DatafeedService { - protected final AuthSession authSession; - protected final BdkConfig bdkConfig; - protected final List listeners; - protected final RetryConfig retryConfig; - protected DatafeedApi datafeedApi; + protected final AuthSession authSession; + protected final BdkConfig bdkConfig; + protected final List listeners; + protected final RetryWithRecoveryBuilder retryWithRecoveryBuilder; + protected DatafeedApi datafeedApi; - public AbstractDatafeedService(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) { - this.datafeedApi = datafeedApi; - this.listeners = new ArrayList<>(); - this.authSession = authSession; - this.bdkConfig = config; - BdkRetryConfig bdkRetryConfig = this.bdkConfig.getDatafeed().getRetry() == null ? this.bdkConfig.getRetry() : this.bdkConfig.getDatafeed().getRetry(); - this.retryConfig = RetryConfig.custom() - .maxAttempts(bdkRetryConfig.getMaxAttempts()) - .intervalFunction(BdkExponentialFunction.ofExponentialBackoff(bdkRetryConfig)) - .retryOnException(e -> { - if (e instanceof ApiException) { - ApiException apiException = (ApiException) e; - return apiException.isServerError() || apiException.isUnauthorized(); - } - return e instanceof ProcessingException; - }) - .build(); - } + public AbstractDatafeedService(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) { + this.datafeedApi = datafeedApi; + this.listeners = new ArrayList<>(); + this.authSession = authSession; + this.bdkConfig = config; + this.retryWithRecoveryBuilder = new RetryWithRecoveryBuilder<>() + .retryConfig(config.getDatafeedRetryConfig()) + .recoveryStrategy(ApiException::isUnauthorized, this::refresh); + } - /** - * {@inheritDoc} - */ - @Override - public void subscribe(RealTimeEventListener listener) { - listeners.add(listener); - } + /** + * {@inheritDoc} + */ + @Override + public void subscribe(RealTimeEventListener listener) { + listeners.add(listener); + } - /** - * {@inheritDoc} - */ - @Override - public void unsubscribe(RealTimeEventListener listener) { - listeners.remove(listener); - } + /** + * {@inheritDoc} + */ + @Override + public void unsubscribe(RealTimeEventListener listener) { + listeners.remove(listener); + } - /** - * Handle a received listener by using the subscribed {@link RealTimeEventListener}. - * - * @param events List of Datafeed events to be handled - * - */ - protected void handleV4EventList(List events) { - for (V4Event event : events) { - if (event == null || event.getType() == null) { - continue; - } - if (this.isSelfGeneratedEvent(event)) { - continue; - } - try { - RealTimeEventType eventType = RealTimeEventType.valueOf(event.getType()); - for (RealTimeEventListener listener : listeners) { - try { - eventType.dispatch(listener, event); - } catch (Exception ex) { - log.debug("An error has occurred while dispatching event {}", event, ex); - } - } - } catch (IllegalArgumentException e) { - log.warn("Receive events with unknown type: {}", event.getType()); - } + /** + * Handle a received listener by using the subscribed {@link RealTimeEventListener}. + * + * @param events List of Datafeed events to be handled + */ + protected void handleV4EventList(List events) { + for (V4Event event : events) { + if (event == null || event.getType() == null) { + continue; + } + if (this.isSelfGeneratedEvent(event)) { + continue; + } + try { + RealTimeEventType eventType = RealTimeEventType.valueOf(event.getType()); + for (RealTimeEventListener listener : listeners) { + eventType.dispatch(listener, event); } + } catch (IllegalArgumentException e) { + log.warn("Receive events with unknown type: {}", event.getType()); + } } + } - private boolean isSelfGeneratedEvent(V4Event event) { - return event.getInitiator() != null && event.getInitiator().getUser() != null - && event.getInitiator().getUser().getUsername() != null - && event.getInitiator().getUser().getUsername().equals(this.bdkConfig.getBot().getUsername()); - } - - protected Retry getRetryInstance(String name, RetryConfig... config) { - Retry retry = config.length == 0 ? Retry.of(name, this.retryConfig) : Retry.of(name, config[0]); - retry.getEventPublisher().onRetry(event -> { - long intervalInMillis = event.getWaitInterval().toMillis(); - double interval = intervalInMillis / 1000.0; - if (event.getLastThrowable() != null) { - log.debug("Datafeed service failed due to {}", event.getLastThrowable().getMessage()); - } - log.info("Retry in {} secs...", interval); - }); - return retry; - } + private boolean isSelfGeneratedEvent(V4Event event) { + return event.getInitiator() != null && event.getInitiator().getUser() != null + && event.getInitiator().getUser().getUsername() != null + && event.getInitiator().getUser().getUsername().equals(this.bdkConfig.getBot().getUsername()); + } - protected void setDatafeedApi(DatafeedApi datafeedApi) { - this.datafeedApi = datafeedApi; - } + protected void refresh() throws AuthUnauthorizedException { + log.info("Re-authenticate and try again"); + authSession.refresh(); + } + protected void setDatafeedApi(DatafeedApi datafeedApi) { + this.datafeedApi = datafeedApi; + } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1.java index e05202665..a489cf0b2 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1.java @@ -5,150 +5,137 @@ import com.symphony.bdk.core.auth.exception.AuthUnauthorizedException; import com.symphony.bdk.core.config.model.BdkConfig; import com.symphony.bdk.core.service.datafeed.DatafeedIdRepository; +import com.symphony.bdk.core.service.datafeed.exception.NestedRetryException; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.gen.api.DatafeedApi; import com.symphony.bdk.gen.api.model.V4Event; -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; -import javax.ws.rs.ProcessingException; - /** * A class for implementing the datafeed v1 service. - * + *

* This service will be started by calling {@link DatafeedServiceV1#start()} - * + *

* At the beginning, a datafeed will be created and the BDK bot will listen to this datafeed to receive the real-time * events. With datafeed service v1, we don't have the api endpoint to retrieve the datafeed id that a service account * is listening, so, the id of the created datafeed must be persisted in the bot side. - * + *

* The BDK bot will listen to this datafeed to get all the received real-time events. - * + *

* From the second time, the bot will firstly retrieve the datafeed that was persisted and try to read the real-time * events from this datafeed. If this datafeed is expired or faulty, the datafeed service will create the new one for * listening. - * + *

* This service will be stopped by calling {@link DatafeedServiceV1#stop()} - * + *

* If the datafeed service is stopped during a read datafeed call, it has to wait until the last read finish to be * really stopped */ @Slf4j public class DatafeedServiceV1 extends AbstractDatafeedService { - private final AtomicBoolean started = new AtomicBoolean(); - private final DatafeedIdRepository datafeedRepository; - private String datafeedId; + private final AtomicBoolean started = new AtomicBoolean(); + private final DatafeedIdRepository datafeedRepository; + private String datafeedId; - public DatafeedServiceV1(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) { - this(datafeedApi, authSession, config, new OnDiskDatafeedIdRepository(config)); - } + public DatafeedServiceV1(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) { + this(datafeedApi, authSession, config, new OnDiskDatafeedIdRepository(config)); + } - public DatafeedServiceV1(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config, DatafeedIdRepository repository) { - super(datafeedApi, authSession, config); - this.started.set(false); - this.datafeedId = null; - this.datafeedRepository = repository; - } + public DatafeedServiceV1(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config, + DatafeedIdRepository repository) { + super(datafeedApi, authSession, config); + this.started.set(false); + this.datafeedId = null; + this.datafeedRepository = repository; + } - /** - * {@inheritDoc} - */ - @Override - public void start() throws AuthUnauthorizedException, ApiException { - if (this.started.get()) { - throw new IllegalStateException("The datafeed service is already started"); - } - Optional persistedDatafeedId = this.retrieveDatafeed(); - - try { - this.datafeedId = persistedDatafeedId.orElse(this.createDatafeed()); - log.debug("Start reading events from datafeed {}", datafeedId); - this.started.set(true); - do { - this.readDatafeed(); - } while (this.started.get()); - } catch (AuthUnauthorizedException | ApiException exception) { - throw exception; - } catch (Throwable throwable) { - log.error("Unknown error", throwable); - } + /** + * {@inheritDoc} + */ + @Override + public void start() throws AuthUnauthorizedException, ApiException { + if (this.started.get()) { + throw new IllegalStateException("The datafeed service is already started"); } + Optional persistedDatafeedId = this.retrieveDatafeed(); - /** - * {@inheritDoc} - */ - @Override - public void stop() { - log.info("Stop the datafeed service"); - this.started.set(false); + try { + this.datafeedId = persistedDatafeedId.orElse(this.createDatafeed()); + log.debug("Start reading events from datafeed {}", datafeedId); + this.started.set(true); + do { + readDatafeed(); + } while (this.started.get()); + } catch (AuthUnauthorizedException | ApiException exception) { + throw exception; + } catch (Throwable throwable) { + log.error("Unknown error", throwable); } + } - private void readDatafeed() throws Throwable { - RetryConfig config = RetryConfig.from(this.retryConfig).retryOnException(e -> { - if (e instanceof ApiException && e.getSuppressed().length == 0) { - ApiException apiException = (ApiException) e; - return apiException.isServerError() || apiException.isUnauthorized() || apiException.isClientError(); - } - return e instanceof ProcessingException; - }).build(); - Retry retry = this.getRetryInstance("Read Datafeed", config); - retry.executeCheckedSupplier(() -> { - try { - List events = datafeedApi.v4DatafeedIdReadGet(datafeedId, authSession.getSessionToken(), authSession.getKeyManagerToken(), null); - if (events != null && !events.isEmpty()) { - handleV4EventList(events); - } - } catch (ApiException e) { - if (e.isUnauthorized()) { - log.info("Re-authenticate and try again"); - authSession.refresh(); - } else { - log.error("Error {}: {}", e.getCode(), e.getMessage()); - if (e.isClientError()) { - log.info("Recreate a new datafeed and try again"); - try { - datafeedId = this.createDatafeed(); - } catch (Throwable throwable) { - e.addSuppressed(throwable); - } - } - } - throw e; - } - return null; - }); - } + /** + * {@inheritDoc} + */ + @Override + public void stop() { + log.info("Stop the datafeed service"); + this.started.set(false); + } - protected String createDatafeed() throws Throwable { - log.debug("Start creating a new datafeed and persisting it"); - Retry retry = this.getRetryInstance("Create Datafeed"); - return retry.executeCheckedSupplier(() -> { - try { - String id = this.datafeedApi.v4DatafeedCreatePost(authSession.getSessionToken(), authSession.getKeyManagerToken()).getId(); - this.datafeedRepository.write(id); - log.debug("Datafeed: {} was created and persisted", id); - return id; - } catch (ApiException e) { - if (e.isUnauthorized()) { - log.info("Re-authenticate and try again"); - authSession.refresh(); - } else { - log.error("Error {}: {}", e.getCode(), e.getMessage()); - } - throw e; - } - }); + private Void readAndHandleEvents() throws ApiException { + List events = + datafeedApi.v4DatafeedIdReadGet(datafeedId, authSession.getSessionToken(), authSession.getKeyManagerToken(), + null); + if (events != null && !events.isEmpty()) { + handleV4EventList(events); } + return null; + } - protected Optional retrieveDatafeed() { - log.debug("Start retrieving datafeed id"); - return this.datafeedRepository.read(); + private void readDatafeed() throws Throwable { + final RetryWithRecovery retry = RetryWithRecoveryBuilder.from(retryWithRecoveryBuilder) + .name("Read Datafeed V1") + .supplier(this::readAndHandleEvents) + .recoveryStrategy(ApiException::isClientError, this::recreateDatafeed) + .retryOnException(RetryWithRecoveryBuilder::isNetworkOrMinorErrorOrClientError) + .build(); + retry.execute(); + } + + private void recreateDatafeed() { + log.info("Recreate a new datafeed and try again"); + try { + datafeedId = createDatafeed(); + } catch (Throwable throwable) { + throw new NestedRetryException("Recreation of datafeed failed", throwable); } + } + + protected String createDatafeed() throws Throwable { + log.debug("Start creating a new datafeed and persisting it"); + final RetryWithRecovery retry = RetryWithRecoveryBuilder.from(retryWithRecoveryBuilder) + .name("Create Datafeed V1") + .supplier(this::createDatafeedAndPersist) + .build(); + return retry.execute(); + } + + private String createDatafeedAndPersist() throws ApiException { + String id = datafeedApi.v4DatafeedCreatePost(authSession.getSessionToken(), authSession.getKeyManagerToken()).getId(); + datafeedRepository.write(id); + log.debug("Datafeed: {} was created and persisted", id); + return id; + } + protected Optional retrieveDatafeed() { + log.debug("Start retrieving datafeed id"); + return this.datafeedRepository.read(); + } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2.java index 1d6d4cd67..adc40ce7c 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2.java @@ -4,196 +4,176 @@ import com.symphony.bdk.core.auth.AuthSession; import com.symphony.bdk.core.auth.exception.AuthUnauthorizedException; import com.symphony.bdk.core.config.model.BdkConfig; +import com.symphony.bdk.core.service.datafeed.exception.NestedRetryException; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.gen.api.DatafeedApi; import com.symphony.bdk.gen.api.model.AckId; import com.symphony.bdk.gen.api.model.V4Event; import com.symphony.bdk.gen.api.model.V5Datafeed; import com.symphony.bdk.gen.api.model.V5EventList; -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import javax.ws.rs.ProcessingException; - /** * A class for implementing the datafeed v2 service. - * + *

* This service will be started by calling {@link DatafeedServiceV2#start()} - * + *

* At the beginning, the BDK bot will try to retrieve the list of datafeed to which it is listening. Since each bot * should only listening to just one datafeed, the first datafeed in the list will be used by the bot to be listened to. * If the retrieved list is empty, the BDK bot will create a new datafeed to listen. - * + *

* The BDK bot will listen to this datafeed to get all the received real-time events. - * + *

* If this datafeed becomes stale or faulty, the BDK bot will create the new one for listening. - * + *

* This service will be stopped by calling {@link DatafeedServiceV2#stop()} - * + *

* If the datafeed service is stopped during a read datafeed call, it has to wait until the last read finish to be * really stopped */ @Slf4j public class DatafeedServiceV2 extends AbstractDatafeedService { - private final AtomicBoolean started = new AtomicBoolean(); - private final AckId ackId; - private V5Datafeed datafeed; + private final AtomicBoolean started = new AtomicBoolean(); + private final AckId ackId; + private V5Datafeed datafeed; - public DatafeedServiceV2(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) { - super(datafeedApi, authSession, config); - this.ackId = new AckId().ackId(""); - } + public DatafeedServiceV2(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) { + super(datafeedApi, authSession, config); + this.ackId = new AckId().ackId(""); + } - /** - * {@inheritDoc} - */ - @Override - public void start() throws ApiException, AuthUnauthorizedException { - if (this.started.get()) { - throw new IllegalStateException("The datafeed service is already started"); - } - try { - this.datafeed = this.retrieveDatafeed(); - if (this.datafeed == null) { - this.datafeed = this.createDatafeed(); - } - log.debug("Start reading datafeed events"); - this.started.set(true); - do { - this.readDatafeed(); - } while (this.started.get()); - } catch (AuthUnauthorizedException | ApiException exception) { - throw exception; - } catch (Throwable throwable) { - log.error("Unknown error", throwable); - } + /** + * {@inheritDoc} + */ + @Override + public void start() throws ApiException, AuthUnauthorizedException { + if (this.started.get()) { + throw new IllegalStateException("The datafeed service is already started"); } - - protected AckId getAckId() { - return this.ackId; + try { + this.datafeed = this.retrieveDatafeed(); + if (this.datafeed == null) { + this.datafeed = this.createDatafeed(); + } + log.debug("Start reading datafeed events"); + this.started.set(true); + do { + this.readDatafeed(); + } while (this.started.get()); + } catch (AuthUnauthorizedException | ApiException | NestedRetryException exception) { + throw exception; + } catch (Throwable throwable) { + log.error("Unknown error", throwable); } + } - /** - * {@inheritDoc} - */ - @Override - public void stop() { - this.started.set(false); - } + protected AckId getAckId() { + return this.ackId; + } - private V5Datafeed createDatafeed() throws Throwable { - log.debug("Start creating datafeed from agent"); - Retry retry = this.getRetryInstance("Create V5Datafeed"); - return retry.executeCheckedSupplier(() -> { - try { - return this.datafeedApi.createDatafeed(authSession.getSessionToken(), authSession.getKeyManagerToken()); - } catch (ApiException e) { - if (e.isUnauthorized()) { - log.info("Re-authenticate and try again"); - authSession.refresh(); - } else { - log.error("Error {}: {}", e.getCode(), e.getMessage()); - } - throw e; - } - }); - } + /** + * {@inheritDoc} + */ + @Override + public void stop() { + this.started.set(false); + } + + private V5Datafeed createDatafeed() throws Throwable { + log.debug("Start creating datafeed from agent"); + + final RetryWithRecovery retry = RetryWithRecoveryBuilder.from(retryWithRecoveryBuilder) + .name("Create Datafeed V2") + .supplier(this::tryCreateDatafeed) + .build(); + + return retry.execute(); + } + + private V5Datafeed tryCreateDatafeed() throws ApiException { + return this.datafeedApi.createDatafeed(authSession.getSessionToken(), authSession.getKeyManagerToken()); + } + + private V5Datafeed retrieveDatafeed() throws Throwable { + log.debug("Start retrieving datafeed from agent"); - private V5Datafeed retrieveDatafeed() throws Throwable { - log.debug("Start retrieving datafeed from agent"); - Retry retry = this.getRetryInstance("Retrieve V5Datafeed"); - List datafeeds = retry.executeCheckedSupplier(() -> { - try { - return this.datafeedApi.listDatafeed(authSession.getSessionToken(), authSession.getKeyManagerToken()); - } catch (ApiException e) { - if (e.isUnauthorized()) { - log.info("Re-authenticate and try again"); - authSession.refresh(); - } else { - log.error("Error {}: {}", e.getCode(), e.getMessage()); - } - throw e; - } - }); - if (!datafeeds.isEmpty()) { - return datafeeds.get(0); - } - return null; + final RetryWithRecovery retry = RetryWithRecoveryBuilder.from(retryWithRecoveryBuilder) + .name("Retrieve Datafeed V2") + .supplier(this::tryRetrieveDatafeed) + .build(); + + return retry.execute(); + } + + private V5Datafeed tryRetrieveDatafeed() throws ApiException { + final List datafeeds = + this.datafeedApi.listDatafeed(authSession.getSessionToken(), authSession.getKeyManagerToken()); + + if (!datafeeds.isEmpty()) { + return datafeeds.get(0); } + return null; + } + + private void readDatafeed() throws Throwable { + log.debug("Reading datafeed events from datafeed {}", datafeed.getId()); + + final RetryWithRecovery retry = RetryWithRecoveryBuilder.from(retryWithRecoveryBuilder) + .name("Read Datafeed V2") + .supplier(this::readAndHandleEvents) + .retryOnException(RetryWithRecoveryBuilder::isNetworkOrMinorErrorOrClientError) + .recoveryStrategy(ApiException::isClientError, this::recreateDatafeed) + .build(); - private void readDatafeed() throws Throwable { - log.debug("Reading datafeed events from datafeed {}", datafeed.getId()); - RetryConfig config = RetryConfig.from(this.retryConfig) - .retryOnException(e -> { - if (e instanceof ApiException && e.getSuppressed().length == 0) { - ApiException apiException = (ApiException) e; - return apiException.isServerError() || apiException.isUnauthorized() || apiException.isClientError(); - } - return e instanceof ProcessingException; - }).build(); - Retry retry = this.getRetryInstance("Read Datafeed", config); - retry.executeCheckedSupplier(() -> { - try { - V5EventList v5EventList = this.datafeedApi.readDatafeed( - datafeed.getId(), - authSession.getSessionToken(), - authSession.getKeyManagerToken(), - ackId); - this.ackId.setAckId(v5EventList.getAckId()); - List events = v5EventList.getEvents(); - if (events != null && !events.isEmpty()) { - this.handleV4EventList(events); - } - } catch (ApiException e) { - if (e.isUnauthorized()) { - log.info("Re-authenticate and try again"); - authSession.refresh(); - } else { - log.error("Error {}: {}", e.getCode(), e.getMessage()); - if (e.isClientError()) { - try { - log.info("Try to delete the faulty datafeed"); - this.deleteDatafeed(); - log.info("Recreate a new datafeed and try again"); - this.datafeed = this.createDatafeed(); - } catch (Throwable throwable) { - e.addSuppressed(throwable); - } - } - } - throw e; - } - return null; - }); + retry.execute(); + } + + private Void readAndHandleEvents() throws ApiException { + V5EventList v5EventList = this.datafeedApi.readDatafeed( + datafeed.getId(), + authSession.getSessionToken(), + authSession.getKeyManagerToken(), + ackId); + this.ackId.setAckId(v5EventList.getAckId()); + List events = v5EventList.getEvents(); + if (events != null && !events.isEmpty()) { + this.handleV4EventList(events); } + return null; + } - private void deleteDatafeed() throws Throwable { - log.debug("Start deleting a faulty datafeed"); - Retry retry = this.getRetryInstance("Delete Datafeed"); - retry.executeCheckedSupplier(() -> { - try { - this.datafeedApi.deleteDatafeed(datafeed.getId(), authSession.getSessionToken(), authSession.getKeyManagerToken()); - this.datafeed = null; - } catch (ApiException e) { - if (e.isClientError()) { - log.debug("The datafeed doesn't exist or is already removed"); - } else { - if (e.isUnauthorized()) { - log.info("Re-authenticate and try again"); - authSession.refresh(); - } else { - log.error("Error {}: {}", e.getCode(), e.getMessage()); - } - throw e; - } - } - return null; - }); + private void recreateDatafeed() { + try { + log.info("Try to delete the faulty datafeed"); + this.deleteDatafeed(); + log.info("Recreate a new datafeed and try again"); + this.datafeed = this.createDatafeed(); + } catch (Throwable throwable) { + throw new NestedRetryException("Recreation of datafeed failed", throwable); } + } + + private void deleteDatafeed() throws Throwable { + log.debug("Start deleting a faulty datafeed"); + + final RetryWithRecovery retry = RetryWithRecoveryBuilder.from(retryWithRecoveryBuilder) + .name("Delete Datafeed V2") + .supplier(this::tryDeleteDatafeed) + .ignoreException(ApiException::isClientError) + .build(); + + retry.execute(); + } + private Void tryDeleteDatafeed() throws ApiException { + this.datafeedApi.deleteDatafeed(datafeed.getId(), authSession.getSessionToken(), authSession.getKeyManagerToken()); + this.datafeed = null; + return null; + } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/OboStreamService.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/OboStreamService.java index ec06f1d4a..05ff05240 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/OboStreamService.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/OboStreamService.java @@ -1,21 +1,26 @@ package com.symphony.bdk.core.service.stream; -import com.symphony.bdk.http.api.ApiException; -import com.symphony.bdk.http.api.ApiRuntimeException; + import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; +import com.symphony.bdk.core.util.function.SupplierWithApiException; import com.symphony.bdk.gen.api.StreamsApi; import com.symphony.bdk.gen.api.model.StreamAttributes; import com.symphony.bdk.gen.api.model.StreamFilter; import com.symphony.bdk.gen.api.model.V2StreamAttributes; +import com.symphony.bdk.http.api.ApiException; import java.util.List; class OboStreamService { protected final StreamsApi streamsApi; + protected final RetryWithRecoveryBuilder retryBuilder; - protected OboStreamService(StreamsApi streamsApi) { + protected OboStreamService(StreamsApi streamsApi, RetryWithRecoveryBuilder retryBuilder) { this.streamsApi = streamsApi; + this.retryBuilder = retryBuilder; } /** @@ -23,15 +28,12 @@ protected OboStreamService(StreamsApi streamsApi) { * * @param authSession Bot Session or Obo Session * @param streamId The stream id - * @return The information about the stream with the given id. - * @see Stream Info V2 + * @return The information about the stream with the given id. + * @see Stream Info V2 */ public V2StreamAttributes getStreamInfo(AuthSession authSession, String streamId) { - try { - return streamsApi.v2StreamsSidInfoGet(streamId, authSession.getSessionToken()); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getStreamInfo", + () -> streamsApi.v2StreamsSidInfoGet(streamId, authSession.getSessionToken()), authSession); } /** @@ -39,15 +41,18 @@ public V2StreamAttributes getStreamInfo(AuthSession authSession, String streamId * * @param authSession Bot Session or Obo Session * @param filter The stream searching criteria - * @return The list of streams retrieved according to the searching criteria. - * @see List Streams + * @return The list of streams retrieved according to the searching criteria. + * @see List Streams */ public List listStreams(AuthSession authSession, StreamFilter filter) { - try { - return streamsApi.v1StreamsListPost(authSession.getSessionToken(), null, null, filter); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("listStreams", + () -> streamsApi.v1StreamsListPost(authSession.getSessionToken(), null, null, filter), authSession); } + protected T executeAndRetry(String name, SupplierWithApiException supplier, AuthSession authSession) { + final RetryWithRecoveryBuilder retryBuilderWithAuthSession = RetryWithRecoveryBuilder.from(retryBuilder) + .clearRecoveryStrategies() // to remove refresh on bot session put by default + .recoveryStrategy(ApiException::isUnauthorized, authSession::refresh); + return RetryWithRecovery.executeAndRetry(retryBuilderWithAuthSession, name, supplier); + } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/StreamService.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/StreamService.java index 89cdb961b..486acb79d 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/StreamService.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/stream/StreamService.java @@ -3,7 +3,11 @@ import com.symphony.bdk.http.api.ApiException; import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.config.model.BdkRetryConfig; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.stream.constant.AttachmentSort; +import com.symphony.bdk.core.util.function.SupplierWithApiException; import com.symphony.bdk.gen.api.StreamsApi; import com.symphony.bdk.gen.api.model.RoomDetail; import com.symphony.bdk.gen.api.model.Stream; @@ -26,39 +30,38 @@ /** * Service class for managing streams. - * + *

* This service is used for retrieving information about a particular stream or * chatroom, searching streams, listing members, attachments of a particular stream, * perform some action related to a stream like: *

    - *
  • Create a IM or MIM
  • - *
  • Create a chatroom
  • - *
  • Activate or Deactivate a chatroom
  • - *
  • + *
  • Create a IM or MIM
  • + *
  • Create a chatroom
  • + *
  • Activate or Deactivate a chatroom
  • + *
  • *

- * */ @Slf4j public class StreamService extends OboStreamService { private final AuthSession authSession; - public StreamService(StreamsApi streamsApi, AuthSession authSession) { - super(streamsApi); + public StreamService(StreamsApi streamsApi, AuthSession authSession, RetryWithRecoveryBuilder retryBuilder) { + super(streamsApi, retryBuilder); this.authSession = authSession; } /** * Create a new single or multi party instant message conversation between the caller and specified users. - * + *

* The caller is implicitly included in the members of the created chat. - * + *

* Duplicate users will be included in the membership of the chat but * the duplication will be silently ignored. - * + *

* If there is an existing IM conversation with the same set of participants then * the id of that existing stream will be returned. - * + *

* If the given list of user ids contains only one id, an IM will be created, otherwise, a MIM will be created. * * @param uids List of user ids of the participants. @@ -93,11 +96,8 @@ public Stream create(Long... uids) { * @see Create Room V3 */ public V3RoomDetail create(V3RoomAttributes roomAttributes) { - try { - return streamsApi.v3RoomCreatePost(authSession.getSessionToken(), roomAttributes); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("createStream", + () -> streamsApi.v3RoomCreatePost(authSession.getSessionToken(), roomAttributes)); } /** @@ -108,11 +108,8 @@ public V3RoomDetail create(V3RoomAttributes roomAttributes) { * @see Search Rooms V3 */ public V3RoomSearchResults searchRooms(V2RoomSearchCriteria query) { - try { - return streamsApi.v3RoomSearchPost(authSession.getSessionToken(), query, null, null); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("searchRooms", + () -> streamsApi.v3RoomSearchPost(authSession.getSessionToken(), query, null, null)); } /** @@ -123,11 +120,8 @@ public V3RoomSearchResults searchRooms(V2RoomSearchCriteria query) { * @see Room Info V3 */ public V3RoomDetail getRoomInfo(String roomId) { - try { - return streamsApi.v3RoomIdInfoGet(roomId, authSession.getSessionToken()); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getRoomInfo", + () -> streamsApi.v3RoomIdInfoGet(roomId, authSession.getSessionToken())); } /** @@ -139,27 +133,21 @@ public V3RoomDetail getRoomInfo(String roomId) { * @see De/Reactivate Room */ public RoomDetail setRoomActive(String roomId, Boolean active) { - try { - return streamsApi.v1RoomIdSetActivePost(roomId, active, authSession.getSessionToken()); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("setRoomActive", + () -> streamsApi.v1RoomIdSetActivePost(roomId, active, authSession.getSessionToken())); } /** * Update the attributes of an existing chatroom. * - * @param roomId The id of the room to be updated + * @param roomId The id of the room to be updated * @param roomAttributes The attributes to be updated to the room * @return The information of the room after being updated. * @see Update Room V3 */ public V3RoomDetail updateRoom(String roomId, V3RoomAttributes roomAttributes) { - try { - return streamsApi.v3RoomIdUpdatePost(roomId, authSession.getSessionToken(), roomAttributes); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("updateRoom", + () -> streamsApi.v3RoomIdUpdatePost(roomId, authSession.getSessionToken(), roomAttributes)); } /** @@ -188,30 +176,29 @@ public V2StreamAttributes getStreamInfo(String streamId) { /** * List attachments in a particular stream. * - * @param streamId The stream id + * @param streamId The stream id * @param sinceInMillis Timestamp in millis of first required attachment - * @param toInMillis Timestamp in millis of last required attachment - * @param sort Attachment date sort direction : ASC or DESC (default to ASC) + * @param toInMillis Timestamp in millis of last required attachment + * @param sort Attachment date sort direction : ASC or DESC (default to ASC) * @return List of attachments in the stream with the given stream id. * @see List Attachments */ - public List getAttachmentsOfStream(String streamId, Long sinceInMillis, Long toInMillis, AttachmentSort sort) { - try { - return streamsApi.v1StreamsSidAttachmentsGet(streamId, authSession.getSessionToken(), sinceInMillis, toInMillis, null, sort.name()); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + public List getAttachmentsOfStream(String streamId, Long sinceInMillis, Long toInMillis, + AttachmentSort sort) { + return executeAndRetry("getAttachmentsOfStream", + () -> streamsApi.v1StreamsSidAttachmentsGet(streamId, authSession.getSessionToken(), sinceInMillis, toInMillis, + null, sort.name())); } /** * Create a new single or multi party instant message conversation. * At least two user IDs must be provided or an error response will be sent. - * + *

* The caller is not included in the members of the created chat. - * + *

* Duplicate users will be included in the membership of the chat but the * duplication will be silently ignored. - * + *

* If there is an existing IM conversation with the same set of participants then * the id of that existing stream will be returned. * @@ -220,26 +207,20 @@ public List getAttachmentsOfStream(String streamId, Long s * @see Create IM or MIM Non-inclusive */ public Stream createInstantMessageAdmin(List uids) { - try { - return streamsApi.v1AdminImCreatePost(authSession.getSessionToken(), uids); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("createInstantMessageAdmin", + () -> streamsApi.v1AdminImCreatePost(authSession.getSessionToken(), uids)); } /** * Deactivate or reactivate a chatroom via AC Portal. * * @param streamId The stream id - * @param active Deactivate or activate + * @param active Deactivate or activate * @return The information of the room after being deactivated or reactivated. */ public RoomDetail setRoomActiveAdmin(String streamId, Boolean active) { - try { - return streamsApi.v1AdminRoomIdSetActivePost(streamId, active, authSession.getSessionToken()); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("setRoomActiveAdmin", + () -> streamsApi.v1AdminRoomIdSetActivePost(streamId, active, authSession.getSessionToken())); } /** @@ -250,11 +231,8 @@ public RoomDetail setRoomActiveAdmin(String streamId, Boolean active) { * @see List Streams for Enterprise V2 */ public V2AdminStreamList listStreamsAdmin(V2AdminStreamFilter filter) { - try { - return streamsApi.v2AdminStreamsListPost(authSession.getSessionToken(), null, null, filter); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("listStreamsAdmin", + () -> streamsApi.v2AdminStreamsListPost(authSession.getSessionToken(), null, null, filter)); } /** @@ -266,10 +244,11 @@ public V2AdminStreamList listStreamsAdmin(V2AdminStreamFilter filter) { * @see Stream Members */ public V2MembershipList listStreamMembers(String streamId) { - try { - return streamsApi.v1AdminStreamIdMembershipListGet(streamId, authSession.getSessionToken(), null, null); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("listStreamMembers", + () -> streamsApi.v1AdminStreamIdMembershipListGet(streamId, authSession.getSessionToken(), null, null)); + } + + private T executeAndRetry(String name, SupplierWithApiException supplier) { + return RetryWithRecovery.executeAndRetry(retryBuilder, name, supplier); } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/OboUserService.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/OboUserService.java index 67f36a7f1..7367ae86b 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/OboUserService.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/OboUserService.java @@ -1,14 +1,16 @@ package com.symphony.bdk.core.service.user; -import com.symphony.bdk.http.api.ApiException; -import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; +import com.symphony.bdk.core.util.function.SupplierWithApiException; import com.symphony.bdk.gen.api.UserApi; import com.symphony.bdk.gen.api.UsersApi; import com.symphony.bdk.gen.api.model.UserSearchQuery; import com.symphony.bdk.gen.api.model.UserSearchResults; import com.symphony.bdk.gen.api.model.UserV2; import com.symphony.bdk.gen.api.model.V2UserList; +import com.symphony.bdk.http.api.ApiException; import lombok.NonNull; @@ -21,10 +23,12 @@ class OboUserService { protected final UserApi userApi; protected final UsersApi usersApi; + protected final RetryWithRecoveryBuilder retryBuilder; - protected OboUserService(UserApi userApi, UsersApi usersApi) { + protected OboUserService(UserApi userApi, UsersApi usersApi, RetryWithRecoveryBuilder retryBuilder) { this.userApi = userApi; this.usersApi = usersApi; + this.retryBuilder = retryBuilder; } /** @@ -35,17 +39,15 @@ protected OboUserService(UserApi userApi, UsersApi usersApi) { * @param local If true then a local DB search will be performed and only local pod users will be * returned. If absent or false then a directory search will be performed and users * from other pods who are visible to the calling user will also be returned. - * @return Users found by user ids - * @see Users Lookup V3 + * @return Users found by user ids + * @see Users Lookup V3 */ - public List searchUserByIds(@NonNull AuthSession authSession, @NonNull List uidList, @NonNull Boolean local) { - try { - String uids = uidList.stream().map(String::valueOf).collect(Collectors.joining(",")); - V2UserList v2UserList = usersApi.v3UsersGet(authSession.getSessionToken(), uids, null, null, local); - return v2UserList.getUsers(); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + public List searchUserByIds(@NonNull AuthSession authSession, @NonNull List uidList, + @NonNull Boolean local) { + String uids = uidList.stream().map(String::valueOf).collect(Collectors.joining(",")); + V2UserList v2UserList = executeAndRetry("searchUserByIds", + () -> usersApi.v3UsersGet(authSession.getSessionToken(), uids, null, null, local), authSession); + return v2UserList.getUsers(); } /** @@ -53,17 +55,14 @@ public List searchUserByIds(@NonNull AuthSession authSession, @NonNull L * * @param authSession Bot Session or Obo Session * @param uidList List of user ids - * @return Users found by user ids - * @see Users Lookup V3 + * @return Users found by user ids + * @see Users Lookup V3 */ public List searchUserByIds(@NonNull AuthSession authSession, @NonNull List uidList) { - try { - String uids = uidList.stream().map(String::valueOf).collect(Collectors.joining(",")); - V2UserList v2UserList = usersApi.v3UsersGet(authSession.getSessionToken(), uids, null, null, false); - return v2UserList.getUsers(); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + String uids = uidList.stream().map(String::valueOf).collect(Collectors.joining(",")); + V2UserList v2UserList = executeAndRetry("searchUserByIds", + () -> usersApi.v3UsersGet(authSession.getSessionToken(), uids, null, null, false), authSession); + return v2UserList.getUsers(); } /** @@ -74,17 +73,15 @@ public List searchUserByIds(@NonNull AuthSession authSession, @NonNull L * @param local If true then a local DB search will be performed and only local pod users will be * returned. If absent or false then a directory search will be performed and users * from other pods who are visible to the calling user will also be returned. - * @return Users found by emails. - * @see Users Lookup V3 + * @return Users found by emails. + * @see Users Lookup V3 */ - public List searchUserByEmails(@NonNull AuthSession authSession, @NonNull List emailList, @NonNull Boolean local) { - try { - String emails = String.join(",", emailList); - V2UserList v2UserList = usersApi.v3UsersGet(authSession.getSessionToken(), null, emails, null, local); - return v2UserList.getUsers(); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + public List searchUserByEmails(@NonNull AuthSession authSession, @NonNull List emailList, + @NonNull Boolean local) { + String emails = String.join(",", emailList); + V2UserList v2UserList = executeAndRetry("searchUserByEmails", + () -> usersApi.v3UsersGet(authSession.getSessionToken(), null, emails, null, local), authSession); + return v2UserList.getUsers(); } /** @@ -92,54 +89,53 @@ public List searchUserByEmails(@NonNull AuthSession authSession, @NonNul * * @param authSession Bot Session or Obo Session * @param emailList List of emails - * @return Users found by emails - * @see Users Lookup V3 + * @return Users found by emails + * @see Users Lookup V3 */ public List searchUserByEmails(@NonNull AuthSession authSession, @NonNull List emailList) { - try { - String emails = String.join(",", emailList); - V2UserList v2UserList = usersApi.v3UsersGet(authSession.getSessionToken(), null, emails, null, false); - return v2UserList.getUsers(); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + String emails = String.join(",", emailList); + V2UserList v2UserList = executeAndRetry("searchUserByEmails", + () -> usersApi.v3UsersGet(authSession.getSessionToken(), null, emails, null, false), authSession); + return v2UserList.getUsers(); } /** * {@link UserService#searchUserByUsernames(List)} * - * @param authSession Bot Session or Obo Session - * @param usernameList List of usernames - * @return Users found by usernames - * @see Users Lookup V3 + * @param authSession Bot Session or Obo Session + * @param usernameList List of usernames + * @return Users found by usernames + * @see Users Lookup V3 */ public List searchUserByUsernames(@NonNull AuthSession authSession, @NonNull List usernameList) { - try { - String usernames = String.join(",", usernameList); - V2UserList v2UserList = usersApi.v3UsersGet(authSession.getSessionToken(), null, null, usernames, true); - return v2UserList.getUsers(); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + String usernames = String.join(",", usernameList); + V2UserList v2UserList = executeAndRetry("searchUserByUsernames", + () -> usersApi.v3UsersGet(authSession.getSessionToken(), null, null, usernames, true), authSession); + return v2UserList.getUsers(); } /** * {@link UserService#searchUserBySearchQuery(UserSearchQuery, Boolean)} * - * @param authSession Bot Session or Obo Session - * @param query Searching query containing complicated information like title, location, company... - * @param local If true then a local DB search will be performed and only local pod users will be - * returned. If absent or false then a directory search will be performed and users - * from other pods who are visible to the calling user will also be returned. - * @return List of users found by query - * @see Search Users + * @param authSession Bot Session or Obo Session + * @param query Searching query containing complicated information like title, location, company... + * @param local If true then a local DB search will be performed and only local pod users will be + * returned. If absent or false then a directory search will be performed and users + * from other pods who are visible to the calling user will also be returned. + * @return List of users found by query + * @see Search Users */ - public List searchUserBySearchQuery(@NonNull AuthSession authSession, @NonNull UserSearchQuery query, @Nullable Boolean local) { - try { - UserSearchResults results = usersApi.v1UserSearchPost(authSession.getSessionToken(), query, null, null, local); - return results.getUsers(); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + public List searchUserBySearchQuery(@NonNull AuthSession authSession, @NonNull UserSearchQuery query, + @Nullable Boolean local) { + UserSearchResults results = executeAndRetry("searchUserBySearchQuery", + () -> usersApi.v1UserSearchPost(authSession.getSessionToken(), query, null, null, local), authSession); + return results.getUsers(); + } + + protected T executeAndRetry(String name, SupplierWithApiException supplier, AuthSession authSession) { + final RetryWithRecoveryBuilder retryBuilderWithAuthSession = RetryWithRecoveryBuilder.from(retryBuilder) + .clearRecoveryStrategies() // to remove refresh on bot session put by default + .recoveryStrategy(ApiException::isUnauthorized, authSession::refresh); + return RetryWithRecovery.executeAndRetry(retryBuilderWithAuthSession, name, supplier); } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/UserService.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/UserService.java index 1d4056376..fb8e88943 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/UserService.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/user/UserService.java @@ -1,10 +1,11 @@ package com.symphony.bdk.core.service.user; -import com.symphony.bdk.http.api.ApiException; -import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.retry.RetryWithRecovery; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.user.constant.RoleId; import com.symphony.bdk.core.service.user.mapper.UserDetailMapper; +import com.symphony.bdk.core.util.function.SupplierWithApiException; import com.symphony.bdk.gen.api.UserApi; import com.symphony.bdk.gen.api.UsersApi; import com.symphony.bdk.gen.api.model.Avatar; @@ -32,28 +33,28 @@ import javax.annotation.Nullable; + /** * Service class for managing users. - * + *

* This service is used for retrieving information about a particular user, * search users by ids, emails or usernames, perform some action related to * user like: *

    - *
  • Add or remove roles from an user
  • - *
  • Get or update avatar of an user
  • - *
  • Get, assign or unassign disclaimer to an user
  • - *
  • Get, update feature entitlements of an user
  • - *
  • Get, update status of an user
  • + *
  • Add or remove roles from an user
  • + *
  • Get or update avatar of an user
  • + *
  • Get, assign or unassign disclaimer to an user
  • + *
  • Get, update feature entitlements of an user
  • + *
  • Get, update status of an user
  • *

- * */ @Slf4j public class UserService extends OboUserService { private final AuthSession authSession; - public UserService(UserApi userApi, UsersApi usersApi, AuthSession authSession) { - super(userApi, usersApi); + public UserService(UserApi userApi, UsersApi usersApi, AuthSession authSession, RetryWithRecoveryBuilder retryBuilder) { + super(userApi, usersApi, retryBuilder); this.authSession = authSession; } @@ -61,94 +62,76 @@ public UserService(UserApi userApi, UsersApi usersApi, AuthSession authSession) * Retrieve user details of a particular user. * * @param uid User Id - * @return Details of the user. - * @see Get User v2 + * @return Details of the user. + * @see Get User v2 */ public V2UserDetail getUserDetailByUid(@NonNull Long uid) { - try { - return userApi.v2AdminUserUidGet(authSession.getSessionToken(), uid); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getUserDetailByUid", () -> userApi.v2AdminUserUidGet(authSession.getSessionToken(), uid)); } /** * Retrieve all users in the company (pod). * * @return List of retrieved users - * @see List Users V2 + * @see List Users V2 */ public List listUsersDetail() { - try { - return userApi.v2AdminUserListGet(authSession.getSessionToken(), null, null); - - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("listUsersDetail", + () -> userApi.v2AdminUserListGet(authSession.getSessionToken(), null, null)); } /** * Retrieve a list of users in the company (pod) by a filter. * - * @param filter using to filter users by - * @return List of retrieved users - * @see Find Users V1 - * @see com.symphony.bdk.core.service.user.constant.UserFeature + * @param filter using to filter users by + * @return List of retrieved users + * @see Find Users V1 + * @see com.symphony.bdk.core.service.user.constant.UserFeature */ public List listUsersDetail(@NonNull UserFilter filter) { - try { - List userDetailList = userApi.v1AdminUserFindPost(authSession.getSessionToken(), filter, null, null); - return userDetailList.stream().map(UserDetailMapper.INSTANCE::userDetailToV2UserDetail).collect(Collectors.toList()); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + List userDetailList = executeAndRetry("listUsersDetail", + () -> userApi.v1AdminUserFindPost(authSession.getSessionToken(), filter, null, null)); + return userDetailList.stream() + .map(UserDetailMapper.INSTANCE::userDetailToV2UserDetail) + .collect(Collectors.toList()); } /** * Add a role to an user. * - * @param uid User Id - * @param roleId Role Id - * @see Add Role + * @param uid User Id + * @param roleId Role Id + * @see Add Role */ public void addRoleToUser(@NonNull Long uid, @NonNull RoleId roleId) { - try { - StringId stringId = new StringId().id(roleId.name()); - userApi.v1AdminUserUidRolesAddPost(authSession.getSessionToken(), uid, stringId); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + StringId stringId = new StringId().id(roleId.name()); + executeAndRetry("addRoleToUser", + () -> userApi.v1AdminUserUidRolesAddPost(authSession.getSessionToken(), uid, stringId)); } /** * Remove a role from an user. * - * @param uid User Id - * @param roleId Role Id - * @see Remove Role + * @param uid User Id + * @param roleId Role Id + * @see Remove Role */ public void removeRoleFromUser(@NonNull Long uid, @NonNull RoleId roleId) { - try { - StringId stringId = new StringId().id(roleId.name()); - userApi.v1AdminUserUidRolesRemovePost(authSession.getSessionToken(), uid, stringId); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + StringId stringId = new StringId().id(roleId.name()); + executeAndRetry("removeRoleFromUser", + () -> userApi.v1AdminUserUidRolesRemovePost(authSession.getSessionToken(), uid, stringId)); } /** * Get the url of avatar of an user * * @param uid User Id - * @return List of avatar urls of the user - * @see User Avatar + * @return List of avatar urls of the user + * @see User Avatar */ public List getAvatarFromUser(@NonNull Long uid) { - try { - return userApi.v1AdminUserUidAvatarGet(authSession.getSessionToken(), uid); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getAvatarFromUser", + () -> userApi.v1AdminUserUidAvatarGet(authSession.getSessionToken(), uid)); } /** @@ -156,15 +139,12 @@ public List getAvatarFromUser(@NonNull Long uid) { * * @param uid User Id * @param image The avatar image for the user profile picture.The image must be a base64-encoded. - * @see Update User Avatar + * @see Update User Avatar */ public void updateAvatarOfUser(@NonNull Long uid, @NonNull String image) { - try { - AvatarUpdate avatarUpdate = new AvatarUpdate().image(image); - userApi.v1AdminUserUidAvatarUpdatePost(authSession.getSessionToken(), uid, avatarUpdate); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + AvatarUpdate avatarUpdate = new AvatarUpdate().image(image); + executeAndRetry("updateAvatarOfUser", + () -> userApi.v1AdminUserUidAvatarUpdatePost(authSession.getSessionToken(), uid, avatarUpdate)); } /** @@ -172,7 +152,7 @@ public void updateAvatarOfUser(@NonNull Long uid, @NonNull String image) { * * @param uid User Id * @param image The avatar image in bytes array for the user profile picture. - * @see Update User Avatar + * @see Update User Avatar */ public void updateAvatarOfUser(@NonNull Long uid, @NonNull byte[] image) { String imageBase64 = Base64.getEncoder().encodeToString(image); @@ -184,7 +164,7 @@ public void updateAvatarOfUser(@NonNull Long uid, @NonNull byte[] image) { * * @param uid User Id * @param imageStream The avatar image input stream for the user profile picture. - * @see Update User Avatar + * @see Update User Avatar */ public void updateAvatarOfUser(@NonNull Long uid, @NonNull InputStream imageStream) throws IOException { byte[] bytes = IOUtils.toByteArray(imageStream); @@ -195,60 +175,49 @@ public void updateAvatarOfUser(@NonNull Long uid, @NonNull InputStream imageStre * Get disclaimer assigned to an user. * * @param uid User Id - * @return Disclaimer assigned to the user. - * @see User Disclaimer + * @return Disclaimer assigned to the user. + * @see User Disclaimer */ public Disclaimer getDisclaimerAssignedToUser(@NonNull Long uid) { - try { - return userApi.v1AdminUserUidDisclaimerGet(authSession.getSessionToken(), uid); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getDisclaimerAssignedToUser", + () -> userApi.v1AdminUserUidDisclaimerGet(authSession.getSessionToken(), uid)); } /** * Unassign disclaimer from an user. * * @param uid User Id - * @see Unassign User Disclaimer + * @see Unassign User Disclaimer */ public void unAssignDisclaimerFromUser(@NonNull Long uid) { - try { - userApi.v1AdminUserUidDisclaimerDelete(authSession.getSessionToken(), uid); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + executeAndRetry("unAssignDisclaimerFromUser", + () -> userApi.v1AdminUserUidDisclaimerDelete(authSession.getSessionToken(), uid)); } /** * Assign disclaimer to an user. * - * @param uid User Id - * @param disclaimerId Disclaimer to be assigned - * @see Update User Disclaimer + * @param uid User Id + * @param disclaimerId Disclaimer to be assigned + * @see Update User Disclaimer */ public void assignDisclaimerToUser(@NonNull Long uid, @NonNull String disclaimerId) { - try { - StringId stringId = new StringId().id(disclaimerId); - userApi.v1AdminUserUidDisclaimerUpdatePost(authSession.getSessionToken(), uid, stringId); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + StringId stringId = new StringId().id(disclaimerId); + executeAndRetry("assignDisclaimerToUser", + () -> userApi.v1AdminUserUidDisclaimerUpdatePost(authSession.getSessionToken(), uid, stringId)); + } /** * Get delegates assigned to an user. * * @param uid User Id - * @return List of delegates assigned to an user. - * @see User Delegates + * @return List of delegates assigned to an user. + * @see User Delegates */ public List getDelegatesAssignedToUser(@NonNull Long uid) { - try { - return userApi.v1AdminUserUidDelegatesGet(authSession.getSessionToken(), uid); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getDelegatesAssignedToUser", + () -> userApi.v1AdminUserUidDelegatesGet(authSession.getSessionToken(), uid)); } /** @@ -257,75 +226,61 @@ public List getDelegatesAssignedToUser(@NonNull Long uid) { * @param uid User Id * @param delegatedUserId Delegated user Id to be assigned * @param actionEnum Action to be performed - * @see Update User Delegates + * @see Update User Delegates */ - public void updateDelegatesAssignedToUser(@NonNull Long uid, @NonNull Long delegatedUserId, @NonNull DelegateAction.ActionEnum actionEnum) { - try { - DelegateAction delegateAction = new DelegateAction().action(actionEnum).userId(delegatedUserId); - userApi.v1AdminUserUidDelegatesUpdatePost(authSession.getSessionToken(), uid, delegateAction); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + public void updateDelegatesAssignedToUser(@NonNull Long uid, @NonNull Long delegatedUserId, + @NonNull DelegateAction.ActionEnum actionEnum) { + DelegateAction delegateAction = new DelegateAction().action(actionEnum).userId(delegatedUserId); + executeAndRetry("updateDelegatesAssignedToUser", + () -> userApi.v1AdminUserUidDelegatesUpdatePost(authSession.getSessionToken(), uid, delegateAction)); } /** * Get feature entitlements of an user. * * @param uid User Id - * @return List of feature entitlements of the user. - * @see User Features + * @return List of feature entitlements of the user. + * @see User Features */ public List getFeatureEntitlementsOfUser(@NonNull Long uid) { - try { - return userApi.v1AdminUserUidFeaturesGet(authSession.getSessionToken(), uid); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getFeatureEntitlementsOfUser", + () -> userApi.v1AdminUserUidFeaturesGet(authSession.getSessionToken(), uid)); } /** * Update feature entitlements of an user. * - * @param uid User Id - * @param features List of feature entitlements to be updated - * @see Update User Features + * @param uid User Id + * @param features List of feature entitlements to be updated + * @see Update User Features */ public void updateFeatureEntitlementsOfUser(@NonNull Long uid, @NonNull List features) { - try { - userApi.v1AdminUserUidFeaturesUpdatePost(authSession.getSessionToken(), uid, features); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + executeAndRetry("updateFeatureEntitlementsOfUser", + () -> userApi.v1AdminUserUidFeaturesUpdatePost(authSession.getSessionToken(), uid, features)); } /** * Get status of an user. * * @param uid User Id - * @return Status of the user. - * @see User Status + * @return Status of the user. + * @see User Status */ public UserStatus getStatusOfUser(@NonNull Long uid) { - try { - return userApi.v1AdminUserUidStatusGet(authSession.getSessionToken(), uid); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + return executeAndRetry("getStatusOfUser", + () -> userApi.v1AdminUserUidStatusGet(authSession.getSessionToken(), uid)); } /** * Update the status of an user * - * @param uid User Id - * @param status Status to be updated to the user - * @see Update User Status + * @param uid User Id + * @param status Status to be updated to the user + * @see Update User Status */ public void updateStatusOfUser(@NonNull Long uid, @NonNull UserStatus status) { - try { - userApi.v1AdminUserUidStatusUpdatePost(authSession.getSessionToken(), uid, status); - } catch (ApiException apiException) { - throw new ApiRuntimeException(apiException); - } + executeAndRetry("updateStatusOfUser", + () -> userApi.v1AdminUserUidStatusUpdatePost(authSession.getSessionToken(), uid, status)); } /** @@ -335,8 +290,8 @@ public void updateStatusOfUser(@NonNull Long uid, @NonNull UserStatus status) { * @param local If true then a local DB search will be performed and only local pod users will be * returned. If absent or false then a directory search will be performed and users * from other pods who are visible to the calling user will also be returned. - * @return Users found by user ids - * @see Users Lookup V3 + * @return Users found by user ids + * @see Users Lookup V3 */ public List searchUserByIds(@NonNull List uidList, @NonNull Boolean local) { return this.searchUserByIds(this.authSession, uidList, local); @@ -346,8 +301,8 @@ public List searchUserByIds(@NonNull List uidList, @NonNull Boolea * Search user by list of user ids * * @param uidList List of user ids - * @return Users found by user ids - * @see Users Lookup V3 + * @return Users found by user ids + * @see Users Lookup V3 */ public List searchUserByIds(@NonNull List uidList) { return this.searchUserByIds(this.authSession, uidList); @@ -360,8 +315,8 @@ public List searchUserByIds(@NonNull List uidList) { * @param local If true then a local DB search will be performed and only local pod users will be * returned. If absent or false then a directory search will be performed and users * from other pods who are visible to the calling user will also be returned. - * @return Users found by emails. - * @see Users Lookup V3 + * @return Users found by emails. + * @see Users Lookup V3 */ public List searchUserByEmails(@NonNull List emailList, @NonNull Boolean local) { return this.searchUserByEmails(this.authSession, emailList, local); @@ -371,8 +326,8 @@ public List searchUserByEmails(@NonNull List emailList, @NonNull * Search user by list of email addresses. * * @param emailList List of email addresses - * @return Users found by emails. - * @see Users Lookup V3 + * @return Users found by emails. + * @see Users Lookup V3 */ public List searchUserByEmails(@NonNull List emailList) { return this.searchUserByEmails(this.authSession, emailList); @@ -381,9 +336,9 @@ public List searchUserByEmails(@NonNull List emailList) { /** * Search user by list of usernames. * - * @param usernameList List of usernames - * @return Users found by usernames - * @see Users Lookup V3 + * @param usernameList List of usernames + * @return Users found by usernames + * @see Users Lookup V3 */ public List searchUserByUsernames(@NonNull List usernameList) { return this.searchUserByUsernames(this.authSession, usernameList); @@ -396,10 +351,14 @@ public List searchUserByUsernames(@NonNull List usernameList) { * @param local If true then a local DB search will be performed and only local pod users will be * returned. If absent or false then a directory search will be performed and users * from other pods who are visible to the calling user will also be returned. - * @return List of users found by query - * @see Search Users + * @return List of users found by query + * @see Search Users */ public List searchUserBySearchQuery(@NonNull UserSearchQuery query, @Nullable Boolean local) { return this.searchUserBySearchQuery(this.authSession, query, local); } + + private T executeAndRetry(String name, SupplierWithApiException supplier) { + return RetryWithRecovery.executeAndRetry(retryBuilder, name, supplier); + } } diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/SupplierWithApiException.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/SupplierWithApiException.java deleted file mode 100644 index 44b3858d1..000000000 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/SupplierWithApiException.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.symphony.bdk.core.util; - -import com.symphony.bdk.http.api.ApiException; -import com.symphony.bdk.http.api.ApiRuntimeException; - -import org.apiguardian.api.API; - -/** - * Functional interface which supplies a T object and may throw an {@link ApiException}. - * @param the type returned by the suuplier. - */ -@FunctionalInterface -@API(status = API.Status.INTERNAL) -public interface SupplierWithApiException { - T get() throws ApiException; - - /** - * Method which wraps {@link #get()} and throws an {@link ApiRuntimeException} if {@link ApiException} is thrown. - * @param supplier the supplier - * @param the type returned by the supplier - * @return the value returned by the supplier - */ - static T callAndCatchApiException(SupplierWithApiException supplier) { - try { - return supplier.get(); - } catch (ApiException e) { - throw new ApiRuntimeException(e); - } - } -} diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/function/ConsumerWithThrowable.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/function/ConsumerWithThrowable.java new file mode 100644 index 000000000..45b9fe7ab --- /dev/null +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/function/ConsumerWithThrowable.java @@ -0,0 +1,14 @@ +package com.symphony.bdk.core.util.function; + + +import org.apiguardian.api.API; + +/** + * Functional interface which may throw a {@link Throwable}. + * This is used to specify recovery functions in {@link ConsumerWithThrowable}. + */ +@FunctionalInterface +@API(status = API.Status.INTERNAL) +public interface ConsumerWithThrowable { + void consume() throws Throwable; +} diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/function/SupplierWithApiException.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/function/SupplierWithApiException.java new file mode 100644 index 000000000..8e91be276 --- /dev/null +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/util/function/SupplierWithApiException.java @@ -0,0 +1,17 @@ +package com.symphony.bdk.core.util.function; + + +import com.symphony.bdk.http.api.ApiException; + +import org.apiguardian.api.API; + +/** + * Functional interface which supplies a T object and may throw an {@link ApiException}. + * + * @param the type returned by the supplier. + */ +@FunctionalInterface +@API(status = API.Status.INTERNAL) +public interface SupplierWithApiException { + T get() throws ApiException; +} diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/retry/resilience4j/Resilience4jRetryWithRecoveryTest.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/retry/resilience4j/Resilience4jRetryWithRecoveryTest.java new file mode 100644 index 000000000..435031aeb --- /dev/null +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/retry/resilience4j/Resilience4jRetryWithRecoveryTest.java @@ -0,0 +1,218 @@ +package com.symphony.bdk.core.retry.resilience4j; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.symphony.bdk.core.config.model.BdkRetryConfig; +import com.symphony.bdk.core.util.function.ConsumerWithThrowable; +import com.symphony.bdk.core.util.function.SupplierWithApiException; +import com.symphony.bdk.http.api.ApiException; + +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +import java.util.Collections; + +/** + * Test class for {@link Resilience4jRetryWithRecovery} + */ +class Resilience4jRetryWithRecoveryTest { + + //to be able to use Mockito mocks around lambdas. Otherwise, does not work, even with mockito-inline + private static class ConcreteSupplier implements SupplierWithApiException { + @Override + public String get() throws ApiException { + return ""; + } + } + + + private static class ConcreteConsumer implements ConsumerWithThrowable { + @Override + public void consume() throws Throwable { + return; + } + } + + BdkRetryConfig getRetryConfig() { + final BdkRetryConfig bdkRetryConfig = new BdkRetryConfig(); + bdkRetryConfig.setMultiplier(1); + bdkRetryConfig.setInitialIntervalMillis(10); + + return bdkRetryConfig; + } + + @Test + void testSupplierWithNoExceptionReturnsValue() throws Throwable { + String value = "string"; + + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenReturn(value); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, (t) -> false, + Collections.emptyMap()); + + assertEquals(value, r.execute()); + verify(supplier, times(1)).get(); + } + + @Test + void testSupplierWithExceptionShouldRetry() throws Throwable { + String value = "string"; + + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()) + .thenThrow(new ApiException(400, "error")) + .thenReturn(value); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, + (t) -> t instanceof ApiException && ((ApiException) t).isClientError(), + Collections.emptyMap()); + + assertEquals(value, r.execute()); + verify(supplier, times(2)).get(); + } + + @Test + void testSupplierWithExceptionAndNoRetryShouldFailWithException() throws Throwable { + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(new ApiException(400, "error")); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, + (t) -> false, Collections.emptyMap()); + + assertThrows(ApiException.class, () -> r.execute()); + verify(supplier, times(1)).get(); + } + + @Test + void testMaxAttemptsReachedShouldFailWithException() throws ApiException { + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(new ApiException(400, "error")); + + final BdkRetryConfig retryConfig = getRetryConfig(); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", retryConfig, supplier, (t) -> true, + Collections.emptyMap()); + + assertThrows(ApiException.class, () -> r.execute()); + verify(supplier, times(retryConfig.getMaxAttempts())).get(); + } + + @Test + void testExceptionNotMatchingRetryPredicateShouldBeForwarded() throws ApiException { + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(new ApiException(400, "error")); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, + (t) -> t instanceof ApiException && ((ApiException) t).isServerError(), + Collections.emptyMap()); + + assertThrows(ApiException.class, () -> r.execute()); + verify(supplier, times(1)).get(); + } + + @Test + void testIgnoredExceptionShouldReturnNull() throws Throwable { + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(new ApiException(400, "error")); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, (t) -> true, + (e) -> true, Collections.emptyMap()); + + assertNull(r.execute()); + verify(supplier, times(1)).get(); + } + + @Test + void testMatchingExceptionShouldTriggerRecoveryAndRetry() throws Throwable { + final String value = "string"; + + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(new ApiException(400, "error")).thenReturn(value); + + ConcreteConsumer consumer = mock(ConcreteConsumer.class); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, (t) -> true, + Collections.singletonMap(e -> true, consumer)); + + assertEquals(value, r.execute()); + + InOrder inOrder = inOrder(supplier, consumer); + inOrder.verify(supplier).get(); + inOrder.verify(consumer).consume(); + inOrder.verify(supplier).get(); + verifyNoMoreInteractions(supplier, consumer); + } + + @Test + void testNonMatchingExceptionShouldNotTriggerRecoveryAndRetry() throws Throwable { + final String value = "string"; + + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(new ApiException(500, "error")).thenReturn(value); + + ConcreteConsumer consumer = mock(ConcreteConsumer.class); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, (t) -> true, + Collections.singletonMap(e -> e.isClientError(), consumer)); + + assertEquals(value, r.execute()); + verify(supplier, times(2)).get(); + verifyNoInteractions(consumer); + } + + @Test + void testThrowableInRecoveryAndNotMatchingRetryPredicateShouldBeForwarded() throws Throwable { + final String value = "string"; + final ApiException error = new ApiException(400, "error"); + + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(error).thenReturn(value); + + ConcreteConsumer consumer = mock(ConcreteConsumer.class); + doThrow(new IndexOutOfBoundsException()).when(consumer).consume(); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, + (t) -> t instanceof ApiException, Collections.singletonMap(ApiException::isClientError, consumer)); + + assertThrows(IndexOutOfBoundsException.class, () -> r.execute()); + + InOrder inOrder = inOrder(supplier, consumer); + inOrder.verify(supplier).get(); + inOrder.verify(consumer).consume(); + verifyNoMoreInteractions(supplier, consumer); + } + + @Test + void testThrowableInRecoveryAndMatchingRetryPredicateShouldLeadToRetry() throws Throwable { + final String value = "string"; + final ApiException error = new ApiException(400, "error"); + + SupplierWithApiException supplier = mock(ConcreteSupplier.class); + when(supplier.get()).thenThrow(error).thenReturn(value); + + ConcreteConsumer consumer = mock(ConcreteConsumer.class); + doThrow(new IndexOutOfBoundsException()).when(consumer).consume(); + + Resilience4jRetryWithRecovery r = new Resilience4jRetryWithRecovery<>("name", getRetryConfig(), supplier, + (t) -> true, Collections.singletonMap(ApiException::isClientError, consumer)); + + assertEquals(value, r.execute()); + + InOrder inOrder = inOrder(supplier, consumer); + inOrder.verify(supplier).get(); + inOrder.verify(consumer).consume(); + inOrder.verify(supplier).get(); + verifyNoMoreInteractions(supplier, consumer); + } +} diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/MessageServiceTest.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/MessageServiceTest.java index 43bea2e99..2c32de29e 100644 --- a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/MessageServiceTest.java +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/MessageServiceTest.java @@ -18,6 +18,8 @@ import com.symphony.bdk.http.api.ApiException; import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.config.model.BdkRetryConfig; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.stream.constant.AttachmentSort; import com.symphony.bdk.core.test.JsonHelper; import com.symphony.bdk.core.test.MockApiClient; @@ -94,7 +96,7 @@ void setUp() { messageService = new MessageService(new MessagesApi(agentClient), new MessageApi(podClient), new MessageSuppressionApi(podClient), streamsApi, new PodApi(podClient), - attachmentsApi, new DefaultApi(podClient), authSession, templateEngine); + attachmentsApi, new DefaultApi(podClient), authSession, templateEngine, new RetryWithRecoveryBuilder()); } @Test diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/SessionServiceTest.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/SessionServiceTest.java index 02db9d9df..74b9b06bc 100644 --- a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/SessionServiceTest.java +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/SessionServiceTest.java @@ -8,6 +8,8 @@ import com.symphony.bdk.http.api.ApiException; import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.config.model.BdkRetryConfig; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.gen.api.SessionApi; import com.symphony.bdk.gen.api.model.UserV2; @@ -35,7 +37,12 @@ class SessionServiceTest { @BeforeEach void setUp() { - this.service = new SessionService(this.sessionApi); + final BdkRetryConfig retryConfig = new BdkRetryConfig(); + retryConfig.setMaxAttempts(2); + retryConfig.setMultiplier(1); + retryConfig.setInitialIntervalMillis(10); + + this.service = new SessionService(this.sessionApi, new RetryWithRecoveryBuilder().retryConfig(retryConfig)); } @Test diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1Test.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1Test.java index 6d8cad141..0f0aa0b1e 100644 --- a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1Test.java +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV1Test.java @@ -273,14 +273,6 @@ void retrieveDatafeedIdFromEmptyFile(@TempDir Path tempDir) { assertFalse(datafeedId.isPresent()); } - @Test - void getRetryInstanceTest() { - Retry retry = this.datafeedService.getRetryInstance("Test retry"); - assertNotNull(retry); - assertEquals("Test retry", retry.getName()); - assertEquals(2, retry.getRetryConfig().getMaxAttempts()); - } - @Test void handleV4EventTest() { List events = new ArrayList<>(); diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2Test.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2Test.java index a90c34a28..3d960ed59 100644 --- a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2Test.java +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedServiceV2Test.java @@ -18,6 +18,7 @@ import com.symphony.bdk.core.config.model.BdkDatafeedConfig; import com.symphony.bdk.core.config.model.BdkRetryConfig; import com.symphony.bdk.core.service.datafeed.RealTimeEventListener; +import com.symphony.bdk.core.service.datafeed.exception.NestedRetryException; import com.symphony.bdk.gen.api.DatafeedApi; import com.symphony.bdk.gen.api.model.AckId; import com.symphony.bdk.gen.api.model.V4Event; @@ -161,7 +162,7 @@ void testStartAuthRefreshListDatafeed() throws ApiException, AuthUnauthorizedExc @Test void testStartServerErrorListDatafeed() throws ApiException { - when(datafeedApi.listDatafeed("1234", "1234")).thenThrow(new ApiException(500, "server-error")); + when(datafeedApi.listDatafeed("1234", "1234")).thenThrow(new ApiException(502, "server-error")); assertThrows(ApiException.class, this.datafeedService::start); verify(datafeedApi, times(2)).listDatafeed("1234", "1234"); @@ -170,15 +171,10 @@ void testStartServerErrorListDatafeed() throws ApiException { @Test void testStartErrorListDatafeedThenRetrySuccess() throws ApiException, AuthUnauthorizedException { AtomicInteger count = new AtomicInteger(0); - when(datafeedApi.listDatafeed("1234", "1234")).thenAnswer(invocationOnMock -> { - if (count.getAndIncrement() == 0) { - throw new ApiException(500, "server-error"); - } else { - List datafeeds = new ArrayList<>(); - datafeeds.add(new V5Datafeed().id("test-id")); - return datafeeds; - } - }); + when(datafeedApi.listDatafeed("1234", "1234")) + .thenThrow(new ApiException(502, "server-error")) + .thenReturn(Collections.singletonList(new V5Datafeed().id("test-id"))); + AckId ackId = datafeedService.getAckId(); when(datafeedApi.readDatafeed("test-id", "1234", "1234", ackId)) .thenReturn(new V5EventList().addEventsItem(new V4Event().type(RealTimeEventType.MESSAGESENT.name()).payload(new V4Payload())).ackId("ack-id")); @@ -213,7 +209,7 @@ void testStartClientErrorCreateDatafeed() throws ApiException { @Test void testStartServerErrorCreateDatafeed() throws ApiException { when(datafeedApi.listDatafeed("1234", "1234")).thenReturn(Collections.emptyList()); - when(datafeedApi.createDatafeed("1234", "1234")).thenThrow(new ApiException(500, "server-error")); + when(datafeedApi.createDatafeed("1234", "1234")).thenThrow(new ApiException(502, "server-error")); assertThrows(ApiException.class, this.datafeedService::start); verify(datafeedApi, times(1)).listDatafeed("1234", "1234"); @@ -263,10 +259,32 @@ void testStartAuthErrorReadDatafeed() throws ApiException, AuthUnauthorizedExcep @Test void testStartServerErrorReadDatafeed() throws ApiException { + AckId ackId = datafeedService.getAckId(); + when(datafeedApi.listDatafeed("1234", "1234")).thenReturn(Collections.singletonList(new V5Datafeed().id("test-id"))); + when(datafeedApi.readDatafeed("test-id", "1234", "1234", ackId)).thenThrow(new ApiException(502, "client-error")); + + assertThrows(ApiException.class, this.datafeedService::start); + verify(datafeedApi, times(1)).listDatafeed("1234", "1234"); + verify(datafeedApi, times(2)).readDatafeed("test-id", "1234", "1234", ackId); + } + + @Test + void testStartInternalServerErrorReadDatafeedShouldNotBeRetried() throws ApiException { AckId ackId = datafeedService.getAckId(); when(datafeedApi.listDatafeed("1234", "1234")).thenReturn(Collections.singletonList(new V5Datafeed().id("test-id"))); when(datafeedApi.readDatafeed("test-id", "1234", "1234", ackId)).thenThrow(new ApiException(500, "client-error")); + assertThrows(ApiException.class, this.datafeedService::start); + verify(datafeedApi, times(1)).listDatafeed("1234", "1234"); + verify(datafeedApi, times(1)).readDatafeed("test-id", "1234", "1234", ackId); + } + + @Test + void testStartTooManyRequestsReadDatafeedShouldBeRetried() throws ApiException { + AckId ackId = datafeedService.getAckId(); + when(datafeedApi.listDatafeed("1234", "1234")).thenReturn(Collections.singletonList(new V5Datafeed().id("test-id"))); + when(datafeedApi.readDatafeed("test-id", "1234", "1234", ackId)).thenThrow(new ApiException(429, "too-many-requests")); + assertThrows(ApiException.class, this.datafeedService::start); verify(datafeedApi, times(1)).listDatafeed("1234", "1234"); verify(datafeedApi, times(2)).readDatafeed("test-id", "1234", "1234", ackId); @@ -297,9 +315,9 @@ void testStartServerErrorDeleteDatafeed() throws ApiException { when(datafeedApi.listDatafeed("1234", "1234")).thenReturn(Collections.singletonList(new V5Datafeed().id("test-id"))); when(datafeedApi.createDatafeed("1234", "1234")).thenReturn(new V5Datafeed().id("recreate-df-id")); when(datafeedApi.readDatafeed("test-id", "1234", "1234", ackId)).thenThrow(new ApiException(400, "client-error")); - when(datafeedApi.deleteDatafeed("test-id", "1234", "1234")).thenThrow(new ApiException(500, "client-error")); + when(datafeedApi.deleteDatafeed("test-id", "1234", "1234")).thenThrow(new ApiException(502, "client-error")); - assertThrows(ApiException.class, this.datafeedService::start); + assertThrows(NestedRetryException.class, this.datafeedService::start); verify(datafeedApi, times(1)).listDatafeed("1234", "1234"); verify(datafeedApi, times(1)).readDatafeed("test-id", "1234", "1234", ackId); verify(datafeedApi, times(2)).deleteDatafeed("test-id", "1234", "1234"); @@ -314,7 +332,7 @@ void testStartAuthErrorDeleteDatafeed() throws ApiException, AuthUnauthorizedExc when(datafeedApi.deleteDatafeed("test-id", "1234", "1234")).thenThrow(new ApiException(401, "client-error")); doThrow(AuthUnauthorizedException.class).when(authSession).refresh(); - assertThrows(ApiException.class, this.datafeedService::start); + assertThrows(NestedRetryException.class, this.datafeedService::start); verify(datafeedApi, times(1)).listDatafeed("1234", "1234"); verify(datafeedApi, times(1)).readDatafeed("test-id", "1234", "1234", ackId); verify(datafeedApi, times(1)).deleteDatafeed("test-id", "1234", "1234"); diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/stream/StreamServiceTest.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/stream/StreamServiceTest.java index 4d15f6629..76dee4770 100644 --- a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/stream/StreamServiceTest.java +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/stream/StreamServiceTest.java @@ -8,6 +8,8 @@ import com.symphony.bdk.http.api.ApiClient; import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.config.model.BdkRetryConfig; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.stream.constant.AttachmentSort; import com.symphony.bdk.core.test.MockApiClient; import com.symphony.bdk.core.test.JsonHelper; @@ -58,7 +60,7 @@ void setUp() { this.mockApiClient = new MockApiClient(); AuthSession authSession = mock(AuthSession.class); ApiClient podClient = mockApiClient.getApiClient("/pod"); - this.service = new StreamService(new StreamsApi(podClient), authSession); + this.service = new StreamService(new StreamsApi(podClient), authSession, new RetryWithRecoveryBuilder<>()); when(authSession.getSessionToken()).thenReturn("1234"); when(authSession.getKeyManagerToken()).thenReturn("1234"); diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/user/UserServiceTest.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/user/UserServiceTest.java index 7505ea114..094a0b425 100644 --- a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/user/UserServiceTest.java +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/user/UserServiceTest.java @@ -14,6 +14,8 @@ import com.symphony.bdk.http.api.ApiException; import com.symphony.bdk.http.api.ApiRuntimeException; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.config.model.BdkRetryConfig; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.user.constant.RoleId; import com.symphony.bdk.core.service.user.constant.UserFeature; import com.symphony.bdk.core.service.user.mapper.UserDetailMapper; @@ -63,7 +65,6 @@ class UserServiceTest { private static final String UPDATE_FEATURE_ENTITLEMENTS_OF_USER = "/pod/v1/admin/user/{uid}/features/update"; private static final String GET_STATUS_OF_USER = "/pod/v1/admin/user/{uid}/status"; private static final String UPDATE_STATUS_OF_USER = "/pod/v1/admin/user/{uid}/status/update"; - private static final String GET_USER_V2 = "/pod/v2/user"; private static final String SEARCH_USERS_V3 = "/pod/v3/users"; private static final String SEARCH_USER_BY_QUERY = "/pod/v1/user/search"; @@ -81,7 +82,7 @@ void init() { this.spiedUserApi = spy(userApi); UsersApi usersApi = new UsersApi(podClient); this.spiedUsersApi = spy(usersApi); - this.service = new UserService(this.spiedUserApi, this.spiedUsersApi, authSession); + this.service = new UserService(this.spiedUserApi, this.spiedUsersApi, authSession, new RetryWithRecoveryBuilder()); when(authSession.getSessionToken()).thenReturn("1234"); when(authSession.getKeyManagerToken()).thenReturn("1234"); diff --git a/symphony-bdk-http/symphony-bdk-http-api/src/main/java/com/symphony/bdk/http/api/ApiException.java b/symphony-bdk-http/symphony-bdk-http-api/src/main/java/com/symphony/bdk/http/api/ApiException.java index 5a5c90615..91323934c 100644 --- a/symphony-bdk-http/symphony-bdk-http-api/src/main/java/com/symphony/bdk/http/api/ApiException.java +++ b/symphony-bdk-http/symphony-bdk-http-api/src/main/java/com/symphony/bdk/http/api/ApiException.java @@ -57,6 +57,15 @@ public ApiException(int code, String message, Map> response this.responseBody = responseBody; } + /** + * Indicates if the error is not a fatal one and if the api call can be subsequently retried. + * + * @return true if it error code is 401 unauthorized or 429 too many requests or 5xx greater than 500. + */ + public boolean isMinorError() { + return isServerError() || isUnauthorized() || isTooManyRequestsError(); + } + /** * Check if response status is unauthorized or not. * @@ -76,11 +85,20 @@ public boolean isClientError() { } /** - * Check if response status is server error or not + * Check if response status is a server error (5xx) but not an internal server error (500) * - * @return true if response status equals or greater than 500, false otherwise + * @return true if response status strictly greater than 500, false otherwise */ public boolean isServerError() { - return this.code >= HttpURLConnection.HTTP_INTERNAL_ERROR; + return this.code > HttpURLConnection.HTTP_INTERNAL_ERROR; + } + + /** + * Check if response status corresponds to a too many requests error (429) + * + * @return true if error code is 429 + */ + public boolean isTooManyRequestsError() { + return this.code == 429; } } diff --git a/symphony-bdk-spring/bdk-core-starter/src/main/java/com/symphony/bdk/spring/config/BdkServiceConfig.java b/symphony-bdk-spring/bdk-core-starter/src/main/java/com/symphony/bdk/spring/config/BdkServiceConfig.java index f51d1f300..b10363451 100644 --- a/symphony-bdk-spring/bdk-core-starter/src/main/java/com/symphony/bdk/spring/config/BdkServiceConfig.java +++ b/symphony-bdk-spring/bdk-core-starter/src/main/java/com/symphony/bdk/spring/config/BdkServiceConfig.java @@ -1,6 +1,8 @@ package com.symphony.bdk.spring.config; import com.symphony.bdk.core.auth.AuthSession; +import com.symphony.bdk.core.config.model.BdkConfig; +import com.symphony.bdk.core.retry.RetryWithRecoveryBuilder; import com.symphony.bdk.core.service.MessageService; import com.symphony.bdk.core.service.SessionService; import com.symphony.bdk.core.service.stream.StreamService; @@ -15,6 +17,7 @@ import com.symphony.bdk.gen.api.StreamsApi; import com.symphony.bdk.gen.api.UserApi; import com.symphony.bdk.gen.api.UsersApi; +import com.symphony.bdk.http.api.ApiException; import com.symphony.bdk.template.api.TemplateEngine; import com.symphony.bdk.template.freemarker.FreeMarkerEngine; @@ -30,20 +33,20 @@ public class BdkServiceConfig { @Bean @ConditionalOnMissingBean - public SessionService sessionService(SessionApi sessionApi) { - return new SessionService(sessionApi); + public SessionService sessionService(SessionApi sessionApi, BdkConfig config) { + return new SessionService(sessionApi, new RetryWithRecoveryBuilder<>().retryConfig(config.getRetry())); } @Bean @ConditionalOnMissingBean - public StreamService streamService(StreamsApi streamsApi, AuthSession botSession) { - return new StreamService(streamsApi, botSession); + public StreamService streamService(StreamsApi streamsApi, AuthSession botSession, BdkConfig config) { + return new StreamService(streamsApi, botSession, getRetryBuilder(config, botSession)); } @Bean @ConditionalOnMissingBean - public UserService userService(UserApi userApi, UsersApi usersApi, AuthSession botSession) { - return new UserService(userApi, usersApi, botSession); + public UserService userService(UserApi userApi, UsersApi usersApi, AuthSession botSession, BdkConfig config) { + return new UserService(userApi, usersApi, botSession, getRetryBuilder(config, botSession)); } @Bean @@ -63,9 +66,16 @@ public MessageService messageService( final AttachmentsApi attachmentsApi, final DefaultApi defaultApi, final AuthSession botSession, - final TemplateEngine templateEngine + final TemplateEngine templateEngine, + final BdkConfig config ) { return new MessageService(messagesApi, messageApi, messageSuppressionApi, streamsApi, podApi, attachmentsApi, - defaultApi, botSession, templateEngine); + defaultApi, botSession, templateEngine, getRetryBuilder(config, botSession)); + } + + private RetryWithRecoveryBuilder getRetryBuilder(BdkConfig config, AuthSession botSession) { + return new RetryWithRecoveryBuilder<>() + .retryConfig(config.getRetry()) + .recoveryStrategy(ApiException::isUnauthorized, botSession::refresh); } }