diff --git a/src/operator/retryWhen.ts b/src/operator/retryWhen.ts index e28b86c419..6d83344789 100644 --- a/src/operator/retryWhen.ts +++ b/src/operator/retryWhen.ts @@ -1,15 +1,5 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { Subject } from '../Subject'; -import { Subscription, TeardownLogic } from '../Subscription'; -import { tryCatch } from '../util/tryCatch'; -import { errorObject } from '../util/errorObject'; - -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; - +import { retryWhen as higherOrder } from '../operators/retryWhen'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable * calls `error`, this method will emit the Throwable that caused the error to the Observable returned from `notifier`. @@ -25,92 +15,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function retryWhen(this: Observable, notifier: (errors: Observable) => Observable): Observable { - return this.lift(new RetryWhenOperator(notifier, this)); -} - -class RetryWhenOperator implements Operator { - constructor(protected notifier: (errors: Observable) => Observable, - protected source: Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class RetryWhenSubscriber extends OuterSubscriber { - - private errors: Subject; - private retries: Observable; - private retriesSubscription: Subscription; - - constructor(destination: Subscriber, - private notifier: (errors: Observable) => Observable, - private source: Observable) { - super(destination); - } - - error(err: any) { - if (!this.isStopped) { - - let errors = this.errors; - let retries: any = this.retries; - let retriesSubscription = this.retriesSubscription; - - if (!retries) { - errors = new Subject(); - retries = tryCatch(this.notifier)(errors); - if (retries === errorObject) { - return super.error(errorObject.e); - } - retriesSubscription = subscribeToResult(this, retries); - } else { - this.errors = null; - this.retriesSubscription = null; - } - - this._unsubscribeAndRecycle(); - - this.errors = errors; - this.retries = retries; - this.retriesSubscription = retriesSubscription; - - errors.next(err); - } - } - - protected _unsubscribe() { - const { errors, retriesSubscription } = this; - if (errors) { - errors.unsubscribe(); - this.errors = null; - } - if (retriesSubscription) { - retriesSubscription.unsubscribe(); - this.retriesSubscription = null; - } - this.retries = null; - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - const { errors, retries, retriesSubscription } = this; - this.errors = null; - this.retries = null; - this.retriesSubscription = null; - - this._unsubscribeAndRecycle(); - - this.errors = errors; - this.retries = retries; - this.retriesSubscription = retriesSubscription; - - this.source.subscribe(this); - } + return higherOrder(notifier)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 8d2b9ce450..b68d14e5cb 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -56,6 +56,7 @@ export { reduce } from './reduce'; export { repeat } from './repeat'; export { repeatWhen } from './repeatWhen'; export { retry } from './retry'; +export { retryWhen } from './retryWhen'; export { refCount } from './refCount'; export { scan } from './scan'; export { subscribeOn } from './subscribeOn'; diff --git a/src/operators/retryWhen.ts b/src/operators/retryWhen.ts new file mode 100644 index 0000000000..0764c515b3 --- /dev/null +++ b/src/operators/retryWhen.ts @@ -0,0 +1,118 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { Subject } from '../Subject'; +import { Subscription, TeardownLogic } from '../Subscription'; +import { tryCatch } from '../util/tryCatch'; +import { errorObject } from '../util/errorObject'; + +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; + +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable + * calls `error`, this method will emit the Throwable that caused the error to the Observable returned from `notifier`. + * If that Observable calls `complete` or `error` then this method will call `complete` or `error` on the child + * subscription. Otherwise this method will resubscribe to the source Observable. + * + * + * + * @param {function(errors: Observable): Observable} notifier - Receives an Observable of notifications with which a + * user can `complete` or `error`, aborting the retry. + * @return {Observable} The source Observable modified with retry logic. + * @method retryWhen + * @owner Observable + */ +export function retryWhen(notifier: (errors: Observable) => Observable): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new RetryWhenOperator(notifier, source)); +} + +class RetryWhenOperator implements Operator { + constructor(protected notifier: (errors: Observable) => Observable, + protected source: Observable) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class RetryWhenSubscriber extends OuterSubscriber { + + private errors: Subject; + private retries: Observable; + private retriesSubscription: Subscription; + + constructor(destination: Subscriber, + private notifier: (errors: Observable) => Observable, + private source: Observable) { + super(destination); + } + + error(err: any) { + if (!this.isStopped) { + + let errors = this.errors; + let retries: any = this.retries; + let retriesSubscription = this.retriesSubscription; + + if (!retries) { + errors = new Subject(); + retries = tryCatch(this.notifier)(errors); + if (retries === errorObject) { + return super.error(errorObject.e); + } + retriesSubscription = subscribeToResult(this, retries); + } else { + this.errors = null; + this.retriesSubscription = null; + } + + this._unsubscribeAndRecycle(); + + this.errors = errors; + this.retries = retries; + this.retriesSubscription = retriesSubscription; + + errors.next(err); + } + } + + protected _unsubscribe() { + const { errors, retriesSubscription } = this; + if (errors) { + errors.unsubscribe(); + this.errors = null; + } + if (retriesSubscription) { + retriesSubscription.unsubscribe(); + this.retriesSubscription = null; + } + this.retries = null; + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + const { errors, retries, retriesSubscription } = this; + this.errors = null; + this.retries = null; + this.retriesSubscription = null; + + this._unsubscribeAndRecycle(); + + this.errors = errors; + this.retries = retries; + this.retriesSubscription = retriesSubscription; + + this.source.subscribe(this); + } +}