diff --git a/spec/operators/catch-spec.ts b/spec/operators/catch-spec.ts index 5902db3d22..379509af08 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catch-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs'; -import { catchError, map, mergeMap, takeWhile } from 'rxjs/operators'; +import { catchError, map, mergeMap, takeWhile, delay } from 'rxjs/operators'; import * as sinon from 'sinon'; import { createObservableInputs } from '../helpers/test-helper'; import { TestScheduler } from 'rxjs/testing'; @@ -431,8 +431,8 @@ describe('catchError operator', () => { // TODO(v8): see https://github.com/ReactiveX/rxjs/issues/5115 // The re-implementation in version 8 should fix the problem in the // referenced issue. Closed subscribers should remain closed. - /* - it('issue #5115', (done: MochaDone) => { + + it('Properly handle async handled result if source is synchronous', (done: MochaDone) => { const source = new Observable(observer => { observer.error(new Error('kaboom!')); observer.complete(); @@ -456,5 +456,5 @@ describe('catchError operator', () => { } ); }); - */ + }); diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index c60fa130bc..64572c2b7c 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -1,13 +1,16 @@ -import { Operator } from '../Operator'; +/** @prettier */ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { lift } from '../util/lift'; -import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; +import { Subscription } from '../Subscription'; +import { from } from '../observable/from'; /* tslint:disable:max-line-length */ -export function catchError>(selector: (err: any, caught: Observable) => O): OperatorFunction>; +export function catchError>( + selector: (err: any, caught: Observable) => O +): OperatorFunction>; /* tslint:enable:max-line-length */ /** @@ -105,55 +108,48 @@ export function catchError>(selector: (err: an export function catchError>( selector: (err: any, caught: Observable) => O ): OperatorFunction> { - return function catchErrorOperatorFunction(source: Observable): Observable> { - const operator = new CatchOperator(selector); - const caught = lift(source, operator); - operator.caught = caught; - return caught; - }; -} + return (source: Observable) => + lift(source, function (this: Subscriber, source: Observable) { + const subscriber = this; + const subscription = new Subscription(); + let innerSub: Subscription | null = null; + let syncUnsub = false; + let handledResult: Observable>; -class CatchOperator implements Operator { - caught: Observable | undefined; + const handleError = (err: any) => { + try { + handledResult = from(selector(err, catchError(selector)(source))); + } catch (err) { + subscriber.error(err); + return; + } + }; - constructor(private selector: (err: any, caught: Observable) => ObservableInput) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new CatchSubscriber(subscriber, this.selector, this.caught!)); - } -} + innerSub = source.subscribe({ + next: (value) => subscriber.next(value), + error: (err) => { + handleError(err); + if (handledResult) { + if (innerSub) { + innerSub.unsubscribe(); + innerSub = null; + subscription.add(handledResult.subscribe(subscriber)); + } else { + syncUnsub = true; + } + } + }, + complete: () => subscriber.complete(), + }); -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class CatchSubscriber extends SimpleOuterSubscriber { - constructor(destination: Subscriber, - private selector: (err: any, caught: Observable) => ObservableInput, - private caught: Observable) { - super(destination); - } - - // NOTE: overriding `error` instead of `_error` because we don't want - // to have this flag this subscriber as `isStopped`. We can mimic the - // behavior of the RetrySubscriber (from the `retry` operator), where - // we unsubscribe from our source chain, reset our Subscriber flags, - // then subscribe to the selector result. - error(err: any) { - if (!this.isStopped) { - let result: any; - try { - result = this.selector(err, this.caught); - } catch (err2) { - super.error(err2); - return; + if (syncUnsub) { + innerSub.unsubscribe(); + innerSub = null; + subscription.add(handledResult!.subscribe(subscriber)); + } else { + subscription.add(innerSub); } - this._unsubscribeAndRecycle(); - const innerSubscriber = new SimpleInnerSubscriber(this); - this.add(innerSubscriber); - innerSubscribe(result, innerSubscriber); - } - } + + return subscription; + }); }