Skip to content

Commit

Permalink
fix(retryWhen): Ensure subscription tears down between retries (#5623)
Browse files Browse the repository at this point in the history
* fix(retryWhen): Ensure subscription tears down between retries

Fixes an issue where subscriptions would not teardown until all retries were exhausted in the synchronous case.

* chore: respond to feedback
  • Loading branch information
benlesh authored Aug 5, 2020
1 parent 1b29d4b commit 6752af7
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 86 deletions.
23 changes: 22 additions & 1 deletion spec/operators/retryWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { retryWhen, map, mergeMap, takeUntil } from 'rxjs/operators';
import { of, EMPTY } from 'rxjs';
import { of, EMPTY, Observable, throwError } from 'rxjs';

/** @test {retryWhen} */
describe('retryWhen operator', () => {
Expand Down Expand Up @@ -329,4 +329,25 @@ describe('retryWhen operator', () => {
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should always teardown before starting the next cycle, even when synchronous', () => {
const results: any[] = [];
const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.error('bad');
return () => {
results.push('teardown');
}
});
const subscription = source.pipe(retryWhen(errors$ => errors$.pipe(
mergeMap((err, i) => i < 3 ? of(true) : throwError(err))
))).subscribe({
next: value => results.push(value),
error: (err) => results.push(err)
});

expect(subscription.closed).to.be.true;
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'bad', 'teardown'])
});
});
164 changes: 79 additions & 85 deletions src/internal/operators/retryWhen.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { Operator } from '../Operator';
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';


import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { MonoTypeOperatorFunction } from '../types';
import { lift } from '../util/lift';
import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';

/**
* Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
Expand Down Expand Up @@ -62,88 +60,84 @@ import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '..
* @name retryWhen
*/
export function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => lift(source, new RetryWhenOperator(notifier, source));
}

class RetryWhenOperator<T> implements Operator<T, T> {
constructor(protected notifier: (errors: Observable<any>) => Observable<any>,
protected source: Observable<T>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RetryWhenSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {

private errors: Subject<any> | null = null;
private retries: Observable<any> | null = null;
private retriesSubscription: Subscription | null | undefined = null;

constructor(destination: Subscriber<R>,
private notifier: (errors: Observable<any>) => Observable<any>,
private source: Observable<T>) {
super(destination);
}

error(err: any) {
if (!this.isStopped) {

let errors = this.errors;
let retries = this.retries;
let retriesSubscription = this.retriesSubscription;

if (!retries) {
errors = new Subject();
try {
const { notifier } = this;
retries = notifier(errors);
} catch (e) {
return super.error(e);
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null;
let syncResub = false;
let errors$: Subject<any>;

/**
* Gets the subject to send errors through. If it doesn't exist,
* we know we need to setup the notifier.
*/
const getErrorSubject = () => {
if (!errors$) {
errors$ = new Subject();
let notifier$: Observable<any>;
// The notifier is a user-provided function, so we need to do
// some error handling.
try {
notifier$ = notifier(errors$);
} catch (err) {
subscriber.error(err);
// Returning null here will cause the code below to
// notice there's been a problem and skip error notification.
return null;
}
subscription.add(
notifier$.subscribe({
next: () => {
if (innerSub) {
subscribeForRetryWhen();
} else {
// If we don't have an innerSub yet, that's because the inner subscription
// call hasn't even returned yet. We've arrived here synchronously.
// So we flag that we want to resub, such that we can ensure teardown
// happens before we resubscribe.
syncResub = true;
}
},
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
})
);
}
retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this));
} else {
this.errors = null;
this.retriesSubscription = null;
}

this._unsubscribeAndRecycle();

this.errors = errors;
this.retries = retries;
this.retriesSubscription = retriesSubscription;

errors!.next(err);
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
const { errors, retriesSubscription } = this;
if (errors) {
errors.unsubscribe();
this.errors = null;
}
if (retriesSubscription) {
retriesSubscription.unsubscribe();
this.retriesSubscription = null;
}
this.retries = null;
}

notifyNext(): void {
const { _unsubscribe } = this;
return errors$;
};

const subscribeForRetryWhen = () => {
innerSub = source.subscribe({
next: (value) => subscriber.next(value),
error: (err) => {
const errors$ = getErrorSubject();
if (errors$) {
// We have set up the notifier without error.
errors$.next(err);
}
},
complete: () => subscriber.complete(),
});
if (syncResub) {
// Ensure that the inner subscription is torn down before
// moving on to the next subscription in the synchronous case.
// If we don't do this here, all inner subscriptions will not be
// torn down until the entire observable is done.
innerSub.unsubscribe();
innerSub = null;
// We may need to do this multiple times, so reset the flag.
syncResub = false;
// Resubscribe
subscribeForRetryWhen();
} else {
subscription.add(innerSub);
}
};

this._unsubscribe = null!;
this._unsubscribeAndRecycle();
this._unsubscribe = _unsubscribe;
// Start the subscription
subscribeForRetryWhen();

this.source.subscribe(this);
}
return subscription;
});
}

0 comments on commit 6752af7

Please sign in to comment.