Skip to content

Commit

Permalink
feat(forEach): now allows cancellation by passing a Subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Jul 30, 2018
1 parent 5a6c90f commit 8b74c2a
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 36 deletions.
130 changes: 101 additions & 29 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import * as Rx from 'rxjs/Rx';
import { Observer, TeardownLogic } from '../src/internal/types';
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
import { map } from '../src/internal/operators/map';
import * as HostReportErrorModule from '../src/internal/util/hostReportError';
import { noop } from '../src/internal/util/noop';
import { Subscription, interval, timer, Observable, Operator, config, Subscriber, Subject } from 'rxjs';

//tslint:disable-next-line
require('./helpers/test-helper');

declare const asDiagram: any, rxTestScheduler: any;

const Observable = Rx.Observable;

declare const __root__: any;

function expectFullObserver(val: any) {
Expand Down Expand Up @@ -82,7 +80,7 @@ describe('Observable', () => {
it('should allow Promise to be globally configured', (done) => {
let wasCalled = false;

Rx.config.Promise = function MyPromise(callback: any) {
config.Promise = function MyPromise(callback: any) {
wasCalled = true;
return new Promise<number>(callback);
} as any;
Expand Down Expand Up @@ -114,7 +112,7 @@ describe('Observable', () => {
});
});

it('should handle a synchronous throw from the next handler', () => {
it('should handle a synchronous throw from the next handler', (done: MochaDone) => {
const expected = new Error('I told, you Bobby Boucher, threes are the debil!');
const syncObservable = new Observable<number>((observer) => {
observer.next(1);
Expand All @@ -125,21 +123,22 @@ describe('Observable', () => {

const results: Array<number | Error> = [];

return syncObservable.forEach((x) => {
syncObservable.forEach((x) => {
results.push(x);
if (x === 3) {
throw expected;
}
}).then(
})
.catch(err => results.push(err))
.then(
() => {
throw new Error('should not be called');
},
(err) => {
results.push(err);
// Since the consuming code can no longer interfere with the synchronous
// producer, the remaining results are nexted.
expect(results).to.deep.equal([1, 2, 3, 4, expected]);
expect(results).to.deep.equal([1, 2, 3, expected]);
}
).then(
() => done(),
done,
);
});

Expand Down Expand Up @@ -171,6 +170,79 @@ describe('Observable', () => {
}
);
});

it('should be cancellable with a second Subscription argument', (done: MochaDone) => {
const source = interval(1000);
const subs = new Subscription();
let called = 0;
let completed = 0;
let error: any = undefined;

source.forEach(() => called++, subs)
.then(
() => completed++,
(err) => error = err,
);

subs.unsubscribe();

// wait a tick
Promise.resolve().then(() => {
expect(called).to.equal(0);
expect(completed).to.equal(0);
expect(error.message).to.equal('Observable forEach unsubscribed');
})
.then(
() => done(),
done
);
});

it('should be cancellable with a second Subscription argument, if the subscription is already unsubbed', (done: MochaDone) => {
const source = interval(1000);
const subs = new Subscription();
let called = 0;
let completed = 0;
let error: any = undefined;

subs.unsubscribe();

source.forEach(() => called++, subs)
.then(
() => completed++,
(err) => error = err,
)
.then(() => {
expect(called).to.equal(0);
expect(completed).to.equal(0);
expect(error.message).to.equal('Observable forEach unsubscribed');
})
.then(
() => done(),
done,
);
});

it('should throw an error if unsubscribed in async-await', (done: MochaDone) => {
async function test() {
const subs = new Subscription();
const results: any[] = [];
const observableComplete = timer(1000).forEach(x => results.push(x), subs);

// async, but should be the 1000ms above.
Promise.resolve().then(() => subs.unsubscribe());

let error: any = undefined;
try {
await observableComplete;
} catch (err) {
error = err;
}
expect(error.message).to.equal('Observable forEach unsubscribed');
}

test().then(() => done(), done);
});
});

describe('subscribe', () => {
Expand Down Expand Up @@ -304,7 +376,7 @@ describe('Observable', () => {
const sub = source.subscribe(() => {
//noop
});
expect(sub instanceof Rx.Subscription).to.be.true;
expect(sub instanceof Subscription).to.be.true;
expect(unsubscribeCalled).to.be.false;
expect(sub.unsubscribe).to.be.a('function');

Expand Down Expand Up @@ -555,10 +627,10 @@ describe('Observable', () => {
warnCalledWith.push(args);
};

Rx.config.useDeprecatedSynchronousErrorHandling = true;
config.useDeprecatedSynchronousErrorHandling = true;
expect(warnCalledWith.length).to.equal(1);

Rx.config.useDeprecatedSynchronousErrorHandling = false;
config.useDeprecatedSynchronousErrorHandling = false;
expect(logCalledWith.length).to.equal(1);

console.log = _log;
Expand All @@ -570,7 +642,7 @@ describe('Observable', () => {
beforeEach(() => {
const _warn = console.warn;
console.warn = noop;
Rx.config.useDeprecatedSynchronousErrorHandling = true;
config.useDeprecatedSynchronousErrorHandling = true;
console.warn = _warn;
});

Expand All @@ -584,7 +656,7 @@ describe('Observable', () => {
observer.next(1);
});

const sink = Rx.Subscriber.create(() => {
const sink = Subscriber.create(() => {
throw 'error!';
});

Expand All @@ -596,7 +668,7 @@ describe('Observable', () => {
afterEach(() => {
const _log = console.log;
console.log = noop;
Rx.config.useDeprecatedSynchronousErrorHandling = false;
config.useDeprecatedSynchronousErrorHandling = false;
console.log = _log;
});
});
Expand Down Expand Up @@ -649,7 +721,7 @@ describe('Observable.create', () => {

it('should provide an observer to the function', () => {
let called = false;
const result = Observable.create((observer: Rx.Observer<any>) => {
const result = Observable.create((observer: Observer<any>) => {
called = true;
expectFullObserver(observer);
observer.complete();
Expand Down Expand Up @@ -680,13 +752,13 @@ describe('Observable.create', () => {
/** @test {Observable} */
describe('Observable.lift', () => {

class MyCustomObservable<T> extends Rx.Observable<T> {
class MyCustomObservable<T> extends Observable<T> {
static from<T>(source: any) {
const observable = new MyCustomObservable<T>();
observable.source = <Rx.Observable<T>> source;
observable.source = <Observable<T>> source;
return observable;
}
lift<R>(operator: Rx.Operator<T, R>): Rx.Observable<R> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
Expand Down Expand Up @@ -723,7 +795,7 @@ describe('Observable.lift', () => {
observer.next(3);
observer.complete();
})
.multicast(() => new Rx.Subject<number>())
.multicast(() => new Subject<number>())
.refCount()
.map((x) => { return 10 * x; });

Expand All @@ -748,7 +820,7 @@ describe('Observable.lift', () => {
observer.next(3);
observer.complete();
})
.multicast(() => new Rx.Subject<number>(), (shared) => shared.map((x) => { return 10 * x; }));
.multicast(() => new Subject<number>(), (shared) => shared.map((x) => { return 10 * x; }));

expect(result instanceof MyCustomObservable).to.be.true;

Expand Down Expand Up @@ -837,7 +909,7 @@ describe('Observable.lift', () => {
// The custom Subscriber
const log: Array<string> = [];

class LogSubscriber<T> extends Rx.Subscriber<T> {
class LogSubscriber<T> extends Subscriber<T> {
next(value?: T): void {
log.push('next ' + value);
if (!this.isStopped) {
Expand All @@ -847,18 +919,18 @@ describe('Observable.lift', () => {
}

// The custom Operator
class LogOperator<T, R> implements Rx.Operator<T, R> {
constructor(private childOperator: Rx.Operator<T, R>) {
class LogOperator<T, R> implements Operator<T, R> {
constructor(private childOperator: Operator<T, R>) {
}

call(subscriber: Rx.Subscriber<R>, source: any): TeardownLogic {
call(subscriber: Subscriber<R>, source: any): TeardownLogic {
return this.childOperator.call(new LogSubscriber<R>(subscriber), source);
}
}

// The custom Observable
class LogObservable<T> extends Observable<T> {
lift<R>(operator: Rx.Operator<T, R>): Rx.Observable<R> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new LogObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = new LogOperator(operator);
Expand Down
20 changes: 13 additions & 7 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { throwError } from './observable/throwError';
import { observable as Symbol_observable } from '../internal/symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
import { isSubscription } from 'rxjs/internal/util/isSubscription';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
Expand Down Expand Up @@ -237,23 +238,28 @@ export class Observable<T> implements Subscribable<T> {
* @return {Promise} a promise that either resolves on observable completion or
* rejects with the handled error
*/
forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void> {
forEach(next: (value: T) => void, promiseCtorOrSubscription?: PromiseConstructorLike|Subscription): Promise<void> {
let promiseCtor: PromiseConstructorLike;
let subs = isSubscription(promiseCtorOrSubscription) ? promiseCtorOrSubscription : undefined;
promiseCtor = getPromiseCtor(promiseCtor);

return new promiseCtor<void>((resolve, reject) => {
subs = subs || new Subscription();

// If the promise resolves with a complete, calling reject should noop.
subs.add(() => reject(new Error('Observable forEach unsubscribed')));

// Must be declared in a separate statement to avoid a RefernceError when
// accessing subscription below in the closure due to Temporal Dead Zone.
let subscription: Subscription;
subscription = this.subscribe((value) => {
subs.add(this.subscribe(function (this: Subscription, value) {
try {
next(value);
} catch (err) {
reject(err);
if (subscription) {
subscription.unsubscribe();
}
this.unsubscribe();
}
}, reject, resolve);
}, reject, resolve));

}) as Promise<void>;
}

Expand Down
9 changes: 9 additions & 0 deletions src/internal/util/isSubscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Subscription } from '../Subscription';

/**
* Tests to see if a value is an RxJS {@link Subscription}.
* @param x the value to test
*/
export function isSubscription(x: any): x is Subscription {
return x && typeof x.unsubscribe === 'function' && typeof x.add === 'function';
}

0 comments on commit 8b74c2a

Please sign in to comment.