diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 43005eea81..a9de311641 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -21,14 +21,16 @@ import java.net.SocketAddress; import java.time.Duration; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Supplier; import io.lettuce.core.ClientOptions; import io.lettuce.core.ConnectionBuilder; import io.lettuce.core.ConnectionEvents; +import io.lettuce.core.RedisException; import io.lettuce.core.event.EventBus; import io.lettuce.core.event.connection.ReconnectAttemptEvent; import io.lettuce.core.event.connection.ReconnectFailedEvent; @@ -84,9 +86,9 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private final String epid; - private final boolean useAutoBatchFlushEndpoint; + private final boolean useAutoBatchFlush; - private final Endpoint endpoint; + private final Consumer> endpointFailedToReconnectNotifier; private Channel channel; @@ -148,8 +150,15 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo this.eventBus = eventBus; this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI); this.epid = endpoint.getId(); - this.endpoint = endpoint; - this.useAutoBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint; + if (endpoint instanceof AutoBatchFlushEndpoint) { + this.useAutoBatchFlush = true; + endpointFailedToReconnectNotifier = throwableSupplier -> ((AutoBatchFlushEndpoint) endpoint) + .notifyReconnectFailed(throwableSupplier.get()); + } else { + this.useAutoBatchFlush = false; + endpointFailedToReconnectNotifier = ignoredThrowableSupplier -> { + }; + } Mono wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr) .onErrorResume(t -> { @@ -215,20 +224,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { channel = null; if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) { - if (!isEventLoopGroupActive()) { - logger.debug("isEventLoopGroupActive() == false"); - return; - } - - if (!isListenOnChannelInactive()) { - logger.debug("Skip reconnect scheduling, listener disabled"); - return; - } - - if (!useAutoBatchFlushEndpoint) { + if (!useAutoBatchFlush) { this.scheduleReconnect(); + } else { + doReconnectOnAutoBatchFlushEndpointQuiescence = this::scheduleReconnect; } - doReconnectOnAutoBatchFlushEndpointQuiescence = this::scheduleReconnect; // otherwise, will be called later by BatchFlushEndpoint#onEndpointQuiescence } else { logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx); @@ -237,7 +237,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } - boolean willReconnect() { + boolean willReconnectOnAutoBatchFlushEndpointQuiescence() { return doReconnectOnAutoBatchFlushEndpointQuiescence != null; } @@ -261,14 +261,16 @@ public void scheduleReconnect() { logger.debug("{} scheduleReconnect()", logPrefix()); if (!isEventLoopGroupActive()) { - logger.debug("isEventLoopGroupActive() == false"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "isEventLoopGroupActive() == false"; + logger.debug(errMsg); + notifyEndpointFailedToReconnect(errMsg); return; } if (!isListenOnChannelInactive()) { - logger.debug("Skip reconnect scheduling, listener disabled"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "Skip reconnect scheduling, listener disabled"; + logger.debug(errMsg); + notifyEndpointFailedToReconnect(errMsg); return; } @@ -285,8 +287,9 @@ public void scheduleReconnect() { reconnectScheduleTimeout = null; if (!isEventLoopGroupActive()) { - logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "Cannot execute scheduled reconnect timer, reconnect workers are terminated"; + logger.warn(errMsg); + notifyEndpointFailedToReconnect(errMsg); return; } @@ -302,18 +305,12 @@ public void scheduleReconnect() { } } else { logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix()); - notifyEndpointFailedToConnectIfNeeded(); + notifyEndpointFailedToReconnect("Skipping scheduleReconnect() because I have an active channel"); } } - private void notifyEndpointFailedToConnectIfNeeded() { - notifyEndpointFailedToConnectIfNeeded(new CancellationException()); - } - - private void notifyEndpointFailedToConnectIfNeeded(Exception e) { - if (useAutoBatchFlushEndpoint) { - ((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e); - } + void notifyEndpointFailedToReconnect(String msg) { + endpointFailedToReconnectNotifier.accept(() -> new RedisException(msg)); } /** @@ -335,26 +332,29 @@ public void run(int attempt) throws Exception { * @param delay retry delay. * @throws Exception when reconnection fails. */ - private void run(int attempt, Duration delay) throws Exception { + private void run(int attempt, Duration delay) { reconnectSchedulerSync.set(false); reconnectScheduleTimeout = null; if (!isEventLoopGroupActive()) { - logger.debug("isEventLoopGroupActive() == false"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "isEventLoopGroupActive() == false"; + logger.debug(errMsg); + notifyEndpointFailedToReconnect(errMsg); return; } if (!isListenOnChannelInactive()) { - logger.debug("Skip reconnect scheduling, listener disabled"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "Skip reconnect scheduling, listener disabled"; + logger.debug(errMsg); + notifyEndpointFailedToReconnect(errMsg); return; } if (isReconnectSuspended()) { - logger.debug("Skip reconnect scheduling, reconnect is suspended"); - notifyEndpointFailedToConnectIfNeeded(); + final String msg = "Skip reconnect scheduling, reconnect is suspended"; + logger.debug(msg); + notifyEndpointFailedToReconnect(msg); return; } @@ -411,13 +411,14 @@ private void run(int attempt, Duration delay) throws Exception { if (!isReconnectSuspended()) { scheduleReconnect(); } else { - notifyEndpointFailedToConnectIfNeeded(); + endpointFailedToReconnectNotifier + .accept(() -> new RedisException("got error and then reconnect is suspended", t)); } }); } catch (Exception e) { logger.log(warnLevel, "Cannot reconnect: {}", e.toString()); eventBus.publish(new ReconnectFailedEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, e, attempt)); - notifyEndpointFailedToConnectIfNeeded(e); + endpointFailedToReconnectNotifier.accept(() -> e); } } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java index 6d175baa79..e17c1c9809 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java @@ -390,7 +390,8 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel, return; } - boolean willReconnect = connectionWatchdog != null && connectionWatchdog.willReconnect(); + boolean willReconnect = connectionWatchdog != null + && connectionWatchdog.willReconnectOnAutoBatchFlushEndpointQuiescence(); RedisException exception = null; // Unlike DefaultEndpoint, here we don't check reliability since connectionWatchdog.willReconnect() already does it. if (isClosed()) {