Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APP-2821: Implement a configurable retry mechanism #232

Merged
merged 21 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import lombok.Getter;
import org.apiguardian.api.API;

import javax.net.ssl.HttpsURLConnection;
import javax.ws.rs.core.GenericType;
import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -75,11 +77,15 @@ public boolean isClientError() {
}

/**
* Check if response status is server error or not
* Check if response status is server error and not an internal server error
*
* @return true if response status equals or greater than 500, false otherwise
* @return true if response status strictly greater than 500, false otherwise
*/
public boolean isServerError() {
return this.code >= HttpURLConnection.HTTP_INTERNAL_ERROR;
return this.code > HttpURLConnection.HTTP_INTERNAL_ERROR;
symphony-elias marked this conversation as resolved.
Show resolved Hide resolved
}

public boolean isTooManyRequestsError() {
return this.code == 429;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ public class BdkConfig {
public boolean isOboConfigured() {
return app.isConfigured();
}

public BdkRetryConfig getDatafeedRetryConfig() {
return datafeed.getRetry() == null ? retry : datafeed.getRetry();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.symphony.bdk.core.service;


import static com.symphony.bdk.core.util.SupplierWithApiException.callAndCatchApiException;
import static com.symphony.bdk.core.util.function.SupplierWithApiException.callAndCatchApiException;

import com.symphony.bdk.core.api.invoker.util.ApiUtils;
import com.symphony.bdk.core.service.pagination.PaginatedApi;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.symphony.bdk.core.service.datafeed.exception;

public class NestedRetryException extends RuntimeException {
public NestedRetryException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@

import com.symphony.bdk.core.api.invoker.ApiException;
import com.symphony.bdk.core.auth.AuthSession;
import com.symphony.bdk.core.auth.exception.AuthUnauthorizedException;
import com.symphony.bdk.core.config.model.BdkConfig;
import com.symphony.bdk.core.config.model.BdkRetryConfig;
import com.symphony.bdk.core.service.datafeed.DatafeedService;
import com.symphony.bdk.core.service.datafeed.RealTimeEventListener;
import com.symphony.bdk.core.util.BdkExponentialFunction;
import com.symphony.bdk.core.util.function.ConsumerWithThrowable;
import com.symphony.bdk.core.util.function.RetryWithRecoveryBuilder;
import com.symphony.bdk.gen.api.DatafeedApi;
import com.symphony.bdk.gen.api.model.V4Event;

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import javax.ws.rs.ProcessingException;

Expand All @@ -26,93 +28,92 @@
@Slf4j
abstract class AbstractDatafeedService implements DatafeedService {

protected final AuthSession authSession;
protected final BdkConfig bdkConfig;
protected final List<RealTimeEventListener> listeners;
protected final RetryConfig retryConfig;
protected DatafeedApi datafeedApi;

public AbstractDatafeedService(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) {
this.datafeedApi = datafeedApi;
this.listeners = new ArrayList<>();
this.authSession = authSession;
this.bdkConfig = config;
BdkRetryConfig bdkRetryConfig = this.bdkConfig.getDatafeed().getRetry() == null ? this.bdkConfig.getRetry() : this.bdkConfig.getDatafeed().getRetry();
this.retryConfig = RetryConfig.custom()
.maxAttempts(bdkRetryConfig.getMaxAttempts())
.intervalFunction(BdkExponentialFunction.ofExponentialBackoff(bdkRetryConfig))
.retryOnException(e -> {
if (e instanceof ApiException) {
ApiException apiException = (ApiException) e;
return apiException.isServerError() || apiException.isUnauthorized();
}
return e instanceof ProcessingException;
})
.build();
}
protected final AuthSession authSession;
protected final BdkConfig bdkConfig;
protected final List<RealTimeEventListener> listeners;
protected final RetryWithRecoveryBuilder retryWithRecoveryBuilder;
protected DatafeedApi datafeedApi;

/**
* {@inheritDoc}
*/
@Override
public void subscribe(RealTimeEventListener listener) {
listeners.add(listener);
}
public AbstractDatafeedService(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config) {
this.datafeedApi = datafeedApi;
this.listeners = new ArrayList<>();
this.authSession = authSession;
this.bdkConfig = config;
this.retryWithRecoveryBuilder = new RetryWithRecoveryBuilder<>()
.retryConfig(config.getDatafeedRetryConfig())
.retryOnException(this::isNetworkOrServerOrUnauthorizedError)
.recoveryStrategy(ApiException::isUnauthorized, this::refresh);
}

/**
* {@inheritDoc}
*/
@Override
public void unsubscribe(RealTimeEventListener listener) {
listeners.remove(listener);
}
/**
* {@inheritDoc}
*/
@Override
public void subscribe(RealTimeEventListener listener) {
listeners.add(listener);
}

/**
* {@inheritDoc}
*/
@Override
public void unsubscribe(RealTimeEventListener listener) {
listeners.remove(listener);
}

/**
* Handle a received listener by using the subscribed {@link RealTimeEventListener}.
*
* @param events List of Datafeed events to be handled
*
*/
protected void handleV4EventList(List<V4Event> events) {
for (V4Event event : events) {
if (event == null || event.getType() == null) {
continue;
}
if (this.isSelfGeneratedEvent(event)) {
continue;
}
try {
RealTimeEventType eventType = RealTimeEventType.valueOf(event.getType());
for (RealTimeEventListener listener : listeners) {
eventType.dispatch(listener, event);
}
} catch (IllegalArgumentException e) {
log.warn("Receive events with unknown type: {}", event.getType());
}
/**
* Handle a received listener by using the subscribed {@link RealTimeEventListener}.
*
* @param events List of Datafeed events to be handled
*/
protected void handleV4EventList(List<V4Event> events) {
for (V4Event event : events) {
if (event == null || event.getType() == null) {
continue;
}
if (this.isSelfGeneratedEvent(event)) {
continue;
}
try {
RealTimeEventType eventType = RealTimeEventType.valueOf(event.getType());
for (RealTimeEventListener listener : listeners) {
eventType.dispatch(listener, event);
}
} catch (IllegalArgumentException e) {
log.warn("Receive events with unknown type: {}", event.getType());
}
}
}

private boolean isSelfGeneratedEvent(V4Event event) {
return event.getInitiator() != null && event.getInitiator().getUser() != null
&& event.getInitiator().getUser().getUsername() != null
&& event.getInitiator().getUser().getUsername().equals(this.bdkConfig.getBot().getUsername());
}
private boolean isSelfGeneratedEvent(V4Event event) {
return event.getInitiator() != null && event.getInitiator().getUser() != null
&& event.getInitiator().getUser().getUsername() != null
&& event.getInitiator().getUser().getUsername().equals(this.bdkConfig.getBot().getUsername());
thibauult marked this conversation as resolved.
Show resolved Hide resolved
}

protected Retry getRetryInstance(String name, RetryConfig... config) {
Retry retry = config.length == 0 ? Retry.of(name, this.retryConfig) : Retry.of(name, config[0]);
retry.getEventPublisher().onRetry(event -> {
long intervalInMillis = event.getWaitInterval().toMillis();
double interval = intervalInMillis / 1000.0;
if (event.getLastThrowable() != null) {
log.debug("Datafeed service failed due to {}", event.getLastThrowable().getMessage());
}
log.info("Retry in {} secs...", interval);
});
return retry;
protected boolean isNetworkOrServerOrUnauthorizedOrClientError(Throwable t) {
if (t instanceof ApiException) {
ApiException apiException = (ApiException) t;
return apiException.isServerError() || apiException.isUnauthorized() || apiException.isTooManyRequestsError() || apiException.isClientError();
}
return t instanceof ProcessingException;
}

protected void setDatafeedApi(DatafeedApi datafeedApi) {
this.datafeedApi = datafeedApi;
protected boolean isNetworkOrServerOrUnauthorizedError(Throwable t) {
if (t instanceof ApiException) {
ApiException apiException = (ApiException) t;
return apiException.isServerError() || apiException.isUnauthorized() || apiException.isTooManyRequestsError();
}
return t instanceof ProcessingException;
}

protected void refresh() throws AuthUnauthorizedException {
log.info("Re-authenticate and try again");
authSession.refresh();
}

protected void setDatafeedApi(DatafeedApi datafeedApi) {
this.datafeedApi = datafeedApi;
}

}
Loading