diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java index 2dd5851db9..64653904e3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java @@ -50,11 +50,6 @@ public CompletionStage runAsync(Query query) return tx.runAsync(query, true ); } - public void markTerminated() - { - tx.markTerminated(); - } - public boolean isOpen() { return tx.isOpen(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index 79f0717974..3aa8d933a6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -138,7 +138,7 @@ public CompletionStage resetAsync() { if ( tx != null ) { - tx.markTerminated(); + tx.markTerminated( null ); } } ) .thenCompose( ignore -> connectionStage ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index d821029f3e..a9c504cd58 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.async; +import java.util.EnumSet; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; @@ -58,13 +59,66 @@ private enum State ROLLED_BACK } + /** + * This is a holder so that we can have ony the state volatile in the tx without having to synchronize the whole block. + */ + private static final class StateHolder + { + private static final EnumSet OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED ); + private static final StateHolder ACTIVE_HOLDER = new StateHolder( State.ACTIVE, null ); + private static final StateHolder COMMITTED_HOLDER = new StateHolder( State.COMMITTED, null ); + private static final StateHolder ROLLED_BACK_HOLDER = new StateHolder( State.ROLLED_BACK, null ); + + /** + * The actual state. + */ + final State value; + + /** + * If this holder contains a state of {@link State#TERMINATED}, this represents the cause if any. + */ + final Throwable causeOfTermination; + + static StateHolder of( State value ) + { + switch ( value ) + { + case ACTIVE: + return ACTIVE_HOLDER; + case COMMITTED: + return COMMITTED_HOLDER; + case ROLLED_BACK: + return ROLLED_BACK_HOLDER; + case TERMINATED: + default: + throw new IllegalArgumentException( "Cannot provide a default state holder for state " + value ); + } + } + + static StateHolder terminatedWith( Throwable cause ) + { + return new StateHolder( State.TERMINATED, cause ); + } + + private StateHolder( State value, Throwable causeOfTermination ) + { + this.value = value; + this.causeOfTermination = causeOfTermination; + } + + boolean isOpen() + { + return OPEN_STATES.contains( this.value ); + } + } + private final Connection connection; private final BoltProtocol protocol; private final BookmarkHolder bookmarkHolder; private final ResultCursorsHolder resultCursors; private final long fetchSize; - private volatile State state = State.ACTIVE; + private volatile StateHolder state = StateHolder.of( State.ACTIVE ); public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize ) { @@ -104,11 +158,11 @@ public CompletionStage closeAsync() public CompletionStage commitAsync() { - if ( state == State.COMMITTED ) + if ( state.value == State.COMMITTED ) { return failedFuture( new ClientException( "Can't commit, transaction has been committed" ) ); } - else if ( state == State.ROLLED_BACK ) + else if ( state.value == State.ROLLED_BACK ) { return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) ); } @@ -122,11 +176,11 @@ else if ( state == State.ROLLED_BACK ) public CompletionStage rollbackAsync() { - if ( state == State.COMMITTED ) + if ( state.value == State.COMMITTED ) { return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) ); } - else if ( state == State.ROLLED_BACK ) + else if ( state.value == State.ROLLED_BACK ) { return failedFuture( new ClientException( "Can't rollback, transaction has been rolled back" ) ); } @@ -158,12 +212,12 @@ public CompletionStage runRx(Query query) public boolean isOpen() { - return state != State.COMMITTED && state != State.ROLLED_BACK; + return state.isOpen(); } - public void markTerminated() + public void markTerminated( Throwable cause ) { - state = State.TERMINATED; + state = StateHolder.terminatedWith( cause ); } public Connection connection() @@ -173,34 +227,34 @@ public Connection connection() private void ensureCanRunQueries() { - if ( state == State.COMMITTED ) + if ( state.value == State.COMMITTED ) { throw new ClientException( "Cannot run more queries in this transaction, it has been committed" ); } - else if ( state == State.ROLLED_BACK ) + else if ( state.value == State.ROLLED_BACK ) { throw new ClientException( "Cannot run more queries in this transaction, it has been rolled back" ); } - else if ( state == State.TERMINATED ) + else if ( state.value == State.TERMINATED ) { throw new ClientException( "Cannot run more queries in this transaction, " + - "it has either experienced an fatal error or was explicitly terminated" ); + "it has either experienced an fatal error or was explicitly terminated", state.causeOfTermination ); } } private CompletionStage doCommitAsync() { - if ( state == State.TERMINATED ) + if ( state.value == State.TERMINATED ) { return failedFuture( new ClientException( "Transaction can't be committed. " + - "It has been rolled back either because of an error or explicit termination" ) ); + "It has been rolled back either because of an error or explicit termination", state.causeOfTermination ) ); } return protocol.commitTransaction( connection ).thenAccept( bookmarkHolder::setBookmark ); } private CompletionStage doRollbackAsync() { - if ( state == State.TERMINATED ) + if ( state.value == State.TERMINATED ) { return completedWithNull(); } @@ -224,11 +278,11 @@ private void transactionClosed( boolean isCommitted ) { if ( isCommitted ) { - state = State.COMMITTED; + state = StateHolder.of( State.COMMITTED ); } else { - state = State.ROLLED_BACK; + state = StateHolder.of( State.ROLLED_BACK ); } connection.release(); // release in background } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java index e5103ad76d..760b9b4b7a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java @@ -45,6 +45,6 @@ public void afterFailure( Throwable error ) // always mark transaction as terminated because every error is "acknowledged" with a RESET message // so database forgets about the transaction after the first error // such transaction should not attempt to commit and can be considered as rolled back - tx.markTerminated(); + tx.markTerminated( error ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index 307c5160ee..0d755a63d2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -56,7 +56,7 @@ public RxResult run(Query query) // The logic here shall be the same as `TransactionPullResponseHandler#afterFailure` as that is where cursor handling failure // This is optional as tx still holds a reference to all cursor futures and they will be clean up properly in commit Throwable error = Futures.completionExceptionCause( completionError ); - tx.markTerminated(); + tx.markTerminated( error ); cursorFuture.completeExceptionally( error ); } } ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index da88e3e041..a02304e46a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -39,6 +39,7 @@ import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; @@ -100,8 +101,9 @@ public T retry( Supplier work ) { return work.get(); } - catch ( Throwable error ) + catch ( Throwable throwable ) { + Throwable error = extractPossibleTerminationCause( throwable ); if ( canRetryOn( error ) ) { long currentTime = clock.millis(); @@ -122,8 +124,10 @@ public T retry( Supplier work ) continue; } } - addSuppressed( error, errors ); - throw error; + + // Add the original error in case we didn't continue the loop from within the if above. + addSuppressed( throwable, errors ); + throw throwable; } } } @@ -144,54 +148,67 @@ public Publisher retryRx( Publisher work ) protected boolean canRetryOn( Throwable error ) { - return error instanceof SessionExpiredException || - error instanceof ServiceUnavailableException || - isTransientError( error ); + return error instanceof SessionExpiredException || error instanceof ServiceUnavailableException || isTransientError( error ); + } + + /** + * Extracts the possible cause of a transaction that has been marked terminated. + * + * @param error + * @return + */ + private static Throwable extractPossibleTerminationCause( Throwable error ) + { + + // Having a dedicated "TerminatedException" inheriting from ClientException might be a good idea. + if ( error instanceof ClientException && error.getCause() != null ) + { + return error.getCause(); + } + return error; } private Function,Publisher> retryRxCondition() { - return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> { - Throwable lastError = t2.getT1(); + return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> + { + + Throwable throwable = t2.getT1(); + Throwable error = extractPossibleTerminationCause( throwable ); + Context ctx = t2.getT2(); List errors = ctx.getOrDefault( "errors", null ); - long startTime = ctx.getOrDefault( "startTime", -1L ); + long startTime = ctx.getOrDefault( "startTime", -1L ); long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs ); - if( !canRetryOn( lastError ) ) + if ( canRetryOn( error ) ) { - addSuppressed( lastError, errors ); - return Mono.error( lastError ); - } - - long currentTime = clock.millis(); - if ( startTime == -1 ) - { - startTime = currentTime; - } + long currentTime = clock.millis(); + if ( startTime == -1 ) + { + startTime = currentTime; + } - long elapsedTime = currentTime - startTime; - if ( elapsedTime < maxRetryTimeMs ) - { - long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); - log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", lastError ); + long elapsedTime = currentTime - startTime; + if ( elapsedTime < maxRetryTimeMs ) + { + long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); + log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); - nextDelayMs = (long) (nextDelayMs * multiplier); - errors = recordError( lastError, errors ); + nextDelayMs = (long) (nextDelayMs * multiplier); + errors = recordError( error, errors ); - // retry on netty event loop thread - EventExecutor eventExecutor = eventExecutorGroup.next(); - return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ) - .delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); + // retry on netty event loop thread + EventExecutor eventExecutor = eventExecutorGroup.next(); + return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement( + Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); + } } - else - { - addSuppressed( lastError, errors ); + addSuppressed( throwable, errors ); - return Mono.error( lastError ); - } + return Mono.error( throwable ); } ); } @@ -249,9 +266,10 @@ private void executeWork( CompletableFuture resultFuture, Supplier void retryOnError( CompletableFuture resultFuture, Supplier> work, - long startTime, long retryDelayMs, Throwable error, List errors ) + private void retryOnError( CompletableFuture resultFuture, Supplier> work, long startTime, long retryDelayMs, Throwable throwable, + List errors ) { + Throwable error = extractPossibleTerminationCause( throwable ); if ( canRetryOn( error ) ) { long currentTime = clock.millis(); @@ -269,8 +287,8 @@ private void retryOnError( CompletableFuture resultFuture, Supplier result = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ); @@ -133,8 +143,7 @@ void shouldHandleAcquireReadTransaction() throws IOException, InterruptedExcepti Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) ) { - List result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) - .list( record -> record.get( "n.name" ).asString() ) ); + List result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ) ); assertThat( result, equalTo( asList( "Bob", "Alice", "Tina" ) ) ); } @@ -153,8 +162,7 @@ void shouldHandleAcquireReadSessionAndTransaction() throws IOException, Interrup StubServer readServer = stubController.startStub( "read_server_v3_read_tx.script", 9005 ); URI uri = URI.create( "neo4j://127.0.0.1:9001" ); try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); - Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); - Transaction tx = session.beginTransaction() ) + Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); Transaction tx = session.beginTransaction() ) { List result = tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ); @@ -209,7 +217,8 @@ void shouldRoundRobinReadServersWhenUsingTransaction() throws IOException, Inter // Run twice, one on each read server for ( int i = 0; i < 2; i++ ) { - try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); Transaction tx = session.beginTransaction() ) + try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); + Transaction tx = session.beginTransaction() ) { assertThat( tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ), equalTo( asList( "Bob", "Alice", "Tina" ) ) ); @@ -234,7 +243,8 @@ void shouldThrowSessionExpiredIfReadServerDisappears() throws IOException, Inter URI uri = URI.create( "neo4j://127.0.0.1:9001" ); //Expect - assertThrows( SessionExpiredException.class, () -> { + assertThrows( SessionExpiredException.class, () -> + { try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) ) { @@ -256,7 +266,8 @@ void shouldThrowSessionExpiredIfReadServerDisappearsWhenUsingTransaction() throw URI uri = URI.create( "neo4j://127.0.0.1:9001" ); //Expect - SessionExpiredException e = assertThrows( SessionExpiredException.class, () -> { + SessionExpiredException e = assertThrows( SessionExpiredException.class, () -> + { try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); Transaction tx = session.beginTransaction() ) @@ -305,8 +316,7 @@ void shouldThrowSessionExpiredIfWriteServerDisappearsWhenUsingTransaction() thro URI uri = URI.create( "neo4j://127.0.0.1:9001" ); //Expect try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); - Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); - Transaction tx = session.beginTransaction() ) + Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); Transaction tx = session.beginTransaction() ) { assertThrows( SessionExpiredException.class, () -> tx.run( "MATCH (n) RETURN n.name" ).consume() ); } @@ -364,8 +374,7 @@ void shouldHandleAcquireWriteSessionAndTransaction() throws IOException, Interru StubServer writeServer = stubController.startStub( "write_server_v3_write_tx.script", 9007 ); URI uri = URI.create( "neo4j://127.0.0.1:9001" ); try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); - Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); - Transaction tx = session.beginTransaction() ) + Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); Transaction tx = session.beginTransaction() ) { tx.run( "CREATE (n {name:'Bob'})" ); tx.commit(); @@ -534,6 +543,113 @@ void shouldHandleLeaderSwitchWhenWritingInTransaction() throws IOException, Inte assertThat( server.exitStatus(), equalTo( 0 ) ); } + @Test + void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunction() throws IOException, InterruptedException + { + // Given + StubServer server = stubController.startStub( "acquire_endpoints_twice_v4.script", 9001 ); + + // START a write server that fails on the first write attempt but then succeeds on the second + StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 ); + URI uri = URI.create( "neo4j://127.0.0.1:9001" ); + + Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() ); + List names; + + try ( Session session = driver.session( builder().withDatabase( "mydatabase" ).build() ) ) + { + names = session.writeTransaction( tx -> + { + tx.run( "RETURN 1" ); + try + { + Thread.sleep( 100 ); + } + catch ( InterruptedException ex ) + { + } + return tx.run( "MATCH (n) RETURN n.name" ).list( RoutingDriverBoltKitTest::extractNameField ); + } ); + } + + assertEquals( asList( "Foo", "Bar" ), names ); + + // Finally + driver.close(); + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + + @Test + void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionAsync() throws IOException, InterruptedException + { + // Given + StubServer server = stubController.startStub( "acquire_endpoints_twice_v4.script", 9001 ); + + // START a write server that fails on the first write attempt but then succeeds on the second + StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 ); + URI uri = URI.create( "neo4j://127.0.0.1:9001" ); + + Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() ); + AsyncSession session = driver.asyncSession( builder().withDatabase( "mydatabase" ).build() ); + List names = Futures.blockingGet( session.writeTransactionAsync( + tx -> tx.runAsync( "RETURN 1" ) + .thenComposeAsync( ignored -> { + try + { + Thread.sleep( 100 ); + } + catch ( InterruptedException ex ) + { + } + return tx.runAsync( "MATCH (n) RETURN n.name" ); + } ) + .thenComposeAsync( cursor -> cursor.listAsync( RoutingDriverBoltKitTest::extractNameField ) ) ) ); + + assertEquals( asList( "Foo", "Bar" ), names ); + + // Finally + driver.close(); + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + + private static String extractNameField(Record record) + { + return record.get( 0 ).asString(); + } + + // This does not exactly reproduce the async and blocking versions above, as we don't have any means of ignoring + // the flux of the RETURN 1 query (not pulling the result) like we do in above, so this is "just" a test for + // a leader going away during the execution of a flux. + @Test + void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionRX() throws IOException, InterruptedException + { + // Given + StubServer server = stubController.startStub( "acquire_endpoints_twice_v4.script", 9001 ); + + // START a write server that fails on the first write attempt but then succeeds on the second + StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries_rx.script", 9007 ); + URI uri = URI.create( "neo4j://127.0.0.1:9001" ); + + Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() ); + + Flux fluxOfNames = Flux.usingWhen( Mono.fromSupplier( () -> driver.rxSession( builder().withDatabase( "mydatabase" ).build() ) ), + session -> session.writeTransaction( tx -> + { + RxResult result = tx.run( "RETURN 1" ); + return Flux.from( result.records() ).limitRate( 100 ).thenMany( tx.run( "MATCH (n) RETURN n.name" ).records() ).limitRate( 100 ).map( + RoutingDriverBoltKitTest::extractNameField ); + } ), RxSession::close ); + + StepVerifier.create( fluxOfNames ).expectNext( "Foo", "Bar" ).verifyComplete(); + + // Finally + driver.close(); + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + @Test void shouldSendInitialBookmark() throws Exception { @@ -696,8 +812,7 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exc Logger logger = mock( Logger.class ); Config config = insecureBuilder().withLogging( ignored -> logger ).build(); - try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); - Session session = driver.session() ) + try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); Session session = driver.session() ) { AtomicInteger invocations = new AtomicInteger(); List records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) ); @@ -730,8 +845,7 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemovedV3() throws E Logger logger = mock( Logger.class ); Config config = insecureBuilder().withLogging( ignored -> logger ).build(); - try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); - Session session = driver.session() ) + try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); Session session = driver.session() ) { AtomicInteger invocations = new AtomicInteger(); List records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) ); @@ -757,8 +871,7 @@ void shouldRetryReadTransactionUntilFailure() throws Exception StubServer brokenReader1 = stubController.startStub( "dead_read_server_tx.script", 9005 ); StubServer brokenReader2 = stubController.startStub( "dead_read_server_tx.script", 9006 ); - try ( Driver driver = newDriverWithFixedRetries( "neo4j://127.0.0.1:9001", 1 ); - Session session = driver.session() ) + try ( Driver driver = newDriverWithFixedRetries( "neo4j://127.0.0.1:9001", 1 ); Session session = driver.session() ) { AtomicInteger invocations = new AtomicInteger(); assertThrows( SessionExpiredException.class, () -> session.readTransaction( queryWork( "MATCH (n) RETURN n.name", invocations ) ) ); @@ -958,8 +1071,7 @@ void shouldServeReadsButFailWritesWhenNoWritersAvailable() throws Exception StubServer router2 = stubController.startStub( "discover_no_writers_9010.script", 9004 ); StubServer reader = stubController.startStub( "read_server_v3_read_tx.script", 9003 ); - try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9010", INSECURE_CONFIG ); - Session session = driver.session() ) + try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9010", INSECURE_CONFIG ); Session session = driver.session() ) { assertEquals( asList( "Bob", "Alice", "Tina" ), readStrings( "MATCH (n) RETURN n.name", session ) ); @@ -1042,10 +1154,9 @@ void shouldSendMultipleBookmarks() throws Exception StubServer router = stubController.startStub( "acquire_endpoints_v3.script", 9001 ); StubServer writer = stubController.startStub( "multiple_bookmarks.script", 9007 ); - try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9001", INSECURE_CONFIG ); - Session session = driver.session( builder().withBookmarks( InternalBookmark.parse( - asOrderedSet( "neo4j:bookmark:v1:tx5", "neo4j:bookmark:v1:tx29", "neo4j:bookmark:v1:tx94", "neo4j:bookmark:v1:tx56", - "neo4j:bookmark:v1:tx16", "neo4j:bookmark:v1:tx68" ) ) ).build() ) ) + try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9001", INSECURE_CONFIG ); Session session = driver.session( builder().withBookmarks( + InternalBookmark.parse( asOrderedSet( "neo4j:bookmark:v1:tx5", "neo4j:bookmark:v1:tx29", "neo4j:bookmark:v1:tx94", "neo4j:bookmark:v1:tx56", + "neo4j:bookmark:v1:tx16", "neo4j:bookmark:v1:tx68" ) ) ).build() ) ) { try ( Transaction tx = session.beginTransaction() ) { @@ -1117,7 +1228,8 @@ void shouldUseResolverDuringRediscoveryWhenExistingRoutersFail() throws Exceptio StubServer reader = stubController.startStub( "read_server_v3_read_tx.script", 9005 ); AtomicBoolean resolverInvoked = new AtomicBoolean(); - ServerAddressResolver resolver = address -> { + ServerAddressResolver resolver = address -> + { if ( resolverInvoked.compareAndSet( false, true ) ) { // return the address first time @@ -1139,13 +1251,11 @@ void shouldUseResolverDuringRediscoveryWhenExistingRoutersFail() throws Exceptio try ( Session session = driver.session() ) { // run first query against 9001, which should return result and exit - List names1 = session.run( "MATCH (n) RETURN n.name AS name" ) - .list( record -> record.get( "name" ).asString() ); + List names1 = session.run( "MATCH (n) RETURN n.name AS name" ).list( record -> record.get( "name" ).asString() ); assertEquals( asList( "Alice", "Bob", "Eve" ), names1 ); // run second query with retries, it should rediscover using 9042 returned by the resolver and read from 9005 - List names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) - .list( record -> record.get( 0 ).asString() ) ); + List names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ).list( RoutingDriverBoltKitTest::extractNameField ) ); assertEquals( asList( "Bob", "Alice", "Tina" ), names2 ); } } @@ -1187,7 +1297,8 @@ void useSessionAfterDriverIsClosed() throws Exception @Test void shouldRevertToInitialRouterIfKnownRouterThrowsProtocolErrors() throws Exception { - ServerAddressResolver resolver = a -> { + ServerAddressResolver resolver = a -> + { SortedSet addresses = new TreeSet<>( new PortBasedServerAddressComparator() ); addresses.add( ServerAddress.of( "127.0.0.1", 9001 ) ); addresses.add( ServerAddress.of( "127.0.0.1", 9003 ) ); @@ -1273,7 +1384,8 @@ private static Driver newDriver( String uriString, DriverFactory driverFactory, private static TransactionWork> queryWork( final String query, final AtomicInteger invocations ) { - return tx -> { + return tx -> + { invocations.incrementAndGet(); return tx.run( query ).list(); }; @@ -1281,7 +1393,8 @@ private static TransactionWork> queryWork( final String query, fina private static List readStrings( final String query, Session session ) { - return session.readTransaction( tx -> { + return session.readTransaction( tx -> + { List records = tx.run( query ).list(); List names = new ArrayList<>( records.size() ); for ( Record record : records ) diff --git a/driver/src/test/java/org/neo4j/driver/integration/UnmanagedTransactionIT.java b/driver/src/test/java/org/neo4j/driver/integration/UnmanagedTransactionIT.java index 98bfb49042..175bede1a3 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/UnmanagedTransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/UnmanagedTransactionIT.java @@ -128,7 +128,7 @@ void shouldFailToCommitAfterTermination() { UnmanagedTransaction tx = beginTransaction(); - tx.markTerminated(); + tx.markTerminated( null ); ClientException e = assertThrows( ClientException.class, () -> await( tx.commitAsync() ) ); assertThat( e.getMessage(), startsWith( "Transaction can't be committed" ) ); @@ -162,7 +162,7 @@ void shouldRollbackAfterTermination() { UnmanagedTransaction tx = beginTransaction(); - tx.markTerminated(); + tx.markTerminated( null ); assertNull( await( tx.rollbackAsync() ) ); assertFalse( tx.isOpen() ); @@ -173,7 +173,7 @@ void shouldFailToRunQueryWhenTerminated() { UnmanagedTransaction tx = beginTransaction(); txRun( tx, "CREATE (:MyLabel)" ); - tx.markTerminated(); + tx.markTerminated( null ); ClientException e = assertThrows( ClientException.class, () -> txRun( tx, "CREATE (:MyOtherLabel)" ) ); assertThat( e.getMessage(), startsWith( "Cannot run more queries in this transaction" ) ); @@ -183,7 +183,7 @@ void shouldFailToRunQueryWhenTerminated() void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated() { UnmanagedTransaction tx1 = beginTransaction(); - tx1.markTerminated(); + tx1.markTerminated( null ); // commit should fail, make session forget about this transaction and release the connection to the pool ClientException e = assertThrows( ClientException.class, () -> await( tx1.commitAsync() ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java index 8404d1682f..e30f0d27bd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java @@ -36,6 +36,7 @@ import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.summary.ResultSummary; @@ -64,6 +65,7 @@ class InternalAsyncTransactionTest { private Connection connection; + private NetworkSession networkSession; private InternalAsyncTransaction tx; @BeforeEach @@ -73,7 +75,8 @@ void setUp() ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( ConnectionContext.class ) ) ) .thenReturn( completedFuture( connection ) ); - InternalAsyncSession session = new InternalAsyncSession( newSession( connectionProvider ) ); + networkSession = newSession(connectionProvider); + InternalAsyncSession session = new InternalAsyncSession(networkSession); tx = (InternalAsyncTransaction) await( session.beginTransactionAsync() ); } @@ -91,7 +94,7 @@ private static Stream>> @ParameterizedTest @MethodSource( "allSessionRunMethods" ) - void shouldFlushOnRun( Function> runReturnOne ) throws Throwable + void shouldFlushOnRun( Function> runReturnOne ) { setupSuccessfulRunAndPull( connection ); @@ -102,7 +105,7 @@ void shouldFlushOnRun( Function> } @Test - void shouldCommit() throws Throwable + void shouldCommit() { await( tx.commitAsync() ); @@ -112,7 +115,7 @@ void shouldCommit() throws Throwable } @Test - void shouldRollback() throws Throwable + void shouldRollback() { await( tx.rollbackAsync() ); @@ -122,9 +125,9 @@ void shouldRollback() throws Throwable } @Test - void shouldRollbackWhenFailedRun() throws Throwable + void shouldRollbackWhenFailedRun() { - tx.markTerminated(); + Futures.blockingGet( networkSession.resetAsync() ); ClientException clientException = assertThrows( ClientException.class, () -> await( tx.commitAsync() ) ); assertThat( clientException.getMessage(), containsString( "It has been rolled back either because of an error or explicit termination" ) ); @@ -133,7 +136,7 @@ void shouldRollbackWhenFailedRun() throws Throwable } @Test - void shouldReleaseConnectionWhenFailedToCommit() throws Throwable + void shouldReleaseConnectionWhenFailedToCommit() { setupFailingCommit( connection ); assertThrows( Exception.class, () -> await( tx.commitAsync() ) ); @@ -143,7 +146,7 @@ void shouldReleaseConnectionWhenFailedToCommit() throws Throwable } @Test - void shouldReleaseConnectionWhenFailedToRollback() throws Throwable + void shouldReleaseConnectionWhenFailedToRollback() { setupFailingRollback( connection ); assertThrows( Exception.class, () -> await( tx.rollbackAsync() ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index a1b6249ae4..5ec591906c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -144,7 +144,7 @@ void shouldBeClosedWhenMarkedAsTerminated() { UnmanagedTransaction tx = beginTx( connectionMock() ); - tx.markTerminated(); + tx.markTerminated( null ); assertTrue( tx.isOpen() ); } @@ -154,7 +154,7 @@ void shouldBeClosedWhenMarkedTerminatedAndClosed() { UnmanagedTransaction tx = beginTx( connectionMock() ); - tx.markTerminated(); + tx.markTerminated( null ); await( tx.closeAsync() ); assertFalse( tx.isOpen() ); @@ -196,7 +196,7 @@ void shouldReleaseConnectionWhenTerminatedAndCommitted() Connection connection = connectionMock(); UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); - tx.markTerminated(); + tx.markTerminated( null ); assertThrows( ClientException.class, () -> await( tx.commitAsync() ) ); @@ -210,7 +210,7 @@ void shouldReleaseConnectionWhenTerminatedAndRolledBack() Connection connection = connectionMock(); UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); - tx.markTerminated(); + tx.markTerminated( null ); await( tx.rollbackAsync() ); verify( connection ).release(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java index ff4618371f..910c146e89 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java @@ -73,6 +73,6 @@ private static void testErrorHandling( Throwable error ) handler.onFailure( error ); - verify( tx ).markTerminated(); + verify( tx ).markTerminated( error ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java index c6306aa5ef..9b793f40e4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java @@ -73,7 +73,7 @@ protected void shouldHandleFailure( BasicPullResponseHandler.State state ) // Then assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.FAILURE_STATE ) ); - verify( tx ).markTerminated(); + verify( tx ).markTerminated( error ); verify( recordConsumer ).accept( null, error ); verify( summaryConsumer ).accept( any( ResultSummary.class ), eq( error ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java index 99fc715513..656a8dddb8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java @@ -135,6 +135,6 @@ void shouldMarkTxIfFailedToRun( Function runReturnOne ) verify( tx ).runRx( any( Query.class ) ); RuntimeException t = assertThrows( CompletionException.class, () -> Futures.getNow( cursorFuture ) ); assertThat( t.getCause(), equalTo( error ) ); - verify( tx ).markTerminated(); + verify( tx ).markTerminated( error ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java index 513ac2a9b1..5b5f26d325 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.retry; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -32,6 +33,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +43,7 @@ import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; @@ -762,6 +765,50 @@ void doesNotCollectSuppressedErrorsWhenSameErrorIsThrownRx() assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); } + @Test + void doesRetryOnClientExceptionWithRetryableCause() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = logic.retry( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw clientExceptionWithValidTerminationCause(); + } + return "Done"; + } ); + + assertEquals( "Done", result ); + } + + @Test + void doesNotRetryOnRandomClientException() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + ClientException exception = Assertions.assertThrows( ClientException.class, () -> logic.retry( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw randomClientException(); + } + return "Done"; + } ) ); + + assertEquals( "Meeh", exception.getMessage() ); + } + @Test void eachRetryIsLogged() { @@ -781,6 +828,52 @@ void eachRetryIsLogged() ); } + @Test + void doesRetryOnClientExceptionWithRetryableCauseAsync() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = await( logic.retryAsync( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw clientExceptionWithValidTerminationCause(); + } + return CompletableFuture.completedFuture( "Done" ); + } ) ); + + assertEquals( "Done", result ); + } + + @Test + void doesNotRetryOnRandomClientExceptionAsync() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + ClientException exception = Assertions.assertThrows( ClientException.class, () -> await( logic.retryAsync( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw randomClientException(); + } + return CompletableFuture.completedFuture( "Done" ); + } ) ) ); + + assertEquals( "Meeh", exception.getMessage() ); + } + @Test void eachRetryIsLoggedAsync() { @@ -802,6 +895,52 @@ void eachRetryIsLoggedAsync() ); } + @Test + void doesRetryOnClientExceptionWithRetryableCauseRx() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = await( Mono.from( logic.retryRx( Mono.fromSupplier( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw clientExceptionWithValidTerminationCause(); + } + return "Done"; + } ) ) ) ); + + assertEquals( "Done", result ); + } + + @Test + void doesNotRetryOnRandomClientExceptionRx() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + ClientException exception = Assertions.assertThrows( ClientException.class, () -> await( Mono.from( logic.retryRx( Mono.fromSupplier( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw randomClientException(); + } + return "Done"; + } ) ) ) ) ); + + assertEquals( "Meeh", exception.getMessage() ); + } + @Test void eachRetryIsLoggedRx() { @@ -1111,6 +1250,16 @@ private static ServiceUnavailableException serviceUnavailable() return new ServiceUnavailableException( "" ); } + private static RuntimeException clientExceptionWithValidTerminationCause() + { + return new ClientException( "¯\\_(ツ)_/¯", serviceUnavailable() ); + } + + private static RuntimeException randomClientException() + { + return new ClientException( "Meeh" ); + } + private static SessionExpiredException sessionExpired() { return new SessionExpiredException( "" ); diff --git a/driver/src/test/resources/acquire_endpoints_twice_v4.script b/driver/src/test/resources/acquire_endpoints_twice_v4.script new file mode 100644 index 0000000000..f92363e573 --- /dev/null +++ b/driver/src/test/resources/acquire_endpoints_twice_v4.script @@ -0,0 +1,16 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO HELLO +!: AUTO GOODBYE + +C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": { "address": "127.0.0.1:9001"}, "database": "mydatabase"} {"mode": "r", "db": "system"} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": { "address": "127.0.0.1:9001"}, "database": "mydatabase"} {"mode": "r", "db": "system"} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} + diff --git a/driver/src/test/resources/not_able_to_write_server_tx_func_retries.script b/driver/src/test/resources/not_able_to_write_server_tx_func_retries.script new file mode 100644 index 0000000000..447bcbf7fb --- /dev/null +++ b/driver/src/test/resources/not_able_to_write_server_tx_func_retries.script @@ -0,0 +1,25 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO BEGIN +!: AUTO HELLO +!: AUTO GOODBYE +!: AUTO ROLLBACK + +C: RUN "RETURN 1" {} {} + PULL {"n": 1000} +S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"} + IGNORED +C: RUN "RETURN 1" {} {} + PULL {"n": 1000} +S: SUCCESS {"fields": ["1"]} + RECORD [1] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name" {} {} + PULL {"n": 1000} +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Foo"] + RECORD ["Bar"] + SUCCESS {} +C: COMMIT +S: SUCCESS {"bookmark": "NewBookmark"} + diff --git a/driver/src/test/resources/not_able_to_write_server_tx_func_retries_rx.script b/driver/src/test/resources/not_able_to_write_server_tx_func_retries_rx.script new file mode 100644 index 0000000000..b2d651000c --- /dev/null +++ b/driver/src/test/resources/not_able_to_write_server_tx_func_retries_rx.script @@ -0,0 +1,25 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO BEGIN +!: AUTO HELLO +!: AUTO GOODBYE +!: AUTO ROLLBACK + +C: RUN "RETURN 1" {} {} +S: SUCCESS {"fields": ["1"]} +C: PULL {"n": 100} +S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"} +C: RUN "RETURN 1" {} {} +S: SUCCESS {"fields": ["1"]} +C: PULL {"n": 100} +S: RECORD [1] + SUCCESS {"has_more": false} +C: RUN "MATCH (n) RETURN n.name" {} {} +S: SUCCESS {"fields": ["n.name"]} +C: PULL {"n": 100} +S: RECORD ["Foo"] + RECORD ["Bar"] + SUCCESS {"has_more": false} +C: COMMIT +S: SUCCESS {"bookmark": "NewBookmark"} +