diff --git a/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java b/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java index e8106fefd6..2f4b04b4c0 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java +++ b/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java @@ -307,11 +307,14 @@ private void openConnections(ConnectionTracker tracker, Iterable redis for (RedisURI redisURI : redisURIs) { - if (redisURI.getHost() == null || tracker.contains(redisURI) || !isEventLoopActive()) { - continue; - } + CompletableFuture> sync = new CompletableFuture<>(); try { + + if (redisURI.getHost() == null || tracker.contains(redisURI) || !isEventLoopActive()) { + continue; + } + SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(redisURI); ConnectionFuture> connectionFuture = nodeConnectionFactory @@ -319,7 +322,6 @@ private void openConnections(ConnectionTracker tracker, Iterable redis // Note: timeout skew due to potential socket address resolution and connection work possible. - CompletableFuture> sync = new CompletableFuture<>(); Timeout cancelTimeout = clientResources.timer().newTimeout(it -> { String message = String.format("Unable to connect to [%s]: Timeout after %s", socketAddress, @@ -360,7 +362,10 @@ private void openConnections(ConnectionTracker tracker, Iterable redis tracker.addConnection(redisURI, sync); } catch (RuntimeException e) { - logger.warn(String.format("Unable to connect to [%s]", redisURI), e); + String message = String.format("Unable to connect to [%s]", redisURI); + logger.warn(message, e); + sync.completeExceptionally(new RedisConnectionException(message, e)); + tracker.addConnection(redisURI, sync); } } } diff --git a/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java b/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java index 48cc3e8acc..7c4a920584 100644 --- a/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java @@ -21,6 +21,7 @@ import static io.lettuce.TestTags.UNIT_TEST; import static org.assertj.core.api.Assertions.*; +import static org.junit.jupiter.api.Timeout.ThreadMode.SEPARATE_THREAD; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import static org.mockito.Mockito.anyLong; @@ -40,6 +41,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -455,6 +457,20 @@ void shouldCloseConnections() { verify(connection2).closeAsync(); } + /** + * @see Issue link + */ + @Test + @org.junit.jupiter.api.Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD) + void shouldHandleInvalidUrisWithoutDeadlock() { + List seed = Arrays.asList(RedisURI.create("redis://localhost:$(INVALID_DATA):CONFIG"), + RedisURI.create("redis://localhost:$(INVALID_DATA):CONFIG")); + CompletionException completionException = Assertions.assertThrows(CompletionException.class, + () -> sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join()); + assertThat(completionException) + .hasRootCauseInstanceOf(DefaultClusterTopologyRefresh.CannotRetrieveClusterPartitions.class); + } + @Test void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() {