Skip to content

Commit 8473fe5

Browse files
committed
feat(repeat): add higher-order lettable version of repeat
1 parent 76668a9 commit 8473fe5

File tree

3 files changed

+67
-44
lines changed

3 files changed

+67
-44
lines changed

src/operator/repeat.ts

Lines changed: 3 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
import { Operator } from '../Operator';
2-
import { Subscriber } from '../Subscriber';
1+
32
import { Observable } from '../Observable';
4-
import { EmptyObservable } from '../observable/EmptyObservable';
5-
import { TeardownLogic } from '../Subscription';
3+
import { repeat as higherOrder } from '../operators/repeat';
64

75
/**
86
* Returns an Observable that repeats the stream of items emitted by the source Observable at most count times.
@@ -17,44 +15,5 @@ import { TeardownLogic } from '../Subscription';
1715
* @owner Observable
1816
*/
1917
export function repeat<T>(this: Observable<T>, count: number = -1): Observable<T> {
20-
if (count === 0) {
21-
return new EmptyObservable<T>();
22-
} else if (count < 0) {
23-
return this.lift(new RepeatOperator(-1, this));
24-
} else {
25-
return this.lift(new RepeatOperator(count - 1, this));
26-
}
27-
}
28-
29-
class RepeatOperator<T> implements Operator<T, T> {
30-
constructor(private count: number,
31-
private source: Observable<T>) {
32-
}
33-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
34-
return source.subscribe(new RepeatSubscriber(subscriber, this.count, this.source));
35-
}
36-
}
37-
38-
/**
39-
* We need this JSDoc comment for affecting ESDoc.
40-
* @ignore
41-
* @extends {Ignored}
42-
*/
43-
class RepeatSubscriber<T> extends Subscriber<T> {
44-
constructor(destination: Subscriber<any>,
45-
private count: number,
46-
private source: Observable<T>) {
47-
super(destination);
48-
}
49-
complete() {
50-
if (!this.isStopped) {
51-
const { source, count } = this;
52-
if (count === 0) {
53-
return super.complete();
54-
} else if (count > -1) {
55-
this.count = count - 1;
56-
}
57-
source.subscribe(this._unsubscribeAndRecycle());
58-
}
59-
}
18+
return higherOrder(count)(this);
6019
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export { publish } from './publish';
5353
export { publishBehavior } from './publishBehavior';
5454
export { race } from './race';
5555
export { reduce } from './reduce';
56+
export { repeat } from './repeat';
5657
export { refCount } from './refCount';
5758
export { scan } from './scan';
5859
export { subscribeOn } from './subscribeOn';

src/operators/repeat.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Observable } from '../Observable';
4+
import { EmptyObservable } from '../observable/EmptyObservable';
5+
import { TeardownLogic } from '../Subscription';
6+
import { MonoTypeOperatorFunction } from '../interfaces';
7+
8+
/**
9+
* Returns an Observable that repeats the stream of items emitted by the source Observable at most count times.
10+
*
11+
* <img src="./img/repeat.png" width="100%">
12+
*
13+
* @param {number} [count] The number of times the source Observable items are repeated, a count of 0 will yield
14+
* an empty Observable.
15+
* @return {Observable} An Observable that repeats the stream of items emitted by the source Observable at most
16+
* count times.
17+
* @method repeat
18+
* @owner Observable
19+
*/
20+
export function repeat<T>(count: number = -1): MonoTypeOperatorFunction<T> {
21+
return (source: Observable<T>) => {
22+
if (count === 0) {
23+
return new EmptyObservable<T>();
24+
} else if (count < 0) {
25+
return source.lift(new RepeatOperator(-1, source));
26+
} else {
27+
return source.lift(new RepeatOperator(count - 1, source));
28+
}
29+
};
30+
}
31+
32+
class RepeatOperator<T> implements Operator<T, T> {
33+
constructor(private count: number,
34+
private source: Observable<T>) {
35+
}
36+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
37+
return source.subscribe(new RepeatSubscriber(subscriber, this.count, this.source));
38+
}
39+
}
40+
41+
/**
42+
* We need this JSDoc comment for affecting ESDoc.
43+
* @ignore
44+
* @extends {Ignored}
45+
*/
46+
class RepeatSubscriber<T> extends Subscriber<T> {
47+
constructor(destination: Subscriber<any>,
48+
private count: number,
49+
private source: Observable<T>) {
50+
super(destination);
51+
}
52+
complete() {
53+
if (!this.isStopped) {
54+
const { source, count } = this;
55+
if (count === 0) {
56+
return super.complete();
57+
} else if (count > -1) {
58+
this.count = count - 1;
59+
}
60+
source.subscribe(this._unsubscribeAndRecycle());
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)