Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow failure retries with exponential backoffs and 'until' predicates #1527

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.mutiny.groups;

import static io.smallrye.mutiny.helpers.ExponentialBackoff.backoffWithPredicateFactory;
import static io.smallrye.mutiny.helpers.ExponentialBackoff.noBackoffPredicateFactory;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.validate;

Expand All @@ -11,7 +13,6 @@

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ExponentialBackoff;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -97,7 +98,6 @@ public Multi<T> atMost(long numberOfAttempts) {
* @return a new {@link Multi} retrying to subscribe to the current
* {@link Multi} until it gets an item or until expiration {@code expireAt}. When the expiration is reached,
* the last failure is propagated.
*
* @throws IllegalArgumentException if back off not configured,
*/
@CheckReturnValue
Expand Down Expand Up @@ -126,7 +126,6 @@ public Multi<T> expireAt(long expireAt) {
* @return a new {@link Multi} retrying to subscribe to the current
* {@link Multi} until it gets an item or until expiration {@code expireIn}. When the expiration is reached,
* the last failure is propagated.
*
* @throws IllegalArgumentException if back off not configured,
*/
@CheckReturnValue
Expand All @@ -146,25 +145,14 @@ public Multi<T> expireIn(long expireIn) {
@CheckReturnValue
public Multi<T> until(Predicate<? super Throwable> predicate) {
Predicate<? super Throwable> actual = Infrastructure.decorate(nonNull(predicate, "predicate"));
Function<Multi<Throwable>, Publisher<Long>> whenStreamFactory;
if (backOffConfigured) {
throw new IllegalArgumentException(
"Invalid retry configuration, `until` cannot be used with a back-off configuration");
ScheduledExecutorService pool = (this.executor == null) ? Infrastructure.getDefaultWorkerPool() : this.executor;
whenStreamFactory = backoffWithPredicateFactory(initialBackOff, jitter, maxBackoff, predicate, pool);
} else {
whenStreamFactory = noBackoffPredicateFactory(predicate);
}
Function<Multi<Throwable>, Publisher<Long>> whenStreamFactory = stream -> stream.onItem()
.transformToUni(failure -> Uni.createFrom().<Long> emitter(emitter -> {
try {
if (actual.test(failure)) {
emitter.complete(1L);
} else {
emitter.fail(failure);
}
} catch (Throwable ex) {
emitter.fail(ex);
}
}))
.concatenate();
return Infrastructure
.onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory));
return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.mutiny.groups;

import static io.smallrye.mutiny.helpers.ExponentialBackoff.backoffWithPredicateFactory;
import static io.smallrye.mutiny.helpers.ExponentialBackoff.noBackoffPredicateFactory;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.validate;

Expand Down Expand Up @@ -134,24 +136,21 @@ public Uni<T> expireIn(long expireIn) {
* must not be {@code null}. If the predicate returns {@code true} for the given failure, a
* re-subscription is attempted.
* @return the new {@code Uni} instance
* @throws IllegalArgumentException if back off configured
*/
@CheckReturnValue
public Uni<T> until(Predicate<? super Throwable> predicate) {
ParameterValidation.nonNull(predicate, "predicate");
Function<Multi<Throwable>, Flow.Publisher<Long>> whenStreamFactory = stream -> stream.onItem()
.transformToUniAndConcatenate(failure -> {
try {
if (predicate.test(failure)) {
return Uni.createFrom().item(1L);
} else {
return Uni.createFrom().failure(failure);
}
} catch (Throwable err) {
return Uni.createFrom().failure(err);
}
});
return when(whenStreamFactory);
Function<Multi<Throwable>, Flow.Publisher<Long>> whenStreamFactory;
if (backOffConfigured) {
ScheduledExecutorService pool = (this.executor == null) ? Infrastructure.getDefaultWorkerPool() : this.executor;
whenStreamFactory = backoffWithPredicateFactory(initialBackOffDuration, jitter, maxBackoffDuration, predicate,
pool);
} else {
whenStreamFactory = noBackoffPredicateFactory(predicate);
}
Function<Multi<Throwable>, ? extends Flow.Publisher<?>> actual = Infrastructure
.decorate(nonNull(whenStreamFactory, "whenStreamFactory"));
return upstream.toMulti().onFailure(this.onFailurePredicate).retry().when(actual).toUni();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -35,10 +35,13 @@ public static Function<Multi<Throwable>, Publisher<Long>> randomExponentialBacko

validate(firstBackoff, maxBackoff, jitterFactor, executor);

AtomicInteger index = new AtomicInteger();
return t -> t
.onItem().transformToUni(failure -> {
int iteration = index.getAndIncrement();
return new Function<>() {
int index;

@Override
public Publisher<Long> apply(Multi<Throwable> t) {
return t.onItem().transformToUniAndConcatenate(failure -> {
int iteration = index++;
if (iteration >= numRetries) {
failure.addSuppressed(
new IllegalStateException("Retries exhausted: " + iteration + "/" + numRetries, failure));
Expand All @@ -48,7 +51,9 @@ public static Function<Multi<Throwable>, Publisher<Long>> randomExponentialBacko
return Uni.createFrom().item((long) iteration).onItem().delayIt()
.onExecutor(executor).by(delay);
}
}).concatenate();
});
}
};
}

private static Duration getNextDelay(Duration firstBackoff, Duration maxBackoff, double jitterFactor, int iteration) {
Expand Down Expand Up @@ -101,10 +106,13 @@ public static Function<Multi<Throwable>, Publisher<Long>> randomExponentialBacko

validate(firstBackoff, maxBackoff, jitterFactor, executor);

AtomicInteger index = new AtomicInteger();
return t -> t
.onItem().transformToUni(failure -> {
int iteration = index.getAndIncrement();
return new Function<>() {
int index;

@Override
public Publisher<Long> apply(Multi<Throwable> t) {
return t.onItem().transformToUniAndConcatenate(failure -> {
int iteration = index++;
Duration delay = getNextDelay(firstBackoff, maxBackoff, jitterFactor, iteration);

long checkTime = System.currentTimeMillis() + delay.toMillis();
Expand All @@ -117,7 +125,9 @@ public static Function<Multi<Throwable>, Publisher<Long>> randomExponentialBacko
}
return Uni.createFrom().item((long) iteration).onItem().delayIt()
.onExecutor(executor).by(delay);
}).concatenate();
});
}
};
}

private static long getJitter(double jitterFactor, Duration nextBackoff) {
Expand All @@ -144,4 +154,54 @@ private static Duration getNextAttemptDelay(Duration firstBackoff, Duration maxB
}
return nextBackoff;
}

public static Function<Multi<Throwable>, Publisher<Long>> backoffWithPredicateFactory(final Duration initialBackOff,
final double jitter, final Duration maxBackoff, Predicate<? super Throwable> predicate,
ScheduledExecutorService pool) {
return new Function<>() {
int index = 0;

@Override
public Publisher<Long> apply(Multi<Throwable> stream) {
return stream.onItem()
.transformToUniAndConcatenate(failure -> {
int iteration = index++;
try {
if (predicate.test(failure)) {
Duration delay = getNextDelay(initialBackOff, maxBackoff, jitter,
iteration);
return Uni.createFrom().item((long) iteration)
.onItem().delayIt().onExecutor(pool).by(delay);
} else {
return Uni.createFrom().failure(failure);
}
} catch (Throwable err) {
failure.addSuppressed(err);
return Uni.createFrom().failure(failure);
}
});
}
};
}

public static Function<Multi<Throwable>, Publisher<Long>> noBackoffPredicateFactory(
Predicate<? super Throwable> predicate) {
return new Function<>() {
@Override
public Publisher<Long> apply(Multi<Throwable> stream) {
return stream.onItem()
.transformToUniAndConcatenate(failure -> {
try {
if (predicate.test(failure)) {
return Uni.createFrom().item(1L);
} else {
return Uni.createFrom().failure(failure);
}
} catch (Throwable err) {
return Uni.createFrom().failure(err);
}
});
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -594,4 +597,54 @@ void avoidSpuriousInterruptsWithBackoff() {
assertThat(result).isEqualTo("yolo");
assertThat(interrupted).isFalse();
}

@Test
void backoffWithUntilPredicateAlwaysTrue() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();
String result = Uni.createFrom().<String> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
if (counter.getAndIncrement() < 5) {
emitter.fail(new IOException("boom"));
} else {
emitter.complete("ok");
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0).until(err -> true)
.await().atMost(Duration.ofSeconds(5));

assertThat(result).isEqualTo("ok");
assertThat(timestamps).hasSize(6);

ArrayList<Long> diffs = new ArrayList<>();
int index = timestamps.size() - 1;
while (index > 0) {
diffs.add(timestamps.get(index) - timestamps.get(index - 1));
index = index - 1;
}
assertThat(diffs)
.doesNotHaveDuplicates()
.isSortedAccordingTo(Comparator.reverseOrder());
}

@Test
void backoffWithUntilPredicateEventuallyFailing() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();

assertThrows(CompletionException.class, () -> Uni.createFrom().<String> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
if (counter.getAndIncrement() < 5) {
emitter.fail(new IOException("boom"));
} else {
emitter.complete("ok");
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0)
.until(err -> counter.get() < 2)
.await().atMost(Duration.ofSeconds(5)));

assertThat(timestamps).hasSize(2);
assertThat(timestamps.get(1) - timestamps.get(0)).isGreaterThan(0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,6 @@ public void testThatYouCannotUseWhenIfBackoffIsConfigured() {
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).when(t -> Multi.createFrom().item(t)));
}

@Test
public void testThatYouCannotUseUntilIfBackoffIsConfigured() {
assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item("hello")
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).until(t -> true));
}

@Test
public void testJitterValidation() {
assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().item("hello")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -190,4 +193,70 @@ public void testWithPredicateReturningFalse() {
subscriber.assertItems(0, 1).assertFailedWith(Exception.class, "boom");
}

@Test
public void testWithBackoffAndUntilAndAlwaysTruePredicate() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();
AssertSubscriber<Integer> sub = Multi.createFrom().<Integer> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
if (counter.incrementAndGet() == 5) {
emitter.complete();
} else {
emitter.fail(new IOException("boom"));
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0).until(err -> true)
.subscribe().withSubscriber(AssertSubscriber.create());

sub.request(256);
sub.awaitCompletion();
List<Integer> items = sub.getItems();
assertThat(items)
.hasSize(15)
.startsWith(1, 2, 3)
.endsWith(1, 2, 3);

assertThat(timestamps)
.hasSize(5);
assertThat(timestamps.get(4) - timestamps.get(3))
.isGreaterThan(timestamps.get(3) - timestamps.get(2));
assertThat(timestamps.get(3) - timestamps.get(2))
.isGreaterThan(timestamps.get(2) - timestamps.get(1));
}

@Test
public void testWithBackoffAndUntilAndEventualFailure() {
AtomicInteger counter = new AtomicInteger();
ArrayList<Long> timestamps = new ArrayList<>();
AssertSubscriber<Integer> sub = Multi.createFrom().<Integer> emitter(emitter -> {
timestamps.add(System.currentTimeMillis());
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
if (counter.incrementAndGet() == 5) {
emitter.complete();
} else {
emitter.fail(new IOException("boom"));
}
})
.onFailure().retry().withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)).withJitter(0)
.until(err -> counter.get() < 3)
.subscribe().withSubscriber(AssertSubscriber.create());

sub.request(256);
sub.awaitFailure().assertFailedWith(IOException.class, "boom");

assertThat(sub.getItems())
.hasSize(9)
.startsWith(1, 2, 3)
.endsWith(1, 2, 3);

assertThat(timestamps)
.hasSize(3);
assertThat(timestamps.get(2) - timestamps.get(1))
.isGreaterThan(timestamps.get(1) - timestamps.get(0));
}
}
Loading
Loading