diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 5636aa9128..89fb382c15 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -229,6 +229,18 @@ public void unMuteAckFailure() ackFailureMuted = false; } + /** + * Check if ACK_FAILURE is muted. + *

+ * This method is not thread-safe and should only be executed by the event loop thread. + * + * @return {@code true} if ACK_FAILURE has been muted via {@link #muteAckFailure()}, {@code false} otherwise. + */ + public boolean isAckFailureMuted() + { + return ackFailureMuted; + } + private void ackFailureIfNeeded() { if ( !ackFailureMuted ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandler.java index 33e5910612..0c0b5297e0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandler.java @@ -23,6 +23,7 @@ import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; public class AckFailureResponseHandler implements ResponseHandler { @@ -42,10 +43,21 @@ public void onSuccess( Map metadata ) @Override public void onFailure( Throwable error ) { + if ( messageDispatcher.isAckFailureMuted() ) + { + // RESET cancelled this ACK_FAILURE and made the database send an IGNORED message + // this is not a protocol violation and database has all the connection stated cleared now + messageDispatcher.clearCurrentError(); + } + else + { + throw new ClientException( "Unable to acknowledge the previous error. Connection will be closed", error ); + } } @Override public void onRecord( Value[] fields ) { + throw new UnsupportedOperationException(); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java index b5a707579b..b3a16be608 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java @@ -45,9 +45,18 @@ public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool, } @Override - protected void resetCompleted( CompletableFuture completionFuture ) + protected void resetCompleted( CompletableFuture completionFuture, boolean success ) { - setLastUsedTimestamp( channel, clock.millis() ); + if ( success ) + { + // update the last-used timestamp before returning the channel back to the pool + setLastUsedTimestamp( channel, clock.millis() ); + } + else + { + // close the channel before returning it back to the pool if RESET failed + channel.close(); + } Future released = pool.release( channel ); released.addListener( ignore -> completionFuture.complete( null ) ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java index 838174b01b..240e6daec3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java @@ -39,13 +39,13 @@ public ResetResponseHandler( InboundMessageDispatcher messageDispatcher, Complet @Override public final void onSuccess( Map metadata ) { - resetCompleted(); + resetCompleted( true ); } @Override public final void onFailure( Throwable error ) { - resetCompleted(); + resetCompleted( false ); } @Override @@ -54,13 +54,13 @@ public final void onRecord( Value[] fields ) throw new UnsupportedOperationException(); } - private void resetCompleted() + private void resetCompleted( boolean success ) { messageDispatcher.unMuteAckFailure(); - resetCompleted( completionFuture ); + resetCompleted( completionFuture, success ); } - protected void resetCompleted( CompletableFuture completionFuture ) + protected void resetCompleted( CompletableFuture completionFuture, boolean success ) { completionFuture.complete( null ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java index b7832c65d4..afe6150b80 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java @@ -36,8 +36,10 @@ import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -428,6 +430,19 @@ public void shouldNotSupportAckFailureMessage() } } + @Test + public void shouldMuteAndUnMuteAckFailure() + { + InboundMessageDispatcher dispatcher = newDispatcher(); + assertFalse( dispatcher.isAckFailureMuted() ); + + dispatcher.muteAckFailure(); + assertTrue( dispatcher.isAckFailureMuted() ); + + dispatcher.unMuteAckFailure(); + assertFalse( dispatcher.isAckFailureMuted() ); + } + private static void verifyFailure( ResponseHandler handler ) { ArgumentCaptor captor = ArgumentCaptor.forClass( Neo4jException.class ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandlerTest.java new file mode 100644 index 0000000000..90c327705d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandlerTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import org.junit.Test; + +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; + +import static java.util.Collections.emptyMap; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AckFailureResponseHandlerTest +{ + private final InboundMessageDispatcher dispatcher = mock( InboundMessageDispatcher.class ); + private final AckFailureResponseHandler handler = new AckFailureResponseHandler( dispatcher ); + + @Test + public void shouldClearCurrentErrorOnSuccess() + { + verify( dispatcher, never() ).clearCurrentError(); + handler.onSuccess( emptyMap() ); + verify( dispatcher ).clearCurrentError(); + } + + @Test + public void shouldThrowOnFailure() + { + RuntimeException error = new RuntimeException( "Unable to process ACK_FAILURE" ); + + try + { + handler.onFailure( error ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertSame( error, e.getCause() ); + } + } + + @Test + public void shouldClearCurrentErrorWhenAckFailureMutedAndFailureReceived() + { + RuntimeException error = new RuntimeException( "Some error" ); + when( dispatcher.isAckFailureMuted() ).thenReturn( true ); + + handler.onFailure( error ); + + verify( dispatcher ).clearCurrentError(); + } + + @Test + public void shouldThrowOnRecord() + { + try + { + handler.onRecord( new Value[0] ); + fail( "Exception expected" ); + } + catch ( UnsupportedOperationException ignore ) + { + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java index ce39f3e9b9..ba5bdbf10a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java @@ -71,7 +71,7 @@ public void shouldReleaseChannelOnSuccess() } @Test - public void shouldReleaseChannelOnFailure() + public void shouldCloseAndReleaseChannelOnFailure() { ChannelPool pool = newChannelPoolMock(); FakeClock clock = new FakeClock(); @@ -81,7 +81,7 @@ public void shouldReleaseChannelOnFailure() handler.onFailure( new RuntimeException() ); - verifyLastUsedTimestamp( 100 ); + assertTrue( channel.closeFuture().isDone() ); verify( pool ).release( eq( channel ) ); assertTrue( releaseFuture.isDone() ); assertFalse( releaseFuture.isCompletedExceptionally() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java index a72ef502c3..a00555ebd1 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java @@ -19,12 +19,14 @@ package org.neo4j.driver.v1.integration; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.mockito.Mockito; +import java.net.URI; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -41,8 +43,10 @@ import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.AuthToken; +import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Logging; @@ -53,11 +57,14 @@ import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.util.StubServer; import org.neo4j.driver.v1.util.TestNeo4j; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -65,6 +72,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; import static org.neo4j.driver.v1.Config.defaultConfig; import static org.neo4j.driver.v1.Values.parameters; @@ -280,6 +288,36 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC verify( connection2 ).release(); } + @Test + public void shouldCloseChannelWhenResetFails() throws Exception + { + StubServer server = StubServer.start( "reset_error.script", 9001 ); + try + { + URI uri = URI.create( "bolt://localhost:9001" ); + Config config = Config.build().withLogging( DEV_NULL_LOGGING ).withoutEncryption().toConfig(); + ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM ); + + try ( Driver driver = driverFactory.newInstance( uri, AuthTokens.none(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) ) + { + try ( Session session = driver.session() ) + { + assertEquals( 42, session.run( "RETURN 42 AS answer" ).single().get( 0 ).asInt() ); + } + + List channels = driverFactory.pollChannels(); + // there should be a single channel + assertEquals( 1, channels.size() ); + // and it should be closed because it failed to RESET + assertNull( channels.get( 0 ).closeFuture().get( 30, SECONDS ) ); + } + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } + private StatementResult createNodesInNewSession( int nodesToCreate ) { return createNodes( nodesToCreate, driver.session() ); diff --git a/driver/src/test/resources/reset_error.script b/driver/src/test/resources/reset_error.script new file mode 100644 index 0000000000..d806673658 --- /dev/null +++ b/driver/src/test/resources/reset_error.script @@ -0,0 +1,11 @@ +!: AUTO INIT + +C: RESET +S: SUCCESS {} +C: RUN "RETURN 42 AS answer" {} + PULL_ALL +S: SUCCESS {"fields": ["answer"]} + RECORD [42] + SUCCESS {} +C: RESET +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to reset"}