Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Completable#mergeDelayError(Publisher) and HTTP client control flow #1336

Merged
merged 1 commit into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,46 @@ public final Completable merge(Iterable<? extends Completable> other) {
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator.</a>
*/
public final <T> Publisher<T> merge(Publisher<? extends T> mergeWith) {
return new CompletableMergeWithPublisher<>(this, mergeWith, executor);
return new CompletableMergeWithPublisher<>(this, mergeWith, false, executor);
}

/**
* Merges the passed {@link Publisher} with this {@link Completable}.
* <p>
* The resulting {@link Publisher} emits all items emitted by the passed {@link Publisher} and terminates when both
* this {@link Completable} and the passed {@link Publisher} terminate. If either terminates with an error then the
* error will be propagated to the return value.
* <pre>{@code
* ExecutorService e = ...;
* List<Future<Void>> futures = ...;
* futures.add(e.submit(() -> resultOfThisCompletable()));
* futures.add(e.submit(() -> resultOfMergeWithStream());
* Throwable overallCause = null;
* // This is an approximation, this operator does not provide any ordering guarantees for the results.
* for (Future<Void> future : futures) {
* try {
* f.get();
* } catch (Throwable cause) {
* if (overallCause != null) {
* overallCause = cause;
* }
* }
* }
* if (overallCause != null) {
* throw overallCause;
* }
* }</pre>
*
* @param mergeWith the {@link Publisher} to merge in
* @param <T> The value type of the resulting {@link Publisher}.
* @return {@link Publisher} emits all items emitted by the passed {@link Publisher} and terminates when both this
* {@link Completable} and the passed {@link Publisher} terminate. If either terminates with an error then the
* error will be propagated to the return value.
*
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator.</a>
*/
public final <T> Publisher<T> mergeDelayError(Publisher<? extends T> mergeWith) {
return new CompletableMergeWithPublisher<>(this, mergeWith, true, executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import io.servicetalk.concurrent.internal.SignalOffloader;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

/**
* {@link Publisher} as returned by {@link Completable#merge(Publisher)}.
*
Expand All @@ -34,22 +37,152 @@
final class CompletableMergeWithPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
private final Completable original;
private final Publisher<? extends T> mergeWith;
private final boolean delayError;

CompletableMergeWithPublisher(Completable original, Publisher<? extends T> mergeWith, Executor executor) {
CompletableMergeWithPublisher(Completable original, Publisher<? extends T> mergeWith, boolean delayError,
Executor executor) {
super(executor);
this.mergeWith = mergeWith;
this.original = original;
this.delayError = delayError;
}

@Override
void handleSubscribe(final Subscriber<? super T> subscriber, final SignalOffloader signalOffloader,
final AsyncContextMap contextMap, final AsyncContextProvider contextProvider) {
new Merger<>(subscriber, signalOffloader, contextMap, contextProvider)
.merge(original, mergeWith, signalOffloader, contextMap, contextProvider);
if (delayError) {
new MergerDelayError<>(subscriber, signalOffloader, contextMap, contextProvider)
.merge(original, mergeWith, signalOffloader, contextMap, contextProvider);
} else {
new Merger<>(subscriber, signalOffloader, contextMap, contextProvider)
.merge(original, mergeWith, signalOffloader, contextMap, contextProvider);
}
}

private static final class Merger<T> implements Subscriber<T> {
private static final class MergerDelayError<T> implements Subscriber<T> {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MergerDelayError, TerminalSignal> terminalUpdater =
AtomicReferenceFieldUpdater.newUpdater(MergerDelayError.class, TerminalSignal.class, "terminal");
private final CompletableSubscriber completableSubscriber;
private final Subscriber<? super T> offloadedSubscriber;
private final DelayedSubscription subscription = new DelayedSubscription();
@Nullable
private volatile TerminalSignal terminal;

MergerDelayError(Subscriber<? super T> subscriber, SignalOffloader signalOffloader, AsyncContextMap contextMap,
AsyncContextProvider contextProvider) {
// This is used only to deliver signals that originate from the mergeWith Publisher. Since, we need to
// preserve the threading semantics of the original Completable, we offload the subscriber so that we do not
// invoke it from the mergeWith Publisher Executor thread.
this.offloadedSubscriber = signalOffloader.offloadSubscriber(contextProvider.wrapPublisherSubscriber(
subscriber, contextMap));
completableSubscriber = new CompletableSubscriber();
}

void merge(Completable original, Publisher<? extends T> mergeWith, SignalOffloader signalOffloader,
AsyncContextMap contextMap, AsyncContextProvider contextProvider) {
offloadedSubscriber.onSubscribe(
new MergedCancellableWithSubscription(subscription, completableSubscriber));
original.delegateSubscribe(completableSubscriber, signalOffloader, contextMap,
contextProvider);
// SignalOffloader is associated with the original Completable. Since mergeWith Publisher is provided by
// the user, it will have its own Executor, hence we should not pass this signalOffloader to subscribe to
// mergeWith.
// Any signal originating from mergeWith Publisher should be offloaded before they are sent to the
// Subscriber of the resulting Publisher of CompletableMergeWithPublisher as the Executor associated with
// the original Completable defines the threading semantics for that Subscriber.
mergeWith.subscribeInternal(this);
}

@Override
public void onSubscribe(final Subscription subscription) {
this.subscription.delayedSubscription(subscription);
}

@Override
public void onNext(@Nullable final T t) {
offloadedSubscriber.onNext(t);
}

@Override
public void onError(final Throwable t) {
terminateSubscriber(new TerminalSignal(t, true));
}

@Override
public void onComplete() {
terminateSubscriber(TerminalSignal.PUB_COMPLETED);
}

private void terminateSubscriber(final TerminalSignal terminalSignal) {
for (;;) {
final TerminalSignal currState = terminal;
if (currState != null) {
if (currState.fromPublisher == terminalSignal.fromPublisher) {
// The goal of this check is to prevent concurrency on the Subscriber and require both sources
// terminate before terminating downstream. We don't need to enforce each source terminates
// exactly once for the correctness of this operator (so we don't).
throw duplicateTerminalException(currState);
}
if (currState.cause == null) {
if (terminalSignal.cause == null) {
offloadedSubscriber.onComplete();
} else {
offloadedSubscriber.onError(terminalSignal.cause);
}
} else {
offloadedSubscriber.onError(currState.cause);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at CompletableMergeSubscriber, there we add the 2nd error as suppressed for the 1st one, but here we propagate only the first failure. The 2nd may also be useful, consider propagating both.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it but I'm hesitant to use addSuppressed in general as it may result in memory leak with static exceptions, difficult to "turn off" and limit the queue size ... but we can look at this issue more generally in a followup.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually unit tests remind me of another issue with addSuppressed, self-suppression throws an exception so we would have to guard against this everywhere (which we don't). I'll skip addSuppressed for now...

}
break;
} else if (terminalUpdater.compareAndSet(this, null, terminalSignal)) {
break;
}
}
}

private final class CompletableSubscriber extends DelayedCancellable implements CompletableSource.Subscriber {
@Override
public void onSubscribe(Cancellable cancellable) {
delayedCancellable(cancellable);
}

@Override
public void onComplete() {
terminateSubscriber(TerminalSignal.COM_COMPLETED);
}

@Override
public void onError(Throwable t) {
terminateSubscriber(new TerminalSignal(t, false));
}
}

private static final class TerminalSignal {
private static final TerminalSignal PUB_COMPLETED = new TerminalSignal(true);
private static final TerminalSignal COM_COMPLETED = new TerminalSignal(false);
@Nullable
final Throwable cause;
final boolean fromPublisher;

TerminalSignal(boolean fromPublisher) {
cause = null;
this.fromPublisher = fromPublisher;
}

TerminalSignal(Throwable cause, boolean fromPublisher) {
this.cause = requireNonNull(cause);
this.fromPublisher = fromPublisher;
}
}

private static IllegalStateException duplicateTerminalException(TerminalSignal currState) {
throw new IllegalStateException("duplicate terminal event from " + (currState.fromPublisher ?
Publisher.class.getSimpleName() : Completable.class.getSimpleName()), currState.cause);
}
}

private static final class Merger<T> implements Subscriber<T> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<Merger> completionCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(Merger.class, "completionCount");

Expand All @@ -67,7 +200,6 @@ private static final class Merger<T> implements Subscriber<T> {
// invoke it from the mergeWith Publisher Executor thread.
this.offloadedSubscriber = new ConcurrentTerminalSubscriber<>(signalOffloader.offloadSubscriber(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use ConcurrentSubscription and ConcurrentTerminalSubscriber to handle concurrency in this class. The ConcurrentSubscription has a comment clarifying why it's required. Consider adding a similar comment for ConcurrentTerminalSubscriber.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentTerminalSubscriber usage isn't changed in this PR and it is removed all together in #1334 so lets defer on this for now.

contextProvider.wrapPublisherSubscriber(subscriber, contextMap)), false);

completableSubscriber = new CompletableSubscriber();
}

Expand Down
Loading