Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repeat: tests and fixes #562

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 70 additions & 7 deletions spec/operators/repeat-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,85 @@ var Observable = Rx.Observable;

describe('Observable.prototype.repeat()', function () {
it('should resubscribe count number of times', function () {
var e1 = cold('--a--b--|');
var e1 = cold('--a--b--| ');
var subs = ['^ ! ',
' ^ ! ',
' ^ !'];
var expected = '--a--b----a--b----a--b--|';

expectObservable(e1.repeat(3)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should resubscribe multiple times', function () {
var e1 = cold('--a--b--|');
var e1 = cold('--a--b--| ');
var subs = ['^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
var expected = '--a--b----a--b----a--b----a--b--|';

expectObservable(e1.repeat(2).repeat(2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should complete without emit when count is zero', function () {
var e1 = cold('--a--b--|');
var subs = [];
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should emit source once when count is one', function () {
var e1 = cold('--a--b--|');
var subs = '^ !';
var expected = '--a--b--|';

expectObservable(e1.repeat(1)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should repeat until gets unsubscribed', function () {
var e1 = cold('--a--b--|');
var unsub = '--------------!';
var e1 = cold('--a--b--| ');
var subs = ['^ ! ',
' ^ !'];
var unsub = ' !';
var expected = '--a--b----a--b-';

expectObservable(e1.repeat(10), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should be able to repeat indefinitely until unsubscribed', function () {
var e1 = cold('--a--b--| ');
var subs = ['^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
var unsub = ' !';
var expected = '--a--b----a--b----a--b----a--b----a--b----a--';

expectObservable(e1.repeat(), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should consider negative count as repeat indefinitely', function () {
var e1 = cold('--a--b--| ');
var subs = ['^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
var unsub = ' !';
var expected = '--a--b----a--b----a--b----a--b----a--b----a--';

expectObservable(e1.repeat(-1), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should not complete when source never completes', function () {
Expand All @@ -48,30 +94,39 @@ describe('Observable.prototype.repeat()', function () {

it('should not complete when source does not completes', function () {
var e1 = cold('-');
var unsub = ' !';
var subs = '^ !';
var expected = '-';

expectObservable(e1.repeat(3)).toBe(expected);
expectObservable(e1.repeat(3), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should complete immediately when source does not complete withut emit but count is zero', function () {
it('should complete immediately when source does not complete without emit but count is zero', function () {
var e1 = cold('-');
var subs = [];
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should complete immediately when source does not complete but count is zero', function () {
var e1 = cold('--a--b--');
var subs = [];
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should emit source once and does not complete when source emits but does not complete', function () {
var e1 = cold('--a--b--');
var subs = ['^ '];
var expected = '--a--b--';

expectObservable(e1.repeat(3)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should complete when source is empty', function () {
Expand All @@ -82,24 +137,32 @@ describe('Observable.prototype.repeat()', function () {
});

it('should complete when source does not emit', function () {
var e1 = cold('----|');
var e1 = cold('----| ');
var subs = ['^ ! ',
' ^ ! ',
' ^ !'];
var expected = '------------|';

expectObservable(e1.repeat(3)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should complete immediately when source does not emit but count is zero', function () {
var e1 = cold('----|');
var subs = [];
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should raise error when source raises error', function () {
var e1 = cold('--a--b--#');
var subs = '^ !';
var expected = '--a--b--#';

expectObservable(e1.repeat(2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should raises error if source throws', function () {
Expand Down
84 changes: 64 additions & 20 deletions src/operators/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,92 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import EmptyObservable from '../observables/EmptyObservable';
import immediate from '../schedulers/immediate';
import Subscription from '../Subscription';

export default function repeat<T>(count: number = -1): Observable<T> {
return this.lift(new RepeatOperator(count, this));
if (count === 0) {
return EmptyObservable.create();
} else {
return this.lift(new RepeatOperator(count, this));
}
}

class RepeatOperator<T, R> implements Operator<T, R> {
constructor(private count: number, private original: Observable<T>) {
constructor(private count: number,
private source: Observable<T>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new RepeatSubscriber(subscriber, this.count, this.original);
return new FirstRepeatSubscriber(subscriber, this.count, this.source);
}
}

class RepeatSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>, private count: number, private original: Observable<T>) {
super(destination);
this.invalidateRepeat();
class FirstRepeatSubscriber<T> extends Subscriber<T> {
private lastSubscription: Subscription<T>;

constructor(public destination: Subscriber<T>,
private count: number,
private source: Observable<T>) {
super(null);
if (count === 0) {
this.destination.complete();
super.unsubscribe();
}
this.lastSubscription = this;
}

_next(value: T) {
this.destination.next(value);
}

_error(err: any) {
this.destination.error(err);
}

complete() {
if (!this.isUnsubscribed) {
this.resubscribe(this.count);
}
}

private repeatSubscription(): void {
let state = { dest: this.destination, count: this.count, original: this.original };
immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state);
unsubscribe() {
const lastSubscription = this.lastSubscription;
if (lastSubscription === this) {
super.unsubscribe();
} else {
lastSubscription.unsubscribe();
}
}

private invalidateRepeat(): Boolean {
let completed = this.count === 0;
if (completed) {
resubscribe(count: number) {
this.lastSubscription.unsubscribe();
if (count - 1 === 0) {
this.destination.complete();
} else {
const nextSubscriber = new MoreRepeatSubscriber(this, count - 1);
this.lastSubscription = this.source.subscribe(nextSubscriber);
}
return completed;
}
}

private static dispatchSubscription({ dest, count, original }): void {
return original.subscribe(new RepeatSubscriber(dest, count, original));
class MoreRepeatSubscriber<T> extends Subscriber<T> {
constructor(private parent: FirstRepeatSubscriber<T>,
private count: number) {
super(null);
}

_next(value: T) {
this.parent.destination.next(value);
}

_error(err: any) {
this.parent.destination.error(err);
}

_complete() {
if (!this.invalidateRepeat()) {
this.count--;
this.repeatSubscription();
}
const count = this.count;
this.parent.resubscribe(count < 0 ? -1 : count);
}
}