From 1872cb8163f5633675dae8fd04db32ed21501cd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Thu, 14 Dec 2023 15:54:01 +0100 Subject: [PATCH 1/4] Fix liveness checking for unresponsive connections The driver performs liveness checking for connections when remove them from the pool. When this connections are not responsive, the driver hangs waiting for the result of the `SUCCESS` message get back. This problem occurs becaue driver are not taking in consideration the connection hint `connection.recv_timeout_seconds` in the liveness check routinge. The problem is solved by add the ConnectionReadTimeoutHandler to the pipeline also in case of liveness check ping. --- .../inbound/InboundMessageDispatcher.java | 4 ++ .../async/pool/NettyChannelHealthChecker.java | 22 +++++- .../pool/NettyChannelHealthCheckerTest.java | 67 +++++++++++++++++++ 3 files changed, 92 insertions(+), 1 deletion(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 8cefed3be9..53e151c734 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -164,6 +164,10 @@ public void handleIgnoredMessage() { handler.onFailure(error); } + public HandlerHook getBeforeLastHandlerHook() { + return this.beforeLastHandlerHook; + } + private Optional getPendingResetHandler() { return handlers.stream() .filter(h -> h instanceof ResetResponseHandler) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java index 062f0255b2..09f093597d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java @@ -28,10 +28,14 @@ import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.PromiseNotifier; import java.time.Clock; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.internal.async.connection.AuthorizationStateListener; +import org.neo4j.driver.internal.async.connection.ChannelAttributes; +import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler; +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.handlers.PingResponseHandler; import org.neo4j.driver.internal.messaging.request.ResetMessage; import org.neo4j.driver.internal.messaging.v51.BoltProtocolV51; @@ -164,8 +168,24 @@ private boolean hasBeenIdleForTooLong(Channel channel) { private Future ping(Channel channel) { Promise result = channel.eventLoop().newPromise(); - messageDispatcher(channel).enqueue(new PingResponseHandler(result, channel, logging)); + var messageDispatcher = messageDispatcher(channel); + messageDispatcher.enqueue(new PingResponseHandler(result, channel, logging)); channel.writeAndFlush(ResetMessage.RESET, channel.voidPromise()); + attachConnectionReadTimeoutHandler(channel, messageDispatcher); return result; } + + private void attachConnectionReadTimeoutHandler(Channel channel, InboundMessageDispatcher messageDispatcher) { + ChannelAttributes.connectionReadTimeout(channel).ifPresent(connectionReadTimeout -> { + var connectionReadTimeoutHandler = + new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS); + channel.pipeline().addFirst(connectionReadTimeoutHandler); + log.debug("Added ConnectionReadTimeoutHandler"); + messageDispatcher.setBeforeLastHandlerHook((messageType) -> { + channel.pipeline().remove(connectionReadTimeoutHandler); + messageDispatcher.setBeforeLastHandlerHook(null); + log.debug("Removed ConnectionReadTimeoutHandler"); + }); + }); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java index b44553e3ac..1c7e47b3cb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java @@ -22,6 +22,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; @@ -29,6 +32,7 @@ import static org.mockito.Mockito.times; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.authContext; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthContext; +import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher; @@ -52,9 +56,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.neo4j.driver.AuthTokenManager; import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.handlers.PingResponseHandler; import org.neo4j.driver.internal.messaging.BoltProtocolVersion; import org.neo4j.driver.internal.messaging.request.ResetMessage; import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3; @@ -248,6 +255,65 @@ void shouldKeepIdleConnectionWhenPingSucceeds() { testPing(true); } + @Test + void shouldHandlePingWithConnectionReceiveTimeout() { + var idleTimeBeforeConnectionTest = 1000; + var connectionReadTimeout = 60L; + var settings = new PoolSettings( + DEFAULT_MAX_CONNECTION_POOL_SIZE, + DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, + NOT_CONFIGURED, + idleTimeBeforeConnectionTest); + var clock = Clock.systemUTC(); + var healthChecker = newHealthChecker(settings, clock); + + setCreationTimestamp(channel, clock.millis()); + setConnectionReadTimeout(channel, connectionReadTimeout); + setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2); + + var healthy = healthChecker.isHealthy(channel); + channel.runPendingTasks(); + + var firstElementOnPipeline = channel.pipeline().first(); + assertInstanceOf(ConnectionReadTimeoutHandler.class, firstElementOnPipeline); + assertNotNull(dispatcher.getBeforeLastHandlerHook()); + var readTimeoutHandler = (ConnectionReadTimeoutHandler) firstElementOnPipeline; + assertEquals(connectionReadTimeout * 1000L, readTimeoutHandler.getReaderIdleTimeInMillis()); + assertEquals(ResetMessage.RESET, single(channel.outboundMessages())); + assertFalse(healthy.isDone()); + + dispatcher.handleSuccessMessage(Collections.emptyMap()); + assertThat(await(healthy), is(true)); + assertNull(channel.pipeline().first()); + assertNull(dispatcher.getBeforeLastHandlerHook()); + } + + @Test + void shouldHandlePingWithoutConnectionReceiveTimeout() { + var idleTimeBeforeConnectionTest = 1000; + var settings = new PoolSettings( + DEFAULT_MAX_CONNECTION_POOL_SIZE, + DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, + NOT_CONFIGURED, + idleTimeBeforeConnectionTest); + var clock = Clock.systemUTC(); + var healthChecker = newHealthChecker(settings, clock); + + setCreationTimestamp(channel, clock.millis()); + setLastUsedTimestamp(channel, clock.millis() - idleTimeBeforeConnectionTest * 2); + + var healthy = healthChecker.isHealthy(channel); + channel.runPendingTasks(); + + assertNull(channel.pipeline().first()); + assertEquals(ResetMessage.RESET, single(channel.outboundMessages())); + assertFalse(healthy.isDone()); + + dispatcher.handleSuccessMessage(Collections.emptyMap()); + assertThat(await(healthy), is(true)); + assertNull(channel.pipeline().first()); + } + @Test void shouldDropIdleConnectionWhenPingFails() { testPing(false); @@ -291,6 +357,7 @@ private void testPing(boolean resetMessageSuccessful) { } } + private void testActiveConnectionCheck(boolean channelActive) { var settings = new PoolSettings( DEFAULT_MAX_CONNECTION_POOL_SIZE, From 9940d33f9ecc403b7b431ce49b87f985a645755e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Thu, 14 Dec 2023 16:23:51 +0100 Subject: [PATCH 2/4] Remove unused imports --- .../internal/async/pool/NettyChannelHealthCheckerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java index 1c7e47b3cb..8312d5fb8c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java @@ -56,12 +56,10 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import org.neo4j.driver.AuthTokenManager; import org.neo4j.driver.AuthTokens; import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; -import org.neo4j.driver.internal.handlers.PingResponseHandler; import org.neo4j.driver.internal.messaging.BoltProtocolVersion; import org.neo4j.driver.internal.messaging.request.ResetMessage; import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3; From 148be835d2b098c337e3acf37f0a9d23a0f197b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Thu, 14 Dec 2023 17:00:09 +0100 Subject: [PATCH 3/4] apply code style --- .../internal/async/inbound/InboundMessageDispatcher.java | 2 +- .../internal/async/pool/NettyChannelHealthCheckerTest.java | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 53e151c734..6e296c83d2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -165,7 +165,7 @@ public void handleIgnoredMessage() { } public HandlerHook getBeforeLastHandlerHook() { - return this.beforeLastHandlerHook; + return this.beforeLastHandlerHook; } private Optional getPendingResetHandler() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java index 8312d5fb8c..b99d7ff599 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java @@ -21,10 +21,10 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; @@ -355,7 +355,6 @@ private void testPing(boolean resetMessageSuccessful) { } } - private void testActiveConnectionCheck(boolean channelActive) { var settings = new PoolSettings( DEFAULT_MAX_CONNECTION_POOL_SIZE, From c2a5938465ebd0c7f8e14b06a8257bb8b57149c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Mon, 18 Dec 2023 10:57:49 +0100 Subject: [PATCH 4/4] Update driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java Co-authored-by: Dmitriy Tverdiakov <11927660+injectives@users.noreply.github.com> --- .../driver/internal/async/pool/NettyChannelHealthChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java index 09f093597d..af3b916a17 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java @@ -170,8 +170,8 @@ private Future ping(Channel channel) { Promise result = channel.eventLoop().newPromise(); var messageDispatcher = messageDispatcher(channel); messageDispatcher.enqueue(new PingResponseHandler(result, channel, logging)); - channel.writeAndFlush(ResetMessage.RESET, channel.voidPromise()); attachConnectionReadTimeoutHandler(channel, messageDispatcher); + channel.writeAndFlush(ResetMessage.RESET, channel.voidPromise()); return result; }