Skip to content

Commit

Permalink
refactor(skipUntil): call super(destination) constructor
Browse files Browse the repository at this point in the history
Change SkipUntilSubscriber to give the destination subscriber to the
constructor in Subscriber. Adapt SkipUntilSubscriber to handle
unsubscription-on-complete correctly, by overriding unsubscribe().

Addresses ReactiveX#577.
  • Loading branch information
staltz committed Oct 29, 2015
1 parent bbf6d99 commit 3558a4f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
18 changes: 9 additions & 9 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import Subscription from './Subscription';
export default class Subscriber<T> extends Subscription<T> implements Observer<T> {
protected destination: Observer<any>;

private _subscription: Subscription<T>;
protected _subscription: Subscription<T>;

protected _isUnsubscribed: boolean = false;

private _isUnsubscribed: boolean = false;

static create<T>(next ?: (x?:any) => void,
error ?: (e?:any) => void,
complete?: () => void): Subscriber<T> {
Expand All @@ -22,23 +22,23 @@ export default class Subscriber<T> extends Subscription<T> implements Observer<T
subscriber._complete = (typeof complete === "function") && complete || noop;
return subscriber;
}

_next(value: T) {
this.destination.next(value);
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
this.destination.complete();
}

constructor(destination?: Observer<any>) {
super();
this.destination = destination;

if (!destination) {
return;
}
Expand Down Expand Up @@ -90,9 +90,9 @@ export default class Subscriber<T> extends Subscription<T> implements Observer<T
}

unsubscribe() {
if(this._isUnsubscribed) {
if (this._isUnsubscribed) {
return;
} else if(this._subscription) {
} else if (this._subscription) {
this._isUnsubscribed = true;
} else {
super.unsubscribe();
Expand Down
15 changes: 13 additions & 2 deletions src/operators/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ class SkipUntilOperator<T, R> implements Operator<T, R> {
class SkipUntilSubscriber<T> extends Subscriber<T> {
private notificationSubscriber: NotificationSubscriber<any> = null;

constructor(public destination: Subscriber<T>,
constructor(destination: Subscriber<T>,
private notifier: Observable<any>) {
super(null);
super(destination);
this.notificationSubscriber = new NotificationSubscriber(this);
this.add(this.notifier.subscribe(this.notificationSubscriber));
}
Expand All @@ -42,6 +42,17 @@ class SkipUntilSubscriber<T> extends Subscriber<T> {
}
this.notificationSubscriber.unsubscribe();
}

unsubscribe() {
if (this._isUnsubscribed) {
return;
} else if (this._subscription) {
this._subscription.unsubscribe();
this._isUnsubscribed = true;
} else {
super.unsubscribe();
}
}
}

class NotificationSubscriber<T> extends Subscriber<T> {
Expand Down

0 comments on commit 3558a4f

Please sign in to comment.