From 7d4cb7ab618b7971217cb309c2940c3c253949c3 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Fri, 23 Oct 2015 13:27:52 +0300 Subject: [PATCH] fix(takeUntil): unsubscribe notifier when it completes Fix operator takeUntil to automatically unsubscribe the notifier when it completes. This is to conform RxJS Next with RxJS 4. Somewhat related to issue #577. --- src/CoreOperators.ts | 2 +- src/operators/takeUntil.ts | 43 +++++++++++++++++++++++++------------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 09ffda5a0cb..cf8d226ac49 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -66,7 +66,7 @@ export interface CoreOperators { switchMap?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; switchMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take?: (count: number) => Observable; - takeUntil?: (observable: Observable) => Observable; + takeUntil?: (notifier: Observable) => Observable; throttle?: (delay: number, scheduler?: Scheduler) => Observable; timeout?: (due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith?: (due: number|Date, withObservable: Observable, scheduler?: Scheduler) => Observable; diff --git a/src/operators/takeUntil.ts b/src/operators/takeUntil.ts index e63d4026d5f..3f8239a6bc6 100644 --- a/src/operators/takeUntil.ts +++ b/src/operators/takeUntil.ts @@ -3,41 +3,56 @@ import Observer from '../Observer'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; -export default function takeUntil(observable: Observable) { - return this.lift(new TakeUntilOperator(observable)); +export default function takeUntil(notifier: Observable) { + return this.lift(new TakeUntilOperator(notifier)); } class TakeUntilOperator implements Operator { - - observable: Observable; - - constructor(observable: Observable) { - this.observable = observable; + constructor(private notifier: Observable) { } call(subscriber: Subscriber): Subscriber { - return new TakeUntilSubscriber(subscriber, this.observable); + return new TakeUntilSubscriber(subscriber, this.notifier); } } class TakeUntilSubscriber extends Subscriber { - constructor(destination: Subscriber, - observable: Observable) { - super(destination); - this.add(observable._subscribe(new TakeUntilInnerSubscriber(destination))); + private notificationSubscriber: TakeUntilInnerSubscriber = null; + + constructor(public destination: Subscriber, + private notifier: Observable) { + super(null); + this.notificationSubscriber = new TakeUntilInnerSubscriber(destination); + this.add(notifier.subscribe(this.notificationSubscriber)); + } + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this.destination.error(err); + } + + _complete() { + this.destination.complete(); + this.notificationSubscriber.unsubscribe(); } } class TakeUntilInnerSubscriber extends Subscriber { - constructor(destination: Subscriber) { - super(destination); + constructor(public destination: Subscriber) { + super(null); } + _next() { this.destination.complete(); } + _error(e) { this.destination.error(e); } + _complete() { } } \ No newline at end of file