Skip to content

Commit

Permalink
fix(repeat): fix inner subscription semantics for repeat
Browse files Browse the repository at this point in the history
Fix repeat operator to unsubscribe from the repeatable source as soon as
possible (when the previous repetition was completed), not when the
resulting Observable was unsubscribed (which is as late as possible).
Also fix repeat operator to not subscribe to the source at all if
count=0.

Resolves ReactiveX#554.
  • Loading branch information
Andre Medeiros committed Oct 19, 2015
1 parent c75e414 commit 95c10c8
Showing 1 changed file with 64 additions and 20 deletions.
84 changes: 64 additions & 20 deletions src/operators/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,92 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import EmptyObservable from '../observables/EmptyObservable';
import immediate from '../schedulers/immediate';
import Subscription from '../Subscription';

export default function repeat<T>(count: number = -1): Observable<T> {
return this.lift(new RepeatOperator(count, this));
if (count === 0) {
return EmptyObservable.create();
} else {
return this.lift(new RepeatOperator(count, this));
}
}

class RepeatOperator<T, R> implements Operator<T, R> {
constructor(private count: number, private original: Observable<T>) {
constructor(private count: number,
private source: Observable<T>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new RepeatSubscriber(subscriber, this.count, this.original);
return new FirstRepeatSubscriber(subscriber, this.count, this.source);
}
}

class RepeatSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>, private count: number, private original: Observable<T>) {
super(destination);
this.invalidateRepeat();
class FirstRepeatSubscriber<T> extends Subscriber<T> {
private lastSubscription: Subscription<T>;

constructor(public destination: Subscriber<T>,
private count: number,
private source: Observable<T>) {
super(null);
if (count === 0) {
this.destination.complete();
super.unsubscribe();
}
this.lastSubscription = this;
}

_next(value: T) {
this.destination.next(value);
}

_error(err: any) {
this.destination.error(err);
}

complete() {
if (!this.isUnsubscribed) {
this.resubscribe(this.count);
}
}

private repeatSubscription(): void {
let state = { dest: this.destination, count: this.count, original: this.original };
immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state);
unsubscribe() {
const lastSubscription = this.lastSubscription;
if (lastSubscription === this) {
super.unsubscribe();
} else {
lastSubscription.unsubscribe();
}
}

private invalidateRepeat(): Boolean {
let completed = this.count === 0;
if (completed) {
resubscribe(count: number) {
this.lastSubscription.unsubscribe();
if (count - 1 === 0) {
this.destination.complete();
} else {
const nextSubscriber = new MoreRepeatSubscriber(this, count - 1);
this.lastSubscription = this.source.subscribe(nextSubscriber);
}
return completed;
}
}

private static dispatchSubscription({ dest, count, original }): void {
return original.subscribe(new RepeatSubscriber(dest, count, original));
class MoreRepeatSubscriber<T> extends Subscriber<T> {
constructor(private parent: FirstRepeatSubscriber<T>,
private count: number) {
super(null);
}

_next(value: T) {
this.parent.destination.next(value);
}

_error(err: any) {
this.parent.destination.error(err);
}

_complete() {
if (!this.invalidateRepeat()) {
this.count--;
this.repeatSubscription();
}
const count = this.count;
this.parent.resubscribe(count < 0 ? -1 : count);
}
}

0 comments on commit 95c10c8

Please sign in to comment.