Skip to content

Commit

Permalink
Completable#mergeDelayError(Publisher) and HTTP client control flow (a…
Browse files Browse the repository at this point in the history
…pple#1336)

Motivation:
The HTTP client control flow merges the resule of the write Publisher
(a Completable) and the read Publisher via Completable#merge(Publisher).
However when offloading is enabled it is possible events will be
delivered out of order which may result in dropped data/trailers.

Modifications:
- Introduce the Completable#mergeDelayError(Publisher) operator
- Use new mergeDelayError(..) operator instead of merge(..) so that both
streams must complete which ensures all the read data is delivered to
the application before any errors. It is assumed that if the write
Publisher fails that the read publisher will also fail due to transport
failure.

Result:
No more dropped data/trailres on HTTP client due to write failure (e.g.
connection closure) being delivered before read data due to offloading
(e.g. not enough demand, subscription is offloaded, demand arrives after
write error is already delivered).
  • Loading branch information
Scottmitch authored and bondolo committed Jan 29, 2021
1 parent 808ba23 commit 906d6cf
Show file tree
Hide file tree
Showing 9 changed files with 639 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,46 @@ public final Completable merge(Iterable<? extends Completable> other) {
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator.</a>
*/
public final <T> Publisher<T> merge(Publisher<? extends T> mergeWith) {
return new CompletableMergeWithPublisher<>(this, mergeWith, executor);
return new CompletableMergeWithPublisher<>(this, mergeWith, false, executor);
}

/**
* Merges the passed {@link Publisher} with this {@link Completable}.
* <p>
* The resulting {@link Publisher} emits all items emitted by the passed {@link Publisher} and terminates when both
* this {@link Completable} and the passed {@link Publisher} terminate. If either terminates with an error then the
* error will be propagated to the return value.
* <pre>{@code
* ExecutorService e = ...;
* List<Future<Void>> futures = ...;
* futures.add(e.submit(() -> resultOfThisCompletable()));
* futures.add(e.submit(() -> resultOfMergeWithStream());
* Throwable overallCause = null;
* // This is an approximation, this operator does not provide any ordering guarantees for the results.
* for (Future<Void> future : futures) {
* try {
* f.get();
* } catch (Throwable cause) {
* if (overallCause != null) {
* overallCause = cause;
* }
* }
* }
* if (overallCause != null) {
* throw overallCause;
* }
* }</pre>
*
* @param mergeWith the {@link Publisher} to merge in
* @param <T> The value type of the resulting {@link Publisher}.
* @return {@link Publisher} emits all items emitted by the passed {@link Publisher} and terminates when both this
* {@link Completable} and the passed {@link Publisher} terminate. If either terminates with an error then the
* error will be propagated to the return value.
*
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX merge operator.</a>
*/
public final <T> Publisher<T> mergeDelayError(Publisher<? extends T> mergeWith) {
return new CompletableMergeWithPublisher<>(this, mergeWith, true, executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import io.servicetalk.concurrent.internal.SignalOffloader;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

/**
* {@link Publisher} as returned by {@link Completable#merge(Publisher)}.
*
Expand All @@ -34,22 +37,152 @@
final class CompletableMergeWithPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
private final Completable original;
private final Publisher<? extends T> mergeWith;
private final boolean delayError;

CompletableMergeWithPublisher(Completable original, Publisher<? extends T> mergeWith, Executor executor) {
CompletableMergeWithPublisher(Completable original, Publisher<? extends T> mergeWith, boolean delayError,
Executor executor) {
super(executor);
this.mergeWith = mergeWith;
this.original = original;
this.delayError = delayError;
}

@Override
void handleSubscribe(final Subscriber<? super T> subscriber, final SignalOffloader signalOffloader,
final AsyncContextMap contextMap, final AsyncContextProvider contextProvider) {
new Merger<>(subscriber, signalOffloader, contextMap, contextProvider)
.merge(original, mergeWith, signalOffloader, contextMap, contextProvider);
if (delayError) {
new MergerDelayError<>(subscriber, signalOffloader, contextMap, contextProvider)
.merge(original, mergeWith, signalOffloader, contextMap, contextProvider);
} else {
new Merger<>(subscriber, signalOffloader, contextMap, contextProvider)
.merge(original, mergeWith, signalOffloader, contextMap, contextProvider);
}
}

private static final class Merger<T> implements Subscriber<T> {
private static final class MergerDelayError<T> implements Subscriber<T> {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MergerDelayError, TerminalSignal> terminalUpdater =
AtomicReferenceFieldUpdater.newUpdater(MergerDelayError.class, TerminalSignal.class, "terminal");
private final CompletableSubscriber completableSubscriber;
private final Subscriber<? super T> offloadedSubscriber;
private final DelayedSubscription subscription = new DelayedSubscription();
@Nullable
private volatile TerminalSignal terminal;

MergerDelayError(Subscriber<? super T> subscriber, SignalOffloader signalOffloader, AsyncContextMap contextMap,
AsyncContextProvider contextProvider) {
// This is used only to deliver signals that originate from the mergeWith Publisher. Since, we need to
// preserve the threading semantics of the original Completable, we offload the subscriber so that we do not
// invoke it from the mergeWith Publisher Executor thread.
this.offloadedSubscriber = signalOffloader.offloadSubscriber(contextProvider.wrapPublisherSubscriber(
subscriber, contextMap));
completableSubscriber = new CompletableSubscriber();
}

void merge(Completable original, Publisher<? extends T> mergeWith, SignalOffloader signalOffloader,
AsyncContextMap contextMap, AsyncContextProvider contextProvider) {
offloadedSubscriber.onSubscribe(
new MergedCancellableWithSubscription(subscription, completableSubscriber));
original.delegateSubscribe(completableSubscriber, signalOffloader, contextMap,
contextProvider);
// SignalOffloader is associated with the original Completable. Since mergeWith Publisher is provided by
// the user, it will have its own Executor, hence we should not pass this signalOffloader to subscribe to
// mergeWith.
// Any signal originating from mergeWith Publisher should be offloaded before they are sent to the
// Subscriber of the resulting Publisher of CompletableMergeWithPublisher as the Executor associated with
// the original Completable defines the threading semantics for that Subscriber.
mergeWith.subscribeInternal(this);
}

@Override
public void onSubscribe(final Subscription subscription) {
this.subscription.delayedSubscription(subscription);
}

@Override
public void onNext(@Nullable final T t) {
offloadedSubscriber.onNext(t);
}

@Override
public void onError(final Throwable t) {
terminateSubscriber(new TerminalSignal(t, true));
}

@Override
public void onComplete() {
terminateSubscriber(TerminalSignal.PUB_COMPLETED);
}

private void terminateSubscriber(final TerminalSignal terminalSignal) {
for (;;) {
final TerminalSignal currState = terminal;
if (currState != null) {
if (currState.fromPublisher == terminalSignal.fromPublisher) {
// The goal of this check is to prevent concurrency on the Subscriber and require both sources
// terminate before terminating downstream. We don't need to enforce each source terminates
// exactly once for the correctness of this operator (so we don't).
throw duplicateTerminalException(currState);
}
if (currState.cause == null) {
if (terminalSignal.cause == null) {
offloadedSubscriber.onComplete();
} else {
offloadedSubscriber.onError(terminalSignal.cause);
}
} else {
offloadedSubscriber.onError(currState.cause);
}
break;
} else if (terminalUpdater.compareAndSet(this, null, terminalSignal)) {
break;
}
}
}

private final class CompletableSubscriber extends DelayedCancellable implements CompletableSource.Subscriber {
@Override
public void onSubscribe(Cancellable cancellable) {
delayedCancellable(cancellable);
}

@Override
public void onComplete() {
terminateSubscriber(TerminalSignal.COM_COMPLETED);
}

@Override
public void onError(Throwable t) {
terminateSubscriber(new TerminalSignal(t, false));
}
}

private static final class TerminalSignal {
private static final TerminalSignal PUB_COMPLETED = new TerminalSignal(true);
private static final TerminalSignal COM_COMPLETED = new TerminalSignal(false);
@Nullable
final Throwable cause;
final boolean fromPublisher;

TerminalSignal(boolean fromPublisher) {
cause = null;
this.fromPublisher = fromPublisher;
}

TerminalSignal(Throwable cause, boolean fromPublisher) {
this.cause = requireNonNull(cause);
this.fromPublisher = fromPublisher;
}
}

private static IllegalStateException duplicateTerminalException(TerminalSignal currState) {
throw new IllegalStateException("duplicate terminal event from " + (currState.fromPublisher ?
Publisher.class.getSimpleName() : Completable.class.getSimpleName()), currState.cause);
}
}

private static final class Merger<T> implements Subscriber<T> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<Merger> completionCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(Merger.class, "completionCount");

Expand All @@ -67,7 +200,6 @@ private static final class Merger<T> implements Subscriber<T> {
// invoke it from the mergeWith Publisher Executor thread.
this.offloadedSubscriber = new ConcurrentTerminalSubscriber<>(signalOffloader.offloadSubscriber(
contextProvider.wrapPublisherSubscriber(subscriber, contextMap)), false);

completableSubscriber = new CompletableSubscriber();
}

Expand Down
Loading

0 comments on commit 906d6cf

Please sign in to comment.