Skip to content

Commit

Permalink
re-check reconnecting state before emitting
Browse files Browse the repository at this point in the history
 - since the connectExecutor thread starts a reconnect which in turn causes the websocket connection to go into onConnected() method in the default(callback). This can cause an emit of message before reconnecting boolean is set to false
 - introduced a mechanism in order to make sure and retry several times to get the correct value

Signed-off-by: Kalin Kostashki <kalin.kostashki@bosch.com>
  • Loading branch information
kalinkostashki committed Oct 21, 2024
1 parent 7af0486 commit 176db08
Showing 1 changed file with 42 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -104,21 +94,22 @@ public final class WebSocketMessagingProvider extends WebSocketAdapter implement
private final AtomicBoolean manuallyPerformReconnect = new AtomicBoolean(false);

private Runnable channelCloser;
@Nullable private Throwable lastReceivedDittoProtocolError = null;
@Nullable
private Throwable lastReceivedDittoProtocolError = null;
private CountDownLatch lastReceivedDittoProtocolErrorLatch = new CountDownLatch(1);

/**
* Constructs a new {@code WsMessagingProvider}.
*
* @param adaptableBus the bus to publish all messages to.
* @param adaptableBus the bus to publish all messages to.
* @param messagingConfiguration the specific configuration to apply.
* @param authenticationProvider provider for the authentication method with which to open the websocket.
* @param callbackExecutor the executor service to run callbacks with.
* @param callbackExecutor the executor service to run callbacks with.
*/
private WebSocketMessagingProvider(final AdaptableBus adaptableBus,
final MessagingConfiguration messagingConfiguration,
final AuthenticationProvider<WebSocket> authenticationProvider,
final ExecutorService callbackExecutor) {
final MessagingConfiguration messagingConfiguration,
final AuthenticationProvider<WebSocket> authenticationProvider,
final ExecutorService callbackExecutor) {
this.adaptableBus = adaptableBus;
this.messagingConfiguration = messagingConfiguration;
this.authenticationProvider = authenticationProvider;
Expand All @@ -129,7 +120,8 @@ private WebSocketMessagingProvider(final AdaptableBus adaptableBus,
subscriptionMessages = new ConcurrentHashMap<>();
webSocket = new AtomicReference<>();

channelCloser = () -> {};
channelCloser = () -> {
};
disconnectionHandler = new DisconnectedContext.DisconnectionHandler() {

@Override
Expand Down Expand Up @@ -171,14 +163,14 @@ private static ScheduledExecutorService createConnectExecutor(final String sessi
*
* @param messagingConfiguration configuration of messaging.
* @param authenticationProvider provides authentication.
* @param defaultExecutor the executor for messages.
* @param scheduledExecutor the scheduled executor for scheduling tasks.
* @param defaultExecutor the executor for messages.
* @param scheduledExecutor the scheduled executor for scheduling tasks.
* @return the provider.
*/
public static WebSocketMessagingProvider newInstance(final MessagingConfiguration messagingConfiguration,
final AuthenticationProvider<WebSocket> authenticationProvider,
final ExecutorService defaultExecutor,
final ScheduledExecutorService scheduledExecutor) {
final AuthenticationProvider<WebSocket> authenticationProvider,
final ExecutorService defaultExecutor,
final ScheduledExecutorService scheduledExecutor) {
checkNotNull(messagingConfiguration, "messagingConfiguration");
checkNotNull(authenticationProvider, "authenticationProvider");
checkNotNull(defaultExecutor, "defaultExecutor");
Expand Down Expand Up @@ -364,15 +356,32 @@ public void onConnected(final WebSocket websocket, final Map<String, List<String
if (!subscriptionMessages.isEmpty()) {
LOGGER.info("Client <{}>: Subscribing again for messages from backend after reconnection",
sessionId);
subscriptionMessages.values().forEach(this::emit);
CompletableFuture<Boolean> isReconnecting = new CompletableFuture<>();
Runnable checkTask = () -> {
if (!reconnecting.get()) {
isReconnecting.complete(true); // Complete the future if flag is true
}
};
ScheduledFuture<?> future = connectExecutor.scheduleAtFixedRate(checkTask, 0, 20, TimeUnit.MILLISECONDS);
try {
if (Boolean.TRUE.equals(isReconnecting.get(160, TimeUnit.MILLISECONDS))) { // Ensures 4 retries of the scheduleAtFixedRate method.
future.cancel(true);
LOGGER.debug("Reconnecting is completed -> emitting subscriptionMessages: {}", subscriptionMessages);
subscriptionMessages.values().forEach(this::emit);
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
isReconnecting.complete(false);
future.cancel(true);
LOGGER.error("Reconnecting failed: {}", e.getMessage());
}
}
});
}

@Override
public void onDisconnected(final WebSocket websocket, final WebSocketFrame serverCloseFrame,
final WebSocketFrame clientCloseFrame,
final boolean closedByServer) {
final WebSocketFrame clientCloseFrame,
final boolean closedByServer) {

callbackExecutor.execute(() -> {
if (closedByServer) {
Expand All @@ -390,8 +399,7 @@ public void onDisconnected(final WebSocket websocket, final WebSocketFrame serve
sessionId, messagingConfiguration.getEndpointUri());
awaitLastReceivedDittoProtocolError();
handleReconnectionIfEnabled(DisconnectedContext.Source.CLIENT, lastReceivedDittoProtocolError);
}
else {
} else {
// only when close() was called we should end here
LOGGER.info("Client <{}>: WebSocket connection to endpoint <{}> was closed by user",
sessionId, messagingConfiguration.getEndpointUri());
Expand Down Expand Up @@ -427,15 +435,15 @@ public void onError(final WebSocket websocket, final WebSocketException cause) {
}

private CompletionStage<WebSocket> connectWithPotentialRetries(final String actionName,
final Supplier<WebSocket> webSocket,
final CompletableFuture<WebSocket> future,
final boolean retry) {
final Supplier<WebSocket> webSocket,
final CompletableFuture<WebSocket> future,
final boolean retry) {

try {
final Predicate<Throwable> isRecoverable =
retry ? WebSocketMessagingProvider::isRecoverable : exception -> false;
return Retry.retryTo(actionName,
() -> initiateConnection(webSocket.get()))
() -> initiateConnection(webSocket.get()))
.inClientSession(sessionId)
.withExecutors(connectExecutor, callbackExecutor)
.notifyOnError(messagingConfiguration.getConnectionErrorHandler().orElse(null))
Expand All @@ -448,7 +456,7 @@ private CompletionStage<WebSocket> connectWithPotentialRetries(final String acti
}

private void handleReconnectionIfEnabled(final DisconnectedContext.Source disconnectionSource,
@Nullable final Throwable throwableSupplier) {
@Nullable final Throwable throwableSupplier) {

final Optional<Consumer<DisconnectedContext>> disconnectedListener =
messagingConfiguration.getDisconnectedListener();
Expand Down Expand Up @@ -492,7 +500,7 @@ private void doReconnect() {

private void reconnectWithRetries() {
this.connectWithPotentialRetries("reconnect WebSocket", this::recreateWebSocket, new CompletableFuture<>(),
messagingConfiguration.isReconnectEnabled() || manuallyPerformReconnect.get())
messagingConfiguration.isReconnectEnabled() || manuallyPerformReconnect.get())
.thenAccept(reconnectedWebSocket -> {
setWebSocket(reconnectedWebSocket);
reconnecting.set(false);
Expand Down

0 comments on commit 176db08

Please sign in to comment.