Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public class ExponentialBackoffRetryLogic implements RetryLogic {
public static final long DEFAULT_MAX_RETRY_TIME_MS = SECONDS.toMillis(30);

private static final long INITIAL_RETRY_DELAY_MS = SECONDS.toMillis(1);
private static final double RETRY_DELAY_MULTIPLIER = 1;
private static final double RETRY_DELAY_MULTIPLIER = 2.0;
private static final double RETRY_DELAY_JITTER_FACTOR = 0.2;
private static final long MAX_RETRY_DELAY = Long.MAX_VALUE / 2;

private final long maxRetryTimeMs;
private final long initialRetryDelayMs;
private final double multiplier;
private final double jitterFactor;
final long initialRetryDelayMs;
final double multiplier;
final double jitterFactor;
private final EventExecutorGroup eventExecutorGroup;
private final Clock clock;
private final SleepTask sleepTask;
Expand Down Expand Up @@ -325,8 +325,8 @@ private void verifyAfterConstruction() {
if (initialRetryDelayMs < 0) {
throw new IllegalArgumentException("Initial retry delay should >= 0: " + initialRetryDelayMs);
}
if (multiplier < 1.0) {
throw new IllegalArgumentException("Multiplier should be >= 1.0: " + multiplier);
if (multiplier < 2.0) {
throw new IllegalArgumentException("Multiplier should be >= 2.0: " + multiplier);
}
if (jitterFactor < 0 || jitterFactor > 1) {
throw new IllegalArgumentException("Jitter factor should be in [0.0, 1.0]: " + jitterFactor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.lang.Long.MAX_VALUE;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.closeTo;
Expand Down Expand Up @@ -97,29 +98,40 @@ void throwsForIllegalInitialRetryDelay() {
@Test
void throwsForIllegalMultiplier() {
var error = assertThrows(
IllegalArgumentException.class, () -> newRetryLogic(1, 1, 0.42, 1, Clock.systemUTC(), (ignored) -> {}));
IllegalArgumentException.class, () -> newRetryLogic(1, 1, 1.99, 1, Clock.systemUTC(), (ignored) -> {}));
assertThat(error.getMessage(), containsString("Multiplier"));
}

@Test
void throwsForIllegalJitterFactor() {
var error1 = assertThrows(
IllegalArgumentException.class,
() -> newRetryLogic(1, 1, 1, -0.42, Clock.systemUTC(), (ignored) -> {}));
() -> newRetryLogic(1, 1, 2, -0.42, Clock.systemUTC(), (ignored) -> {}));
assertThat(error1.getMessage(), containsString("Jitter"));

var error2 = assertThrows(
IllegalArgumentException.class, () -> newRetryLogic(1, 1, 1, 1.42, Clock.systemUTC(), (ignored) -> {}));
IllegalArgumentException.class, () -> newRetryLogic(1, 1, 2, 1.42, Clock.systemUTC(), (ignored) -> {}));
assertThat(error2.getMessage(), containsString("Jitter"));
}

@Test
void throwsForIllegalClock() {
var error =
assertThrows(IllegalArgumentException.class, () -> newRetryLogic(1, 1, 1, 1, null, (ignored) -> {}));
assertThrows(IllegalArgumentException.class, () -> newRetryLogic(1, 1, 2, 1, null, (ignored) -> {}));
assertThat(error.getMessage(), containsString("Clock"));
}

@Test
void shouldInitialiseWithExpectedDefaultValues() {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
var logic = new ExponentialBackoffRetryLogic(MAX_VALUE, eventExecutor, clock, DEV_NULL_LOGGING, sleepTask);

assertEquals(SECONDS.toMillis(1), logic.initialRetryDelayMs);
assertEquals(2.0, logic.multiplier);
assertEquals(0.2, logic.jitterFactor);
}

@Test
void nextDelayCalculatedAccordingToMultiplier() throws Exception {
var retries = 27;
Expand Down Expand Up @@ -318,7 +330,7 @@ void doesNotRetryWhenMaxRetryTimeExceededRx() {
void sleepsOnServiceUnavailableException() throws Exception {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
var logic = newRetryLogic(1, 42, 1, 0, clock, sleepTask);
var logic = newRetryLogic(1, 42, 2, 0, clock, sleepTask);

Supplier<Void> workMock = newWorkMock();
var error = serviceUnavailable();
Expand All @@ -335,7 +347,7 @@ void schedulesRetryOnServiceUnavailableExceptionAsync() {
var result = "The Result";
var clock = mock(Clock.class);

var retryLogic = newRetryLogic(1, 42, 1, 0, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 42, 2, 0, clock, (ignored) -> {});

Supplier<CompletionStage<Object>> workMock = newWorkMock();
var error = serviceUnavailable();
Expand All @@ -353,7 +365,7 @@ void schedulesRetryOnServiceUnavailableExceptionAsync() {
void sleepsOnSessionExpiredException() throws Exception {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
var logic = newRetryLogic(1, 4242, 1, 0, clock, sleepTask);
var logic = newRetryLogic(1, 4242, 2, 0, clock, sleepTask);

Supplier<Void> workMock = newWorkMock();
var error = sessionExpired();
Expand All @@ -370,7 +382,7 @@ void schedulesRetryOnSessionExpiredExceptionAsync() {
var result = "The Result";
var clock = mock(Clock.class);

var retryLogic = newRetryLogic(1, 4242, 1, 0, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 4242, 2, 0, clock, (ignored) -> {});

Supplier<CompletionStage<Object>> workMock = newWorkMock();
var error = sessionExpired();
Expand All @@ -388,7 +400,7 @@ void schedulesRetryOnSessionExpiredExceptionAsync() {
void sleepsOnTransientException() throws Exception {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
var logic = newRetryLogic(1, 23, 1, 0, clock, sleepTask);
var logic = newRetryLogic(1, 23, 2, 0, clock, sleepTask);

Supplier<Void> workMock = newWorkMock();
var error = transientException();
Expand All @@ -405,7 +417,7 @@ void schedulesRetryOnTransientExceptionAsync() {
var result = "The Result";
var clock = mock(Clock.class);

var retryLogic = newRetryLogic(1, 23, 1, 0, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 23, 2, 0, clock, (ignored) -> {});

Supplier<CompletionStage<Object>> workMock = newWorkMock();
var error = transientException();
Expand All @@ -423,7 +435,7 @@ void schedulesRetryOnTransientExceptionAsync() {
void throwsWhenUnknownError() throws Exception {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
var logic = newRetryLogic(1, 1, 1, 1, clock, sleepTask);
var logic = newRetryLogic(1, 1, 2, 1, clock, sleepTask);

Supplier<Void> workMock = newWorkMock();
var error = new IllegalStateException();
Expand All @@ -439,7 +451,7 @@ void throwsWhenUnknownError() throws Exception {
@Test
void doesNotRetryOnUnknownErrorAsync() {
var clock = mock(Clock.class);
var retryLogic = newRetryLogic(1, 1, 1, 1, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 1, 2, 1, clock, (ignored) -> {});

Supplier<CompletionStage<Object>> workMock = newWorkMock();
var error = new IllegalStateException();
Expand All @@ -456,7 +468,7 @@ void doesNotRetryOnUnknownErrorAsync() {
void throwsWhenTransactionTerminatedError() throws Exception {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
var logic = newRetryLogic(1, 13, 1, 0, clock, sleepTask);
var logic = newRetryLogic(1, 13, 2, 0, clock, sleepTask);

Supplier<Void> workMock = newWorkMock();
var error = new ClientException("Neo.ClientError.Transaction.Terminated", "");
Expand All @@ -472,7 +484,7 @@ void throwsWhenTransactionTerminatedError() throws Exception {
@Test
void doesNotRetryOnTransactionTerminatedErrorAsync() {
var clock = mock(Clock.class);
var retryLogic = newRetryLogic(1, 13, 1, 0, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 13, 2, 0, clock, (ignored) -> {});

Supplier<CompletionStage<Object>> workMock = newWorkMock();
var error = new ClientException("Neo.ClientError.Transaction.Terminated", "");
Expand All @@ -489,7 +501,7 @@ void doesNotRetryOnTransactionTerminatedErrorAsync() {
void throwsWhenTransactionLockClientStoppedError() throws Exception {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
var logic = newRetryLogic(1, 13, 1, 0, clock, sleepTask);
var logic = newRetryLogic(1, 13, 2, 0, clock, sleepTask);

Supplier<Void> workMock = newWorkMock();
var error = new ClientException("Neo.ClientError.Transaction.LockClientStopped", "");
Expand All @@ -505,7 +517,7 @@ void throwsWhenTransactionLockClientStoppedError() throws Exception {
@Test
void doesNotRetryOnTransactionLockClientStoppedErrorAsync() {
var clock = mock(Clock.class);
var retryLogic = newRetryLogic(1, 15, 1, 0, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 15, 2, 0, clock, (ignored) -> {});

Supplier<CompletionStage<Object>> workMock = newWorkMock();
var error = new ClientException("Neo.ClientError.Transaction.LockClientStopped", "");
Expand All @@ -523,7 +535,7 @@ void doesNotRetryOnTransactionLockClientStoppedErrorAsync() {
void schedulesRetryOnErrorRx(Exception error) {
var result = "The Result";
var clock = mock(Clock.class);
var retryLogic = newRetryLogic(1, 4242, 1, 0, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 4242, 2, 0, clock, (ignored) -> {});

Publisher<String> publisher = createMono(result, error);
var single = Flux.from(retryLogic.retryRx(publisher)).single();
Expand All @@ -539,7 +551,7 @@ void schedulesRetryOnErrorRx(Exception error) {
@MethodSource("cannotBeRetriedErrors")
void scheduleNoRetryOnErrorRx(Exception error) {
var clock = mock(Clock.class);
var retryLogic = newRetryLogic(1, 10, 1, 1, clock, (ignored) -> {});
var retryLogic = newRetryLogic(1, 10, 2, 1, clock, (ignored) -> {});

var single = Flux.from(retryLogic.retryRx(Mono.error(error))).single();

Expand All @@ -555,7 +567,7 @@ void throwsWhenSleepInterrupted() throws Exception {
var clock = mock(Clock.class);
var sleepTask = mock(ExponentialBackoffRetryLogic.SleepTask.class);
doThrow(new InterruptedException()).when(sleepTask).sleep(1);
var logic = newRetryLogic(1, 1, 1, 0, clock, sleepTask);
var logic = newRetryLogic(1, 1, 2, 0, clock, sleepTask);

Supplier<Void> workMock = newWorkMock();
when(workMock.get()).thenThrow(serviceUnavailable());
Expand Down