Skip to content

Commit

Permalink
Added exponential backoff for handling connection errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Yong Sheng Tan committed Sep 6, 2019
1 parent 37b7f1a commit 3eed78a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
16 changes: 6 additions & 10 deletions src/main/java/clients/symphony/api/DatafeedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ public final class DatafeedClient extends APIClient {
private final Logger logger = LoggerFactory.getLogger(DatafeedClient.class);
private SymBotClient botClient;
private SymConfig config;
private static String agentTarget;

public DatafeedClient(SymBotClient client) {
this.botClient = client;
this.config = client.getConfig();
agentTarget = CommonConstants.HTTPS_PREFIX + config.getAgentHost() + ":" + config.getAgentPort();
}

public String createDatafeed() throws SymClientException {
Response response = null;
StringId datafeedId = null;
try {
logger.info("Creating new datafeed for bot {} .....", botClient.getBotUserInfo().getUsername());
response = botClient.getAgentClient().target(CommonConstants.HTTPS_PREFIX
+ config.getAgentHost() + ":" + config.getAgentPort())
response = botClient.getAgentClient().target(agentTarget)
.path(AgentConstants.CREATEDATAFEED)
.request(MediaType.APPLICATION_JSON)
.header("sessionToken",
botClient.getSymAuth().getSessionToken())
.header("sessionToken", botClient.getSymAuth().getSessionToken())
.header("keyManagerToken", botClient.getSymAuth().getKmToken())
.post(null);
if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL) {
Expand All @@ -61,16 +61,12 @@ public String createDatafeed() throws SymClientException {
}
}

public List<DatafeedEvent> readDatafeed(String id)
throws SymClientException {
public List<DatafeedEvent> readDatafeed(String id) throws SymClientException {
List<DatafeedEvent> datafeedEvents = null;
Response response = null;
logger.debug("Reading datafeed {}", id);
try {
WebTarget webTarget = botClient.getAgentClient().target(
CommonConstants.HTTPS_PREFIX
+ config.getAgentHost()
+ ":" + config.getAgentPort());
WebTarget webTarget = botClient.getAgentClient().target(agentTarget);
response = webTarget
.path(AgentConstants.READDATAFEED.replace("{id}", id))
.request(MediaType.APPLICATION_JSON)
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/services/DatafeedEventsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public DatafeedEventsService(SymBotClient client) {

int threadPoolSize = client.getConfig().getDatafeedEventsThreadpoolSize();
THREADPOOL_SIZE = threadPoolSize > 0 ? threadPoolSize : 5;
int errorTimeout = client.getConfig().getDatafeedEventsErrorTimeout();
TIMEOUT_NO_OF_SECS = errorTimeout > 0 ? errorTimeout : 30;
resetTimeout();

while (datafeedId == null) {
try {
datafeedId = datafeedClient.createDatafeed();
resetTimeout();
} catch (ProcessingException e) {
handleError(e);
}
Expand All @@ -56,6 +56,11 @@ public DatafeedEventsService(SymBotClient client) {
stop.set(false);
}

private void resetTimeout() {
int errorTimeout = this.botClient.getConfig().getDatafeedEventsErrorTimeout();
TIMEOUT_NO_OF_SECS = errorTimeout > 0 ? errorTimeout : 30;
}

public void addListeners(DatafeedListener... listeners) {
for (DatafeedListener listener : listeners) {
if (listener instanceof RoomListener) {
Expand Down Expand Up @@ -126,7 +131,9 @@ public void readDatafeed() {
CompletableFuture<Object> future = CompletableFuture
.supplyAsync(() -> {
try {
return datafeedClient.readDatafeed(datafeedId);
List<DatafeedEvent> events = datafeedClient.readDatafeed(datafeedId);
resetTimeout();
return events;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -170,7 +177,11 @@ public void restartDatafeedService() {
private void handleError(Throwable e) {
if (e.getMessage().endsWith("SocketTimeoutException: Read timed out")) {
int connectionTimeoutSeconds = botClient.getConfig().getConnectionTimeout() / 1000;
logger.info("Connection timed out after {} seconds", connectionTimeoutSeconds);
logger.error("Connection timed out after {} seconds", connectionTimeoutSeconds);
} else if (e.getMessage().endsWith("Origin Error")) {
logger.error("Pod is unavailable");
} else if (e.getMessage().contains("Could not find a datafeed with the id")) {
logger.error(e.getMessage());
} else {
logger.error("HandlerError error", e);
}
Expand All @@ -182,6 +193,7 @@ private void handleError(Throwable e) {
((SymLoadBalancedConfig) config).rotateAgent();
}
datafeedId = datafeedClient.createDatafeed();
resetTimeout();
} catch (SymClientException e1) {
sleep();
handleError(e);
Expand All @@ -191,6 +203,7 @@ private void handleError(Throwable e) {
private void sleep() {
try {
TimeUnit.SECONDS.sleep(TIMEOUT_NO_OF_SECS);
TIMEOUT_NO_OF_SECS *= 2;
} catch (InterruptedException ie) {
logger.error("Error trying to sleep ", ie);
}
Expand Down

0 comments on commit 3eed78a

Please sign in to comment.