Skip to content

Commit

Permalink
port #1118 onErrorDropped also logs error, MonoCompletionStage drops
Browse files Browse the repository at this point in the history
This commit makes the MonoCompletionStage drop errors that are thrown
inside the future.whenComplete block (which can happen with fatal
exceptions). Additionally, the default onErrorDropped behavior also logs
the dropped error.

This results in these errors being visible in the logs, where they were
previously swallowed by CompletionFuture.whenComplete.

This is a backport of #1126 (commit 7e29da8), as tracked in #1111
  • Loading branch information
simonbasle committed Mar 19, 2018
1 parent 5291239 commit 97d110f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import java.util.concurrent.CompletionStage;

import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
* Emits the value or error produced by the wrapped CompletionStage.
Expand All @@ -33,6 +36,8 @@ final class MonoCompletionStage<T>
extends Mono<T>
implements Fuseable {

static final Logger LOGGER = Loggers.getLogger(MonoCompletionStage.class);

final CompletionStage<? extends T> future;

MonoCompletionStage(CompletionStage<? extends T> future) {
Expand All @@ -51,12 +56,18 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

future.whenComplete((v, e) -> {
if (e != null) {
actual.onError(e);
} else if (v != null) {
sds.complete(v);
} else {
actual.onComplete();
try {
if (e != null) {
actual.onError(e);
} else if (v != null) {
sds.complete(v);
} else {
actual.onComplete();
}
}
catch (Throwable e1) {
Operators.onErrorDropped(e1, actual.currentContext());
throw Exceptions.bubble(e1);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ public static long multiplyCap(long a, long b) {

/**
* An unexpected exception is about to be dropped.
* <p>
* If no hook is registered for {@link Hooks#onErrorDropped(Consumer)}, the dropped
* error is logged at ERROR level and thrown (via {@link Exceptions#bubble(Throwable)}.
*
* @param e the dropped exception
* @param context a context that might hold a local error consumer
Expand All @@ -253,6 +256,7 @@ public static void onErrorDropped(Throwable e, Context context) {
hook = Hooks.onErrorDroppedHook;
}
if (hook == null) {
log.error("Operator called default onErrorDropped", e);
throw Exceptions.bubble(e);
}
hook.accept(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
*/
package reactor.core.publisher;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import org.junit.Test;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

public class MonoCompletionStageTest {

Expand All @@ -45,4 +43,22 @@ public void fromCompletionStage() {
assertThat(Mono.fromCompletionStage(completionStage).block())
.isEqualTo("helloFuture");
}

@Test
public void stackOverflowGoesToOnErrorDropped() {
CompletableFuture<Integer> future = new CompletableFuture<>();
future.complete(1);
Mono<Integer> simple = Mono.fromFuture(future);

StepVerifier.create(
simple.map(r -> {
throw new StackOverflowError("boom, good bye Future");
})
)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(1))
.thenCancel()
.verifyThenAssertThat()
.hasDroppedErrorWithMessage("boom, good bye Future");
}
}

0 comments on commit 97d110f

Please sign in to comment.