Skip to content

Commit

Permalink
Reduce Publisher/Single retry/repeat allocations (#2120)
Browse files Browse the repository at this point in the history
Motivation:
RepeatWhenSingle was recently introduced and avoids allocating
new Subscribers for each repeat operation. This helps save
object allocation when the operator is used in a loop. We
should use a consistent strategy for existing Publisher/Single
repeat/retry related operators.

Modifications:
- Redo[When]Publisher and Retry[When]Single no longer allocates
  a new Subscriber on each iteration.
- All the repeat/retry operators now have consistent behavior
  for AsyncContext map which is to re-use it. Previously a copy
  was made before resubscribing, however the old map is no longer
  used after a terminal event anyways so this shouldn't have any
  benefit. Either we copy the map up front before subscribe, or
  we just re-use the same map and let the async source at the top
  of the chain reset if necessary.

Result:
Less object allocation and consistent AsyncContext behavior for
repeat/retry operators.
  • Loading branch information
Scottmitch authored Mar 1, 2022
1 parent aab5549 commit 70d2a47
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ void handleSubscribe(Subscriber<? super T> subscriber, ContextMap contextMap,
}

abstract static class AbstractRedoSubscriber<T> implements Subscriber<T> {

final SequentialSubscription subscription;
final int redoCount;
final Subscriber<? super T> subscriber;
int redoCount;

AbstractRedoSubscriber(SequentialSubscription subscription, int redoCount, Subscriber<? super T> subscriber) {
this.subscription = subscription;
Expand Down Expand Up @@ -81,7 +80,6 @@ Subscription decorate(Subscription s) {
}

private static final class RedoSubscriber<T> extends AbstractRedoSubscriber<T> {

private final RedoPublisher<T> redoPublisher;
private final ContextMap contextMap;
private final AsyncContextProvider contextProvider;
Expand Down Expand Up @@ -114,7 +112,7 @@ public void onComplete() {
private void tryRedo(TerminalNotification notification) {
final boolean shouldRedo;
try {
shouldRedo = redoPublisher.shouldRedo.test(redoCount + 1, notification);
shouldRedo = redoPublisher.shouldRedo.test(++redoCount, notification);
} catch (Throwable cause) {
Throwable originalCause = notification.cause();
if (originalCause != null) {
Expand All @@ -125,13 +123,9 @@ private void tryRedo(TerminalNotification notification) {
}

if (shouldRedo) {
// For the current subscribe operation we want to use contextMap directly, but in the event a
// re-subscribe operation occurs we want to restore the original state of the AsyncContext map, so
// we save a copy upfront.
redoPublisher.original.delegateSubscribe(
new RedoSubscriber<>(subscription, redoCount + 1,
subscriber, contextMap.copy(), contextProvider, redoPublisher),
contextMap, contextProvider);
// Either we copy the map up front before subscribe, or we just re-use the same map and let the async
// source at the top of the chain reset if necessary. We currently choose the second option.
redoPublisher.original.delegateSubscribe(this, contextMap, contextProvider);
} else {
notification.terminate(subscriber);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,33 @@ void handleSubscribe(Subscriber<? super T> subscriber,
}

private static final class RedoSubscriber<T> extends RedoPublisher.AbstractRedoSubscriber<T> {

private final SequentialCancellable cancellable;
private final RedoWhenPublisher<T> redoPublisher;
private final ContextMap contextMap;
private final AsyncContextProvider contextProvider;
private final CompletableSource.Subscriber completableSubscriber = new CompletableSource.Subscriber() {
@Override
public void onSubscribe(Cancellable completableCancellable) {
cancellable.nextCancellable(completableCancellable);
}

@Override
public void onComplete() {
// Either we copy the map up front before subscribe, or we just re-use the same map and let the async
// source at the top of the chain reset if necessary. We currently choose the second option.
redoPublisher.original.delegateSubscribe(RedoSubscriber.this, contextMap, contextProvider);
}

@Override
public void onError(Throwable t) {
if (!redoPublisher.forRetry) {
// repeat operator terminates repeat with error.
subscriber.onComplete();
} else {
subscriber.onError(t);
}
}
};

RedoSubscriber(SequentialSubscription subscription, int redoCount, Subscriber<? super T> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider,
Expand Down Expand Up @@ -116,7 +138,7 @@ Subscription decorate(Subscription s) {
private void redoIfRequired(TerminalNotification terminalNotification) {
final Completable redoDecider;
try {
redoDecider = requireNonNull(redoPublisher.shouldRedo.apply(redoCount + 1, terminalNotification));
redoDecider = requireNonNull(redoPublisher.shouldRedo.apply(++redoCount, terminalNotification));
} catch (Throwable cause) {
Throwable originalCause = terminalNotification.cause();
if (originalCause != null) {
Expand All @@ -126,33 +148,7 @@ private void redoIfRequired(TerminalNotification terminalNotification) {
return;
}

redoDecider.subscribeInternal(new CompletableSource.Subscriber() {
@Override
public void onSubscribe(Cancellable completableCancellable) {
cancellable.nextCancellable(completableCancellable);
}

@Override
public void onComplete() {
// For the current subscribe operation we want to use contextMap directly, but in the event a
// re-subscribe operation occurs we want to restore the original state of the AsyncContext map, so
// we save a copy upfront.
redoPublisher.original.delegateSubscribe(
new RedoSubscriber<>(subscription, redoCount + 1, subscriber, contextMap.copy(),
contextProvider, redoPublisher),
contextMap, contextProvider);
}

@Override
public void onError(Throwable t) {
if (!redoPublisher.forRetry) {
// repeat operator terminates repeat with error.
subscriber.onComplete();
} else {
subscriber.onError(t);
}
}
});
redoDecider.subscribeInternal(completableSubscriber);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public void request(final long n) {
final long prev = outstandingDemandUpdater.getAndAccumulate(this, n,
FlowControlUtils::addWithOverflowProtectionIfNotNegative);
if (prev == 0) {
// Either we copy the map up front before subscribe, or we just re-use the same map and let the
// async source at the top of the chain reset if necessary. We currently choose the second option.
outer.original.delegateSubscribe(repeatSubscriber, contextMap, contextProvider);
}
} else {
Expand Down Expand Up @@ -147,6 +149,9 @@ public void onComplete() {
break;
} else if (outstandingDemandUpdater.compareAndSet(RepeatSubscription.this, prev, prev - 1)) {
if (prev > 1) {
// Either we copy the map up front before subscribe, or we just re-use the same map and
// let the async source at the top of the chain reset if necessary. We currently choose
// the second option.
outer.original.delegateSubscribe(RepeatSubscriber.this, contextMap, contextProvider);
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ void handleSubscribe(final Subscriber<? super T> subscriber,
}

abstract static class AbstractRetrySubscriber<T> implements Subscriber<T> {

final SequentialCancellable sequentialCancellable;
final Subscriber<? super T> target;
final int retryCount;
int retryCount;

AbstractRetrySubscriber(SequentialCancellable sequentialCancellable, Subscriber<? super T> target,
int retryCount) {
Expand All @@ -74,7 +73,6 @@ Cancellable decorate(Cancellable cancellable) {
}

private static final class RetrySubscriber<T> extends AbstractRetrySubscriber<T> {

private final RetrySingle<T> retrySingle;
private final ContextMap contextMap;
private final AsyncContextProvider contextProvider;
Expand All @@ -97,20 +95,16 @@ public void onSuccess(@Nullable T result) {
public void onError(Throwable t) {
final boolean shouldRetry;
try {
shouldRetry = retrySingle.shouldRetry.test(retryCount + 1, t);
shouldRetry = retrySingle.shouldRetry.test(++retryCount, t);
} catch (Throwable cause) {
cause.addSuppressed(t);
target.onError(cause);
return;
}
if (shouldRetry) {
// For the current subscribe operation we want to use contextMap directly, but in the event a
// re-subscribe operation occurs we want to restore the original state of the AsyncContext map, so
// we save a copy upfront.
retrySingle.original.delegateSubscribe(
new RetrySubscriber<>(sequentialCancellable, retrySingle, target, retryCount + 1,
contextMap.copy(), contextProvider),
contextMap, contextProvider);
// Either we copy the map up front before subscribe, or we just re-use the same map and let the async
// source at the top of the chain reset if necessary. We currently choose the second option.
retrySingle.original.delegateSubscribe(this, contextMap, contextProvider);
} else {
target.onError(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,28 @@ void handleSubscribe(final Subscriber<? super T> subscriber,
}

private static final class RetrySubscriber<T> extends RetrySingle.AbstractRetrySubscriber<T> {

private final SequentialCancellable retrySignalCancellable;
private final RetryWhenSingle<T> retrySingle;
private final ContextMap contextMap;
private final AsyncContextProvider contextProvider;
private final CompletableSource.Subscriber completableSubscriber = new CompletableSource.Subscriber() {
@Override
public void onSubscribe(Cancellable completableCancellable) {
retrySignalCancellable.nextCancellable(completableCancellable);
}

@Override
public void onComplete() {
// Either we copy the map up front before subscribe, or we just re-use the same map and let the async
// source at the top of the chain reset if necessary. We currently choose the second option.
retrySingle.original.delegateSubscribe(RetrySubscriber.this, contextMap, contextProvider);
}

@Override
public void onError(Throwable t) {
target.onError(t);
}
};

RetrySubscriber(SequentialCancellable cancellable, int redoCount, Subscriber<? super T> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider,
Expand Down Expand Up @@ -85,34 +102,14 @@ public void onSuccess(@Nullable T t) {
public void onError(Throwable t) {
final Completable retryDecider;
try {
retryDecider = requireNonNull(retrySingle.shouldRetry.apply(retryCount + 1, t));
retryDecider = requireNonNull(retrySingle.shouldRetry.apply(++retryCount, t));
} catch (Throwable cause) {
cause.addSuppressed(t);
target.onError(cause);
return;
}

retryDecider.subscribeInternal(new CompletableSource.Subscriber() {
@Override
public void onSubscribe(Cancellable completableCancellable) {
retrySignalCancellable.nextCancellable(completableCancellable);
}

@Override
public void onComplete() {
// For the current subscribe operation we want to use contextMap directly, but in the event a
// re-subscribe operation occurs we want to restore the original state of the AsyncContext map, so
// we save a copy upfront.
retrySingle.original.delegateSubscribe(new RetrySubscriber<>(sequentialCancellable,
retryCount + 1, target, contextMap.copy(), contextProvider, retrySingle),
contextMap, contextProvider);
}

@Override
public void onError(Throwable t) {
target.onError(t);
}
});
retryDecider.subscribeInternal(completableSubscriber);
}
}
}

0 comments on commit 70d2a47

Please sign in to comment.