From 3eed78ad58cdcbee26e7d4d35ad973c3a659bb1a Mon Sep 17 00:00:00 2001 From: Yong Sheng Tan Date: Fri, 6 Sep 2019 15:16:22 +0800 Subject: [PATCH] Added exponential backoff for handling connection errors --- .../clients/symphony/api/DatafeedClient.java | 16 ++++++-------- .../java/services/DatafeedEventsService.java | 21 +++++++++++++++---- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/main/java/clients/symphony/api/DatafeedClient.java b/src/main/java/clients/symphony/api/DatafeedClient.java index ed97d01ad..d2e5708c2 100755 --- a/src/main/java/clients/symphony/api/DatafeedClient.java +++ b/src/main/java/clients/symphony/api/DatafeedClient.java @@ -22,10 +22,12 @@ public final class DatafeedClient extends APIClient { private final Logger logger = LoggerFactory.getLogger(DatafeedClient.class); private SymBotClient botClient; private SymConfig config; + private static String agentTarget; public DatafeedClient(SymBotClient client) { this.botClient = client; this.config = client.getConfig(); + agentTarget = CommonConstants.HTTPS_PREFIX + config.getAgentHost() + ":" + config.getAgentPort(); } public String createDatafeed() throws SymClientException { @@ -33,12 +35,10 @@ public String createDatafeed() throws SymClientException { StringId datafeedId = null; try { logger.info("Creating new datafeed for bot {} .....", botClient.getBotUserInfo().getUsername()); - response = botClient.getAgentClient().target(CommonConstants.HTTPS_PREFIX - + config.getAgentHost() + ":" + config.getAgentPort()) + response = botClient.getAgentClient().target(agentTarget) .path(AgentConstants.CREATEDATAFEED) .request(MediaType.APPLICATION_JSON) - .header("sessionToken", - botClient.getSymAuth().getSessionToken()) + .header("sessionToken", botClient.getSymAuth().getSessionToken()) .header("keyManagerToken", botClient.getSymAuth().getKmToken()) .post(null); if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL) { @@ -61,16 +61,12 @@ public String createDatafeed() throws SymClientException { } } - public List readDatafeed(String id) - throws SymClientException { + public List readDatafeed(String id) throws SymClientException { List datafeedEvents = null; Response response = null; logger.debug("Reading datafeed {}", id); try { - WebTarget webTarget = botClient.getAgentClient().target( - CommonConstants.HTTPS_PREFIX - + config.getAgentHost() - + ":" + config.getAgentPort()); + WebTarget webTarget = botClient.getAgentClient().target(agentTarget); response = webTarget .path(AgentConstants.READDATAFEED.replace("{id}", id)) .request(MediaType.APPLICATION_JSON) diff --git a/src/main/java/services/DatafeedEventsService.java b/src/main/java/services/DatafeedEventsService.java index f98bcdc75..68e8dc617 100755 --- a/src/main/java/services/DatafeedEventsService.java +++ b/src/main/java/services/DatafeedEventsService.java @@ -42,12 +42,12 @@ public DatafeedEventsService(SymBotClient client) { int threadPoolSize = client.getConfig().getDatafeedEventsThreadpoolSize(); THREADPOOL_SIZE = threadPoolSize > 0 ? threadPoolSize : 5; - int errorTimeout = client.getConfig().getDatafeedEventsErrorTimeout(); - TIMEOUT_NO_OF_SECS = errorTimeout > 0 ? errorTimeout : 30; + resetTimeout(); while (datafeedId == null) { try { datafeedId = datafeedClient.createDatafeed(); + resetTimeout(); } catch (ProcessingException e) { handleError(e); } @@ -56,6 +56,11 @@ public DatafeedEventsService(SymBotClient client) { stop.set(false); } + private void resetTimeout() { + int errorTimeout = this.botClient.getConfig().getDatafeedEventsErrorTimeout(); + TIMEOUT_NO_OF_SECS = errorTimeout > 0 ? errorTimeout : 30; + } + public void addListeners(DatafeedListener... listeners) { for (DatafeedListener listener : listeners) { if (listener instanceof RoomListener) { @@ -126,7 +131,9 @@ public void readDatafeed() { CompletableFuture future = CompletableFuture .supplyAsync(() -> { try { - return datafeedClient.readDatafeed(datafeedId); + List events = datafeedClient.readDatafeed(datafeedId); + resetTimeout(); + return events; } catch (Exception e) { throw new RuntimeException(e); } @@ -170,7 +177,11 @@ public void restartDatafeedService() { private void handleError(Throwable e) { if (e.getMessage().endsWith("SocketTimeoutException: Read timed out")) { int connectionTimeoutSeconds = botClient.getConfig().getConnectionTimeout() / 1000; - logger.info("Connection timed out after {} seconds", connectionTimeoutSeconds); + logger.error("Connection timed out after {} seconds", connectionTimeoutSeconds); + } else if (e.getMessage().endsWith("Origin Error")) { + logger.error("Pod is unavailable"); + } else if (e.getMessage().contains("Could not find a datafeed with the id")) { + logger.error(e.getMessage()); } else { logger.error("HandlerError error", e); } @@ -182,6 +193,7 @@ private void handleError(Throwable e) { ((SymLoadBalancedConfig) config).rotateAgent(); } datafeedId = datafeedClient.createDatafeed(); + resetTimeout(); } catch (SymClientException e1) { sleep(); handleError(e); @@ -191,6 +203,7 @@ private void handleError(Throwable e) { private void sleep() { try { TimeUnit.SECONDS.sleep(TIMEOUT_NO_OF_SECS); + TIMEOUT_NO_OF_SECS *= 2; } catch (InterruptedException ie) { logger.error("Error trying to sleep ", ie); }