From b4452b5a8750d40c9bf1b4ae66b9d6039d93564d Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Mon, 13 Sep 2021 12:59:34 +0100 Subject: [PATCH] Fix reactive transaction function retry logic to retry on relevant resource cleanup failures This update fixes reactive transaction function retry logic and makes it retry when retryable errors occur during resource cleanup. --- .../retry/ExponentialBackoffRetryLogic.java | 6 +++++ .../ExponentialBackoffRetryLogicTest.java | 23 +++++++++++++++++++ .../backend/messages/requests/StartTest.java | 3 --- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index 8c42d4a772..cb9d7a7e26 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -40,6 +40,7 @@ import org.neo4j.driver.Logging; import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; @@ -180,6 +181,11 @@ private Retry exponentialBackoffRetryRx() contextView -> { Throwable throwable = retrySignal.failure(); + // Extract nested Neo4jException from not Neo4jException. Reactor usingWhen returns RuntimeException on async resource cleanup failure. + if ( throwable != null && !(throwable instanceof Neo4jException) && throwable.getCause() instanceof Neo4jException ) + { + throwable = throwable.getCause(); + } Throwable error = extractPossibleTerminationCause( throwable ); List errors = contextView.getOrDefault( "errors", null ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java index d345daf75e..ba4817977b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java @@ -1008,6 +1008,29 @@ void doesRetryOnAuthorizationExpiredExceptionRx() assertEquals( "Done", result ); } + @Test + void doesRetryOnAsyncResourceCleanupRuntimeExceptionRx() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( any( Class.class ) ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = await( Mono.from( logic.retryRx( Mono.fromSupplier( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw new RuntimeException( "Async resource cleanup failed after", + authorizationExpiredException() ); + } + return "Done"; + } ) ) ) ); + + assertEquals( "Done", result ); + } + @Test void doesNotRetryOnRandomClientExceptionRx() { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 4e74590949..0468bde969 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -66,8 +66,6 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$", "Does not support partially consumed state" ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function$", - "Commit failure leaks outside function" ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$", "Rollback failures following commit failure" ); @@ -84,7 +82,6 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );