Skip to content

Commit e93bffc

Browse files
committed
feat(sample): readd sample operator
Add an implementation of `sample` that behaves like RxJS 4 BREAKING CHANGE: `sample` behavior returned to RxJS 4 behavior
1 parent 0f9d45b commit e93bffc

File tree

7 files changed

+245
-0
lines changed

7 files changed

+245
-0
lines changed

spec/operators/sample-spec.js

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/* globals describe, it, expect, hot, expectObservable,, expectSubscriptions */
2+
var Rx = require('../../dist/cjs/Rx.KitchenSink');
3+
var Observable = Rx.Observable;
4+
5+
describe('Observable.prototype.sample', function () {
6+
it('should get samples when the notifier emits', function () {
7+
var e1 = hot('----a-^--b----c----d----e----f----| ');
8+
var e1subs = '^ ! ';
9+
var e2 = hot( '-----x----------x----------x----------|');
10+
var e2subs = '^ ! ';
11+
var expected = '-----b----------d----------f| ';
12+
13+
expectObservable(e1.sample(e2)).toBe(expected);
14+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
15+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
16+
});
17+
18+
it('should sample nothing if source has not nexted yet', function () {
19+
var e1 = hot('----a-^-------b----|');
20+
var e1subs = '^ !';
21+
var e2 = hot( '-----x-------|');
22+
var e2subs = '^ !';
23+
var expected = '-------------|';
24+
25+
expectObservable(e1.sample(e2)).toBe(expected);
26+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
27+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
28+
});
29+
30+
it('should not complete when the notifier completes, nor should it emit', function () {
31+
var e1 = hot('----a----b----c----d----e----f----');
32+
var e1subs = '^ ';
33+
var e2 = hot('------x-| ');
34+
var e2subs = '^ ! ';
35+
var expected = '------a---------------------------';
36+
37+
expectObservable(e1.sample(e2)).toBe(expected);
38+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
39+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
40+
});
41+
42+
it('should complete only when the source completes, if notifier completes early', function () {
43+
var e1 = hot('----a----b----c----d----e----f---|');
44+
var e1subs = '^ !';
45+
var e2 = hot('------x-| ');
46+
var e2subs = '^ ! ';
47+
var expected = '------a--------------------------|';
48+
49+
expectObservable(e1.sample(e2)).toBe(expected);
50+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
51+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
52+
});
53+
54+
it('should allow unsubscribing explicitly and early', function () {
55+
var e1 = hot('----a-^--b----c----d----e----f----| ');
56+
var unsub = ' ! ';
57+
var e1subs = '^ ! ';
58+
var e2 = hot( '-----x----------x----------x----------|');
59+
var e2subs = '^ ! ';
60+
var expected = '-----b--------- ';
61+
62+
expectObservable(e1.sample(e2), unsub).toBe(expected);
63+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
64+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
65+
});
66+
67+
it('should not break unsubscription chains when result is unsubscribed explicitly', function () {
68+
var e1 = hot('----a-^--b----c----d----e----f----| ');
69+
var e1subs = '^ ! ';
70+
var e2 = hot( '-----x----------x----------x----------|');
71+
var e2subs = '^ ! ';
72+
var expected = '-----b--------- ';
73+
var unsub = ' ! ';
74+
75+
var result = e1
76+
.mergeMap(function (x) { return Observable.of(x); })
77+
.sample(e2)
78+
.mergeMap(function (x) { return Observable.of(x); });
79+
80+
expectObservable(result, unsub).toBe(expected);
81+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
82+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
83+
});
84+
85+
it('should only sample when a new value arrives, even if it is the same value', function () {
86+
var e1 = hot('----a----b----c----c----e----f----| ');
87+
var e1subs = '^ ! ';
88+
var e2 = hot('------x-x------xx-x---x----x--------|');
89+
var e2subs = '^ ! ';
90+
var expected = '------a--------c------c----e------| ';
91+
92+
expectObservable(e1.sample(e2)).toBe(expected);
93+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
94+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
95+
});
96+
97+
it('should raise error if source raises error', function () {
98+
var e1 = hot('----a-^--b----c----d----# ');
99+
var e1subs = '^ ! ';
100+
var e2 = hot( '-----x----------x----------x----------|');
101+
var e2subs = '^ ! ';
102+
var expected = '-----b----------d-# ';
103+
104+
expectObservable(e1.sample(e2)).toBe(expected);
105+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
106+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
107+
});
108+
109+
it('should completes if source does not emits', function () {
110+
var e1 = hot('|');
111+
var e2 = hot('------x-------|');
112+
var expected = '|';
113+
var e1subs = '(^!)';
114+
var e2subs = '(^!)';
115+
116+
expectObservable(e1.sample(e2)).toBe(expected);
117+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
118+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
119+
});
120+
121+
it('should raise error if source throws immediately', function () {
122+
var e1 = hot('#');
123+
var e2 = hot('------x-------|');
124+
var expected = '#';
125+
var e1subs = '(^!)';
126+
var e2subs = '(^!)';
127+
128+
expectObservable(e1.sample(e2)).toBe(expected);
129+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
130+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
131+
});
132+
133+
it('should raise error if notification raises error', function () {
134+
var e1 = hot('--a-----|');
135+
var e2 = hot('----#');
136+
var expected = '----#';
137+
var e1subs = '^ !';
138+
var e2subs = '^ !';
139+
140+
expectObservable(e1.sample(e2)).toBe(expected);
141+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
142+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
143+
});
144+
145+
it('should not completes if source does not complete', function () {
146+
var e1 = hot('-');
147+
var e1subs = '^ ';
148+
var e2 = hot('------x-------|');
149+
var e2subs = '^ !';
150+
var expected = '-';
151+
152+
expectObservable(e1.sample(e2)).toBe(expected);
153+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
154+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
155+
});
156+
157+
it('should sample only until source completes', function () {
158+
var e1 = hot('----a----b----c----d-|');
159+
var e1subs = '^ !';
160+
var e2 = hot('-----------x----------x------------|');
161+
var e2subs = '^ !';
162+
var expected = '-----------b---------|';
163+
164+
expectObservable(e1.sample(e2)).toBe(expected);
165+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
166+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
167+
});
168+
169+
it('should complete sampling if sample observable completes', function () {
170+
var e1 = hot('----a----b----c----d-|');
171+
var e1subs = '^ !';
172+
var e2 = hot('|');
173+
var e2subs = '(^!)';
174+
var expected = '---------------------|';
175+
176+
expectObservable(e1.sample(e2)).toBe(expected);
177+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
178+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
179+
});
180+
});

src/CoreOperators.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ export interface CoreOperators<T> {
6262
repeat?: (count?: number) => Observable<T>;
6363
retry?: (count?: number) => Observable<T>;
6464
retryWhen?: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
65+
sample?: (notifier: Observable<any>) => Observable<T>;
6566
scan?: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
6667
share?: () => Observable<T>;
6768
single?: (predicate?: (value: T, index: number) => boolean) => Observable<T>;

src/Observable.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ export class Observable<T> implements CoreOperators<T> {
246246
repeat: (count?: number) => Observable<T>;
247247
retry: (count?: number) => Observable<T>;
248248
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
249+
sample: (notifier: Observable<any>) => Observable<T>;
249250
scan: <R>(accumulator: (acc: R, x: T) => R, seed?: T | R) => Observable<R>;
250251
share: () => Observable<T>;
251252
single: (predicate?: (value: T, index: number) => boolean) => Observable<T>;

src/Rx.KitchenSink.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ import './add/operator/reduce';
102102
import './add/operator/repeat';
103103
import './add/operator/retry';
104104
import './add/operator/retryWhen';
105+
import './add/operator/sample';
105106
import './add/operator/scan';
106107
import './add/operator/share';
107108
import './add/operator/single';

src/Rx.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ import './add/operator/reduce';
7474
import './add/operator/repeat';
7575
import './add/operator/retry';
7676
import './add/operator/retryWhen';
77+
import './add/operator/sample';
7778
import './add/operator/scan';
7879
import './add/operator/share';
7980
import './add/operator/single';

src/add/operator/sample.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import {Observable} from '../../Observable';
2+
import {sample} from '../../operator/sample';
3+
Observable.prototype.sample = sample;
4+
5+
export var _void: void;

src/operator/sample.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import {Observable} from '../Observable';
2+
import {Operator} from '../Operator';
3+
import {Subscriber} from '../Subscriber';
4+
5+
export function sample<T>(notifier: Observable<any>): Observable<T> {
6+
return this.lift(new SampleOperator(notifier));
7+
}
8+
9+
class SampleOperator<T, R> implements Operator<T, R> {
10+
constructor(private notifier: Observable<any>) {
11+
}
12+
13+
call(subscriber: Subscriber<R>) {
14+
return new SampleSubscriber(subscriber, this.notifier);
15+
}
16+
}
17+
18+
class SampleSubscriber<T> extends Subscriber<T> {
19+
private lastValue: T;
20+
private hasValue: boolean = false;
21+
22+
constructor(destination: Subscriber<T>, private notifier: Observable<any>) {
23+
super(destination);
24+
this.add(notifier._subscribe(new SampleNotificationSubscriber(this)));
25+
}
26+
27+
_next(value: T) {
28+
this.lastValue = value;
29+
this.hasValue = true;
30+
}
31+
32+
notifyNext() {
33+
if (this.hasValue) {
34+
this.hasValue = false;
35+
this.destination.next(this.lastValue);
36+
}
37+
}
38+
}
39+
40+
class SampleNotificationSubscriber<T> extends Subscriber<T> {
41+
constructor(private parent: SampleSubscriber<T>) {
42+
super(null);
43+
}
44+
45+
_next() {
46+
this.parent.notifyNext();
47+
}
48+
49+
_error(err: any) {
50+
this.parent.error(err);
51+
}
52+
53+
_complete() {
54+
//noop
55+
}
56+
}

0 commit comments

Comments
 (0)