Skip to content

Commit

Permalink
fix(takeUntil): unsubscribe notifier when it completes
Browse files Browse the repository at this point in the history
Fix operator takeUntil to automatically unsubscribe the notifier when it
completes. This is to conform RxJS Next with RxJS 4.

Somewhat related to issue ReactiveX#577.
  • Loading branch information
Andre Medeiros committed Oct 23, 2015
1 parent ba997c6 commit 7d4cb7a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export interface CoreOperators<T> {
switchMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take?: (count: number) => Observable<T>;
takeUntil?: (observable: Observable<any>) => Observable<T>;
takeUntil?: (notifier: Observable<any>) => Observable<T>;
throttle?: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout?: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith?: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
Expand Down
43 changes: 29 additions & 14 deletions src/operators/takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,56 @@ import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';

export default function takeUntil<T>(observable: Observable<any>) {
return this.lift(new TakeUntilOperator(observable));
export default function takeUntil<T>(notifier: Observable<any>) {
return this.lift(new TakeUntilOperator(notifier));
}

class TakeUntilOperator<T, R> implements Operator<T, R> {

observable: Observable<any>;

constructor(observable: Observable<any>) {
this.observable = observable;
constructor(private notifier: Observable<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new TakeUntilSubscriber(subscriber, this.observable);
return new TakeUntilSubscriber(subscriber, this.notifier);
}
}

class TakeUntilSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>,
observable: Observable<any>) {
super(destination);
this.add(observable._subscribe(new TakeUntilInnerSubscriber(destination)));
private notificationSubscriber: TakeUntilInnerSubscriber<any> = null;

constructor(public destination: Subscriber<T>,
private notifier: Observable<any>) {
super(null);
this.notificationSubscriber = new TakeUntilInnerSubscriber(destination);
this.add(notifier.subscribe(this.notificationSubscriber));
}

_next(value: T) {
this.destination.next(value);
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
this.destination.complete();
this.notificationSubscriber.unsubscribe();
}
}

class TakeUntilInnerSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>) {
super(destination);
constructor(public destination: Subscriber<T>) {
super(null);
}

_next() {
this.destination.complete();
}

_error(e) {
this.destination.error(e);
}

_complete() {
}
}

0 comments on commit 7d4cb7a

Please sign in to comment.