Skip to content

Commit

Permalink
Update depedencies (#1070)
Browse files Browse the repository at this point in the history
* Update reactor-bom to 2020.0.8 (#947)

* Update reactor-bom to 2020.0.8

* Polishing.

* Revert list kind change

Co-authored-by: Michael Simons <michael.simons@neo4j.com>

* Update depedencies

Co-authored-by: Michael Simons <michael.simons@neo4j.com>
  • Loading branch information
injectives and michael-simons authored Nov 11, 2021
1 parent e1a6095 commit 91957c2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -34,7 +34,6 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import org.neo4j.driver.Logger;
Expand Down Expand Up @@ -144,7 +143,7 @@ public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
@Override
public <T> Publisher<T> retryRx( Publisher<T> work )
{
return Flux.from( work ).retryWhen( retryRxCondition() );
return Flux.from( work ).retryWhen( exponentialBackoffRetryRx() );
}

protected boolean canRetryOn( Throwable error )
Expand Down Expand Up @@ -175,48 +174,46 @@ private static Throwable extractPossibleTerminationCause( Throwable error )
return error;
}

private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition()
private Retry exponentialBackoffRetryRx()
{
return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 ->
{

Throwable throwable = t2.getT1();
Throwable error = extractPossibleTerminationCause( throwable );

Context ctx = t2.getT2();

List<Throwable> errors = ctx.getOrDefault( "errors", null );

long startTime = ctx.getOrDefault( "startTime", -1L );
long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs );

if ( canRetryOn( error ) )
{
long currentTime = clock.millis();
if ( startTime == -1 )
return Retry.from( retrySignals -> retrySignals.flatMap( retrySignal -> Mono.deferContextual(
contextView ->
{
startTime = currentTime;
}
Throwable throwable = retrySignal.failure();
Throwable error = extractPossibleTerminationCause( throwable );

long elapsedTime = currentTime - startTime;
if ( elapsedTime < maxRetryTimeMs )
{
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );

nextDelayMs = (long) (nextDelayMs * multiplier);
errors = recordError( error, errors );
List<Throwable> errors = contextView.getOrDefault( "errors", null );

// retry on netty event loop thread
EventExecutor eventExecutor = eventExecutorGroup.next();
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement(
Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
}
}
addSuppressed( throwable, errors );
if ( canRetryOn( error ) )
{
long currentTime = clock.millis();

long startTime = contextView.getOrDefault( "startTime", currentTime );
long nextDelayMs = contextView.getOrDefault( "nextDelayMs", initialRetryDelayMs );

long elapsedTime = currentTime - startTime;
if ( elapsedTime < maxRetryTimeMs )
{
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );

nextDelayMs = (long) (nextDelayMs * multiplier);
errors = recordError( error, errors );

// retry on netty event loop thread
EventExecutor eventExecutor = eventExecutorGroup.next();
Context context = Context.of(
"errors", errors,
"startTime", startTime,
"nextDelayMs", nextDelayMs
);
return Mono.just( context ).delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
}
}
addSuppressed( throwable, errors );

return Mono.error( throwable );
} );
return Mono.error( throwable );
} ) ) );
}

private <T> void executeWorkInEventLoop( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work )
Expand Down
18 changes: 9 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@
<!-- Please note that when updating this dependency -->
<!-- (i.e. due to a security vulnerability or bug) that the -->
<!-- corresponding server dependency also needs updating.-->
<netty-handler.version>4.1.65.Final</netty-handler.version>
<netty-handler.version>4.1.70.Final</netty-handler.version>
<!-- Please note that when updating this dependency -->
<!-- (i.e. due to a security vulnerability or bug) that the -->
<!-- corresponding server dependency also needs updating.-->
<reactor-bom.version>Dysprosium-SR21</reactor-bom.version>
<reactor-bom.version>2020.0.13</reactor-bom.version>
<rxjava.version>2.2.21</rxjava.version>
<slf4j-api.version>1.7.31</slf4j-api.version>
<slf4j-api.version>1.7.32</slf4j-api.version>
<hamcrest-junit.version>2.0.0.0</hamcrest-junit.version>
<mockito-core.version>2.28.2</mockito-core.version>
<junit.version>5.7.2</junit.version>
<jarchivelib.version>1.1.0</jarchivelib.version>
<junit.version>5.8.1</junit.version>
<jarchivelib.version>1.2.0</jarchivelib.version>
<bouncycastle-jdk15on.version>1.69</bouncycastle-jdk15on.version>
<logback-classic.version>1.2.3</logback-classic.version>
<jackson.version>2.12.3</jackson.version>
<lombok.version>1.18.20</lombok.version>
<svm.version>20.3.2</svm.version>
<logback-classic.version>1.2.6</logback-classic.version>
<jackson.version>2.13.0</jackson.version>
<lombok.version>1.18.22</lombok.version>
<svm.version>20.3.4</svm.version>
</properties>

<modules>
Expand Down

0 comments on commit 91957c2

Please sign in to comment.