Skip to content

Commit

Permalink
feat(takeUntil): no longer subscribes to sourec if notifier synchrono…
Browse files Browse the repository at this point in the history
…usly emits

closes ReactiveX#2189
  • Loading branch information
benlesh committed Mar 30, 2018
1 parent 5be4a37 commit 7ccb684
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
9 changes: 9 additions & 0 deletions spec/operators/takeUntil-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ describe('Observable.prototype.takeUntil', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should complete without subscribing to the source when notifier synchronously emits', () => {
const e1 = hot('----a--|');
const e2 = Observable.of(0);
const expected = '(|) ';

expectObservable(e1.takeUntil(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe([]);
});

it('should allow unsubscribing explicitly and early', () => {
const e1 = hot('--a--b--c--d--e--f--g--|');
const e1subs = '^ ! ';
Expand Down
12 changes: 8 additions & 4 deletions src/internal/operators/takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ class TakeUntilOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new TakeUntilSubscriber(subscriber, this.notifier));
const takeUntilSubscriber = new TakeUntilSubscriber(subscriber);
const notifierSubscription = subscribeToResult(takeUntilSubscriber, this.notifier);
if (notifierSubscription && !notifierSubscription.closed) {
takeUntilSubscriber.add(notifierSubscription);
return source.subscribe(takeUntilSubscriber);
}
return takeUntilSubscriber;
}
}

Expand All @@ -62,10 +68,8 @@ class TakeUntilOperator<T> implements Operator<T, T> {
*/
class TakeUntilSubscriber<T, R> extends OuterSubscriber<T, R> {

constructor(destination: Subscriber<any>,
private notifier: Observable<any>) {
constructor(destination: Subscriber<any>, ) {
super(destination);
this.add(subscribeToResult(this, notifier));
}

notifyNext(outerValue: T, innerValue: R,
Expand Down

0 comments on commit 7ccb684

Please sign in to comment.