diff --git a/spec/operators/repeat-spec.js b/spec/operators/repeat-spec.js index c0ff8b33f9..37e743a00f 100644 --- a/spec/operators/repeat-spec.js +++ b/spec/operators/repeat-spec.js @@ -4,39 +4,85 @@ var Observable = Rx.Observable; describe('Observable.prototype.repeat()', function () { it('should resubscribe count number of times', function () { - var e1 = cold('--a--b--|'); + var e1 = cold('--a--b--| '); + var subs = ['^ ! ', + ' ^ ! ', + ' ^ !']; var expected = '--a--b----a--b----a--b--|'; expectObservable(e1.repeat(3)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should resubscribe multiple times', function () { - var e1 = cold('--a--b--|'); + var e1 = cold('--a--b--| '); + var subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ !']; var expected = '--a--b----a--b----a--b----a--b--|'; expectObservable(e1.repeat(2).repeat(2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should complete without emit when count is zero', function () { var e1 = cold('--a--b--|'); + var subs = []; var expected = '|'; expectObservable(e1.repeat(0)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should emit source once when count is one', function () { var e1 = cold('--a--b--|'); + var subs = '^ !'; var expected = '--a--b--|'; expectObservable(e1.repeat(1)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should repeat until gets unsubscribed', function () { - var e1 = cold('--a--b--|'); - var unsub = '--------------!'; + var e1 = cold('--a--b--| '); + var subs = ['^ ! ', + ' ^ !']; + var unsub = ' !'; var expected = '--a--b----a--b-'; expectObservable(e1.repeat(10), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should be able to repeat indefinitely until unsubscribed', function () { + var e1 = cold('--a--b--| '); + var subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ !']; + var unsub = ' !'; + var expected = '--a--b----a--b----a--b----a--b----a--b----a--'; + + expectObservable(e1.repeat(), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should consider negative count as repeat indefinitely', function () { + var e1 = cold('--a--b--| '); + var subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ !']; + var unsub = ' !'; + var expected = '--a--b----a--b----a--b----a--b----a--b----a--'; + + expectObservable(e1.repeat(-1), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should not complete when source never completes', function () { @@ -48,30 +94,39 @@ describe('Observable.prototype.repeat()', function () { it('should not complete when source does not completes', function () { var e1 = cold('-'); + var unsub = ' !'; + var subs = '^ !'; var expected = '-'; - expectObservable(e1.repeat(3)).toBe(expected); + expectObservable(e1.repeat(3), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); - it('should complete immediately when source does not complete withut emit but count is zero', function () { + it('should complete immediately when source does not complete without emit but count is zero', function () { var e1 = cold('-'); + var subs = []; var expected = '|'; expectObservable(e1.repeat(0)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should complete immediately when source does not complete but count is zero', function () { var e1 = cold('--a--b--'); + var subs = []; var expected = '|'; expectObservable(e1.repeat(0)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should emit source once and does not complete when source emits but does not complete', function () { var e1 = cold('--a--b--'); + var subs = ['^ ']; var expected = '--a--b--'; expectObservable(e1.repeat(3)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should complete when source is empty', function () { @@ -82,24 +137,32 @@ describe('Observable.prototype.repeat()', function () { }); it('should complete when source does not emit', function () { - var e1 = cold('----|'); + var e1 = cold('----| '); + var subs = ['^ ! ', + ' ^ ! ', + ' ^ !']; var expected = '------------|'; expectObservable(e1.repeat(3)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should complete immediately when source does not emit but count is zero', function () { var e1 = cold('----|'); + var subs = []; var expected = '|'; expectObservable(e1.repeat(0)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should raise error when source raises error', function () { var e1 = cold('--a--b--#'); + var subs = '^ !'; var expected = '--a--b--#'; expectObservable(e1.repeat(2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should raises error if source throws', function () { diff --git a/src/operators/repeat.ts b/src/operators/repeat.ts index befff15e83..118fa21d3f 100644 --- a/src/operators/repeat.ts +++ b/src/operators/repeat.ts @@ -2,48 +2,92 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Subscriber from '../Subscriber'; import Observable from '../Observable'; +import EmptyObservable from '../observables/EmptyObservable'; import immediate from '../schedulers/immediate'; +import Subscription from '../Subscription'; export default function repeat(count: number = -1): Observable { - return this.lift(new RepeatOperator(count, this)); + if (count === 0) { + return EmptyObservable.create(); + } else { + return this.lift(new RepeatOperator(count, this)); + } } class RepeatOperator implements Operator { - constructor(private count: number, private original: Observable) { + constructor(private count: number, + private source: Observable) { } call(subscriber: Subscriber): Subscriber { - return new RepeatSubscriber(subscriber, this.count, this.original); + return new FirstRepeatSubscriber(subscriber, this.count, this.source); } } -class RepeatSubscriber extends Subscriber { - constructor(destination: Observer, private count: number, private original: Observable) { - super(destination); - this.invalidateRepeat(); +class FirstRepeatSubscriber extends Subscriber { + private lastSubscription: Subscription; + + constructor(public destination: Subscriber, + private count: number, + private source: Observable) { + super(null); + if (count === 0) { + this.destination.complete(); + super.unsubscribe(); + } + this.lastSubscription = this; + } + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this.destination.error(err); + } + + complete() { + if (!this.isUnsubscribed) { + this.resubscribe(this.count); + } } - private repeatSubscription(): void { - let state = { dest: this.destination, count: this.count, original: this.original }; - immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state); + unsubscribe() { + const lastSubscription = this.lastSubscription; + if (lastSubscription === this) { + super.unsubscribe(); + } else { + lastSubscription.unsubscribe(); + } } - private invalidateRepeat(): Boolean { - let completed = this.count === 0; - if (completed) { + resubscribe(count: number) { + this.lastSubscription.unsubscribe(); + if (count - 1 === 0) { this.destination.complete(); + } else { + const nextSubscriber = new MoreRepeatSubscriber(this, count - 1); + this.lastSubscription = this.source.subscribe(nextSubscriber); } - return completed; } +} - private static dispatchSubscription({ dest, count, original }): void { - return original.subscribe(new RepeatSubscriber(dest, count, original)); +class MoreRepeatSubscriber extends Subscriber { + constructor(private parent: FirstRepeatSubscriber, + private count: number) { + super(null); + } + + _next(value: T) { + this.parent.destination.next(value); + } + + _error(err: any) { + this.parent.destination.error(err); } _complete() { - if (!this.invalidateRepeat()) { - this.count--; - this.repeatSubscription(); - } + const count = this.count; + this.parent.resubscribe(count < 0 ? -1 : count); } }