This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
sample.js
87 lines (76 loc) · 2.92 KB
/
sample.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
 * Observable pairing a source sequence with a sampler sequence.
 *
 * On subscription it creates one state object shared by a
 * SampleSourceObserver (subscribed to the source) and a SamplerObserver
 * (subscribed to the sampler).
 */
var SampleObservable = (function (__super__) {
  inherits(SampleObservable, __super__);

  /**
   * @param {Observable} source Sequence whose values are sampled.
   * @param {Observable} sampler Sequence whose notifications trigger sampling.
   */
  function SampleObservable(source, sampler) {
    this.source = source;
    this._sampler = sampler;
    __super__.call(this);
  }

  /**
   * Subscribes to the source first and to the sampler second.
   *
   * @param {Observer} o Downstream observer receiving the sampled values.
   * @returns {BinaryDisposable} Wraps the source subscription and the sampler subscription.
   */
  SampleObservable.prototype.subscribeCore = function (o) {
    // Created up front so SampleSourceObserver can dispose it through the state.
    var sourceSubscription = new SingleAssignmentDisposable();

    // Mutable record shared by both observers.
    var state = {
      o: o,
      atEnd: false,
      value: null,
      hasValue: false,
      sourceSubscription: sourceSubscription
    };

    var sourceObserver = new SampleSourceObserver(state);
    sourceSubscription.setDisposable(this.source.subscribe(sourceObserver));

    var samplerSubscription = this._sampler.subscribe(new SamplerObserver(state));

    return new BinaryDisposable(sourceSubscription, samplerSubscription);
  };

  return SampleObservable;
}(ObservableBase));
/**
 * Observer that SampleObservable subscribes to the sampler sequence.
 *
 * It shares a state object with SampleSourceObserver; the fields read here
 * are `o` (downstream observer), `hasValue`, `value` and `atEnd`.
 */
var SamplerObserver = (function (__super__) {
  inherits(SamplerObserver, __super__);

  /**
   * @param {Object} s Shared sampling state created in SampleObservable.subscribeCore.
   */
  function SamplerObserver(s) {
    this._s = s;
    __super__.call(this);
  }

  /**
   * Emits the pending source value, if any, then completes the downstream
   * observer when the source has already reached its end.
   */
  SamplerObserver.prototype._handleMessage = function () {
    var state = this._s;
    if (state.hasValue) {
      // Clear the flag before emitting so each value is forwarded at most once.
      state.hasValue = false;
      state.o.onNext(state.value);
    }
    if (state.atEnd) {
      state.o.onCompleted();
    }
  };

  // A sampler value is only a trigger; its payload is ignored.
  SamplerObserver.prototype.next = function () { this._handleMessage(); };

  // Forward sampler errors to the downstream observer. The state object has no
  // `onError` of its own, so calling `this._s.onError` threw a TypeError.
  SamplerObserver.prototype.error = function (e) { this._s.o.onError(e); };

  // Sampler completion is handled exactly like a sampler value.
  SamplerObserver.prototype.completed = function () { this._handleMessage(); };

  return SamplerObserver;
}(AbstractObserver));
/**
 * Observer that SampleObservable subscribes to the source sequence.
 *
 * It records the latest source value in the shared state object and does not
 * emit values downstream itself; only errors are forwarded directly.
 */
var SampleSourceObserver = (function (__super__) {
  inherits(SampleSourceObserver, __super__);

  /**
   * @param {Object} s Shared sampling state created in SampleObservable.subscribeCore.
   */
  function SampleSourceObserver(s) {
    this._s = s;
    __super__.call(this);
  }

  // Remember the most recent value; a later value overwrites an unsampled one.
  SampleSourceObserver.prototype.next = function (x) {
    var shared = this._s;
    shared.hasValue = true;
    shared.value = x;
  };

  // Source errors go straight to the downstream observer.
  SampleSourceObserver.prototype.error = function (e) {
    var downstream = this._s.o;
    downstream.onError(e);
  };

  // Flag the end of the source, then release the source subscription.
  SampleSourceObserver.prototype.completed = function () {
    var shared = this._s;
    shared.atEnd = true;
    shared.sourceSubscription.dispose();
  };

  return SampleSourceObserver;
}(AbstractObserver));
/**
 * Samples the observable sequence, either at a fixed interval or whenever a
 * sampler sequence produces a notification.
 *
 * @example
 *  1 - res = source.sample(sampleObservable); // Sampler tick sequence
 *  2 - res = source.sample(5000); // 5 seconds
 *  3 - res = source.sample(5000, Rx.Scheduler.timeout); // 5 seconds, explicit scheduler
 *
 * @param {Mixed} intervalOrSampler Interval at which to sample (a number, in milliseconds) or a sampler Observable.
 * @param {Scheduler} [scheduler] Scheduler for the interval timer; only used when a number is given. Falls back to `defaultScheduler` when the argument is not a scheduler.
 * @returns {Observable} Sampled observable sequence.
 */
observableProto.sample = function (intervalOrSampler, scheduler) {
  if (!isScheduler(scheduler)) {
    scheduler = defaultScheduler;
  }

  // A numeric argument becomes an interval-based sampler; anything else is
  // used as the sampler as-is.
  var sampler = intervalOrSampler;
  if (typeof intervalOrSampler === 'number') {
    sampler = observableinterval(intervalOrSampler, scheduler);
  }

  return new SampleObservable(this, sampler);
};