Skip to content

Commit

Permalink
Avoid self-suppression for exceptions (#2211)
Browse files Browse the repository at this point in the history
Motivation:

In case a processing function re-throws the original exception, we risk
to face `IllegalArgumentException("Self-suppression not permitted)` when
operator adds a re-thrown exception as suppressed.

Modifications:

- Add utility that verifies the suppressed exception is not the same as
the original one;

Result:

No `IllegalArgumentException("Self-suppression not permitted)`.
  • Loading branch information
idelpivnitskiy authored May 11, 2022
1 parent 4ce81d5 commit 3d26046
Show file tree
Hide file tree
Showing 38 changed files with 97 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL;
import static io.servicetalk.concurrent.api.SubscriberApiUtils.unwrapNullUnchecked;
import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

abstract class AbstractPubToSingle<T> extends AbstractNoHandleSubscribeSingle<T> {
private final Publisher<T> source;
Expand Down Expand Up @@ -110,7 +111,7 @@ void terminate(Object terminal) {
subscriber.onSubscribe(IGNORE_CANCEL);
} catch (Throwable t) {
if (terminal instanceof Throwable) {
((Throwable) terminal).addSuppressed(t);
addSuppressed((Throwable) terminal, t);
} else {
LOGGER.warn("Unexpected exception from onSubscribe from subscriber {}. Discarding result {}.",
subscriber, terminal, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class BeforeFinallyCompletable extends AbstractSynchronousCompletableOperator {
Expand Down Expand Up @@ -78,8 +79,7 @@ public void onError(Throwable cause) {
doFinally.onError(cause);
}
} catch (Throwable error) {
error.addSuppressed(cause);
original.onError(error);
original.onError(addSuppressed(error, cause));
return;
}
original.onError(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class BeforeFinallyPublisher<T> extends AbstractSynchronousPublisherOperator<T, T> {
Expand Down Expand Up @@ -93,8 +94,7 @@ public void onError(Throwable cause) {
doFinally.onError(cause);
}
} catch (Throwable err) {
err.addSuppressed(cause);
original.onError(err);
original.onError(addSuppressed(err, cause));
return;
}
original.onError(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class BeforeFinallySingle<T> extends AbstractSynchronousSingleOperator<T, T> {
Expand Down Expand Up @@ -79,8 +80,7 @@ public void onError(Throwable cause) {
doFinally.onError(cause);
}
} catch (Throwable err) {
err.addSuppressed(cause);
original.onError(err);
original.onError(addSuppressed(err, cause));
return;
}
original.onError(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.function.Supplier;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class BeforeSubscriberCompletable extends AbstractSynchronousCompletableOperator {
Expand Down Expand Up @@ -67,8 +68,7 @@ public void onError(Throwable t) {
try {
subscriber.onError(t);
} catch (Throwable cause) {
t.addSuppressed(cause);
original.onError(t);
original.onError(addSuppressed(t, cause));
return;
}
original.onError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.function.Supplier;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class BeforeSubscriberPublisher<T> extends AbstractSynchronousPublisherOperator<T, T> {
Expand Down Expand Up @@ -70,8 +71,7 @@ public void onError(Throwable t) {
try {
subscriber.onError(t);
} catch (Throwable cause) {
t.addSuppressed(cause);
original.onError(t);
original.onError(addSuppressed(t, cause));
return;
}
original.onError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.function.Supplier;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class BeforeSubscriberSingle<T> extends AbstractSynchronousSingleOperator<T, T> {
Expand Down Expand Up @@ -66,8 +67,7 @@ public void onError(Throwable t) {
try {
subscriber.onError(t);
} catch (Throwable cause) {
t.addSuppressed(cause);
original.onError(t);
original.onError(addSuppressed(t, cause));
return;
}
original.onError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;

abstract class CompletableMergeSubscriber implements Subscriber {
Expand Down Expand Up @@ -68,9 +69,7 @@ public final void onError(Throwable t) {
return;
}
} else {
Throwable tmpT = (Throwable) terminalNotification;
tmpT.addSuppressed(t);
t = tmpT;
t = addSuppressed((Throwable) terminalNotification, t);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -56,7 +57,7 @@ public void cancel() {
if (t == null) {
t = tt;
} else {
t.addSuppressed(tt);
addSuppressed(t, tt);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

final class CompositeExceptionUtils {
/**
* Default to {@code 1} so {@link Throwable#addSuppressed(Throwable)} will not be used by default.
Expand All @@ -33,7 +35,7 @@ static <T> void addPendingError(AtomicIntegerFieldUpdater<T> updater, T owner, i
if (newSize < 0) {
updater.set(owner, Integer.MAX_VALUE);
} else if (newSize < maxDelayedErrors && original != causeToAdd) {
original.addSuppressed(causeToAdd);
addSuppressed(original, causeToAdd);
} else {
updater.decrementAndGet(owner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class OnErrorResumeCompletable extends AbstractNoHandleSubscribeCompletable {
Expand Down Expand Up @@ -86,8 +87,7 @@ public void onError(Throwable throwable) {
requireNonNull(parent.nextFactory.apply(throwable)) :
null;
} catch (Throwable t) {
t.addSuppressed(throwable);
subscriber.onError(t);
subscriber.onError(addSuppressed(t, throwable));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class OnErrorResumePublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
Expand Down Expand Up @@ -76,18 +77,17 @@ public void onNext(T t) {
}

@Override
public void onError(Throwable t) {
public void onError(Throwable throwable) {
final Publisher<? extends T> next;
try {
next = !resubscribed && predicate.test(t) ? requireNonNull(nextFactory.apply(t)) : null;
} catch (Throwable throwable) {
throwable.addSuppressed(t);
subscriber.onError(throwable);
next = !resubscribed && predicate.test(throwable) ? requireNonNull(nextFactory.apply(throwable)) : null;
} catch (Throwable t) {
subscriber.onError(addSuppressed(t, throwable));
return;
}

if (next == null) {
subscriber.onError(t);
subscriber.onError(throwable);
} else {
final Subscriber<? super T> offloadedSubscriber =
contextProvider.wrapPublisherSubscriber(this, contextMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class OnErrorResumeSingle<T> extends AbstractNoHandleSubscribeSingle<T> {
Expand Down Expand Up @@ -81,8 +82,7 @@ public void onError(Throwable throwable) {
try {
next = !resubscribed && predicate.test(throwable) ? requireNonNull(nextFactory.apply(throwable)) : null;
} catch (Throwable t) {
t.addSuppressed(throwable);
subscriber.onError(t);
subscriber.onError(addSuppressed(t, throwable));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.IntPredicate;

import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

/**
* {@link Publisher} to do {@link Publisher#repeat(IntPredicate)} and {@link Publisher#retry(BiIntPredicate)}
Expand Down Expand Up @@ -116,7 +117,7 @@ private void tryRedo(TerminalNotification notification) {
} catch (Throwable cause) {
Throwable originalCause = notification.cause();
if (originalCause != null) {
cause.addSuppressed(originalCause);
addSuppressed(cause, originalCause);
}
subscriber.onError(cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.IntFunction;

import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -143,7 +144,7 @@ private void redoIfRequired(TerminalNotification terminalNotification) {
} catch (Throwable cause) {
Throwable originalCause = terminalNotification.cause();
if (originalCause != null) {
cause.addSuppressed(originalCause);
addSuppressed(cause, originalCause);
}
subscriber.onError(cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

/**
* A {@link Single} implementation as returned by {@link Single#retry(BiIntPredicate)}.
*
Expand Down Expand Up @@ -97,8 +99,7 @@ public void onError(Throwable t) {
try {
shouldRetry = retrySingle.shouldRetry.test(++retryCount, t);
} catch (Throwable cause) {
cause.addSuppressed(t);
target.onError(cause);
target.onError(addSuppressed(cause, t));
return;
}
if (shouldRetry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -104,8 +105,7 @@ public void onError(Throwable t) {
try {
retryDecider = requireNonNull(retrySingle.shouldRetry.apply(++retryCount, t));
} catch (Throwable cause) {
cause.addSuppressed(t);
target.onError(cause);
target.onError(addSuppressed(cause, t));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -150,7 +151,7 @@ private Subscriber checkSubscriberAndExceptions() {
final RuntimeException exception = new RuntimeException("Unexpected exception(s) encountered",
exceptions.get(0));
for (int i = 1; i < exceptions.size(); i++) {
exception.addSuppressed(exceptions.get(i));
addSuppressed(exception, exceptions.get(i));
}
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -176,7 +177,7 @@ private Subscriber<? super T> checkSubscriberAndExceptions() {
final RuntimeException exception = new RuntimeException("Unexpected exception(s) encountered",
exceptions.get(0));
for (int i = 1; i < exceptions.size(); i++) {
exception.addSuppressed(exceptions.get(i));
addSuppressed(exception, exceptions.get(i));
}
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static io.servicetalk.concurrent.test.internal.AwaitUtils.await;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -153,7 +154,7 @@ private Subscriber<? super T> checkSubscriberAndExceptions() {
final RuntimeException exception = new RuntimeException("Unexpected exception(s) encountered",
exceptions.get(0));
for (int i = 1; i < exceptions.size(); i++) {
exception.addSuppressed(exceptions.get(i));
addSuppressed(exception, exceptions.get(i));
}
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -80,8 +81,7 @@ public void terminate(PublisherSource.Subscriber<?> subscriber, Throwable additi
subscriber.onError(additionalCause);
} else {
assert cause != null;
cause.addSuppressed(additionalCause);
subscriber.onError(cause);
subscriber.onError(addSuppressed(cause, additionalCause));
}
}

Expand All @@ -102,8 +102,7 @@ public void terminate(Subscriber subscriber, Throwable additionalCause) {
subscriber.onError(additionalCause);
} else {
assert cause != null;
cause.addSuppressed(additionalCause);
subscriber.onError(cause);
subscriber.onError(addSuppressed(cause, additionalCause));
}
}

Expand Down
Loading

0 comments on commit 3d26046

Please sign in to comment.