Skip to content

Commit

Permalink
fix(catchError): ensure proper handling of async return for synchrono…
Browse files Browse the repository at this point in the history
…us source error handling (#5627)

- Refactors catchError to be simpler and to get rid of _unsubscribeAndRecycle usage as well as some other overly clever bits.

fixes #5115
  • Loading branch information
benlesh authored Aug 5, 2020
1 parent af1824e commit 1b29d4b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 54 deletions.
8 changes: 4 additions & 4 deletions spec/operators/catch-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap, takeWhile } from 'rxjs/operators';
import { catchError, map, mergeMap, takeWhile, delay } from 'rxjs/operators';
import * as sinon from 'sinon';
import { createObservableInputs } from '../helpers/test-helper';
import { TestScheduler } from 'rxjs/testing';
Expand Down Expand Up @@ -431,8 +431,8 @@ describe('catchError operator', () => {
// TODO(v8): see https://github.com/ReactiveX/rxjs/issues/5115
// The re-implementation in version 8 should fix the problem in the
// referenced issue. Closed subscribers should remain closed.
/*
it('issue #5115', (done: MochaDone) => {

it('Properly handle async handled result if source is synchronous', (done: MochaDone) => {
const source = new Observable<string>(observer => {
observer.error(new Error('kaboom!'));
observer.complete();
Expand All @@ -456,5 +456,5 @@ describe('catchError operator', () => {
}
);
});
*/

});
96 changes: 46 additions & 50 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { Operator } from '../Operator';
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';

import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { lift } from '../util/lift';
import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
import { Subscription } from '../Subscription';
import { from } from '../observable/from';

/* tslint:disable:max-line-length */
export function catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
export function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>>;
/* tslint:enable:max-line-length */

/**
Expand Down Expand Up @@ -105,55 +108,48 @@ export function catchError<T, O extends ObservableInput<any>>(selector: (err: an
export function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>> {
return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | ObservedValueOf<O>> {
const operator = new CatchOperator(selector);
const caught = lift(source, operator);
operator.caught = caught;
return caught;
};
}
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null = null;
let syncUnsub = false;
let handledResult: Observable<ObservedValueOf<O>>;

class CatchOperator<T, R> implements Operator<T, T | R> {
caught: Observable<T> | undefined;
const handleError = (err: any) => {
try {
handledResult = from(selector(err, catchError(selector)(source)));
} catch (err) {
subscriber.error(err);
return;
}
};

constructor(private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new CatchSubscriber(subscriber, this.selector, this.caught!));
}
}
innerSub = source.subscribe({
next: (value) => subscriber.next(value),
error: (err) => {
handleError(err);
if (handledResult) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult.subscribe(subscriber));
} else {
syncUnsub = true;
}
}
},
complete: () => subscriber.complete(),
});

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class CatchSubscriber<T, R> extends SimpleOuterSubscriber<T, T | R> {
constructor(destination: Subscriber<any>,
private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>,
private caught: Observable<T>) {
super(destination);
}

// NOTE: overriding `error` instead of `_error` because we don't want
// to have this flag this subscriber as `isStopped`. We can mimic the
// behavior of the RetrySubscriber (from the `retry` operator), where
// we unsubscribe from our source chain, reset our Subscriber flags,
// then subscribe to the selector result.
error(err: any) {
if (!this.isStopped) {
let result: any;
try {
result = this.selector(err, this.caught);
} catch (err2) {
super.error(err2);
return;
if (syncUnsub) {
innerSub.unsubscribe();
innerSub = null;
subscription.add(handledResult!.subscribe(subscriber));
} else {
subscription.add(innerSub);
}
this._unsubscribeAndRecycle();
const innerSubscriber = new SimpleInnerSubscriber(this);
this.add(innerSubscriber);
innerSubscribe(result, innerSubscriber);
}
}

return subscription;
});
}

0 comments on commit 1b29d4b

Please sign in to comment.