Skip to content

Commit

Permalink
feat(operator): add sample and sampleTime
Browse files Browse the repository at this point in the history
- sample is equivalent to Rx2/3 sample(observable)
- sampleTime is equivalent to Rx2/3 sample(number, scheduler)

closes #178
  • Loading branch information
benlesh committed Sep 4, 2015
1 parent 4e79536 commit 9e62789
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 0 deletions.
26 changes: 26 additions & 0 deletions spec/operators/sample-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.sample', function () {
it('should get samples when the notifier emits', function (done) {
var expected = [1, 3, 5];
Observable.interval(100)
.sample(Observable.interval(220))
.take(3)
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
}, 2000);

it('should not complete when the notifier completes, nor should it emit', function (done) {
Observable.interval(100)
.sample(Observable.timer(220))
.subscribe(function (x) {
expect(x).toBe(1);
setTimeout(done, 500);
}, null, function () {
throw 'should not be called';
});
});
});
15 changes: 15 additions & 0 deletions spec/operators/sampleTime-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.sampleTime', function () {
it('should get samples on a delay', function (done) {
var expected = [1, 3, 5];
Observable.interval(100)
.sampleTime(220)
.take(3)
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
}, 2000);
});
3 changes: 3 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ export default class Observable<T> {
bufferTime: <T>(bufferTimeSpan: number, bufferCreationInterval?: number, scheduler?: Scheduler) => Observable<T[]>;
bufferCount: <T>(bufferSize: number, startBufferEvery: number) => Observable<T[]>;

sample: <T>(notifier: Observable<any>) => Observable<T>;
sampleTime: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;

finally: (ensure: () => void, thisArg?: any) => Observable<T>;
timeout: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
Expand Down
6 changes: 6 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ observableProto.bufferTime = bufferTime;
observableProto.bufferToggle = bufferToggle;
observableProto.bufferWhen = bufferWhen;

import sample from './operators/sample';
import sampleTime from './operators/sampleTime';

observableProto.sample = sample;
observableProto.sampleTime = sampleTime;

var Scheduler = {
nextTick,
immediate
Expand Down
64 changes: 64 additions & 0 deletions src/operators/sample.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Observable from '../Observable';
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';

export default function sample<T>(notifier: Observable<any>): Observable<T> {
return this.lift(new SampleOperator(notifier));
}

class SampleOperator<T, R> implements Operator<T, R> {
constructor(private notifier: Observable<any>) {
}

call(observer: Observer<R>) {
return new SampleSubscriber(observer, this.notifier);
}
}

class SampleSubscriber<T> extends Subscriber<T> {
private lastValue: T;
private hasValue: boolean = false;
private innerSubscription: Subscription<T>;

constructor(destination: Observer<T>, private notifier: Observable<any>) {
super(destination);
this.add(this.innerSubscription = notifier.subscribe(new SampleNoficationSubscriber(this)));
}

_next(value: T) {
this.lastValue = value;
this.hasValue = true;
}

notifyNext() {
if (this.hasValue) {
this.destination.next(this.lastValue);
}
}

notifyComplete() {
this.remove(this.innerSubscription);
this.innerSubscription.unsubscribe();
}
}


class SampleNoficationSubscriber<T> extends Subscriber<T> {
constructor(private parent: SampleSubscriber<T>) {
super(null);
}

_next() {
this.parent.notifyNext();
}

_error(err: any) {
this.parent.error(err);
}

_complete() {
this.parent.notifyComplete();
}
}
46 changes: 46 additions & 0 deletions src/operators/sampleTime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Observable from '../Observable';
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';
import Scheduler from '../Scheduler';
import nextTick from '../schedulers/nextTick';

export default function sampleTime<T>(delay: number, scheduler: Scheduler = nextTick): Observable<T> {
return this.lift(new SampleTimeOperator(delay, scheduler));
}

class SampleTimeOperator<T, R> implements Operator<T, R> {
constructor(private delay: number, private scheduler: Scheduler) {
}

call(observer: Observer<R>) {
return new SampleTimeSubscriber(observer, this.delay, this.scheduler);
}
}

class SampleTimeSubscriber<T> extends Subscriber<T> {
lastValue: T;
hasValue: boolean = false;

constructor(destination: Observer<T>, private delay: number, private scheduler: Scheduler) {
super(destination);
this.add(scheduler.schedule(delay, { subscriber: this }, dispatchNotification));
}

_next(value: T) {
this.lastValue = value;
this.hasValue = true;
}

notifyNext() {
if (this.hasValue) {
this.destination.next(this.lastValue);
}
}
}

function dispatchNotification<T>(state: { subscriber: SampleTimeSubscriber<T> }) {
state.subscriber.notifyNext();
(<any>this).schedule(state);
}

0 comments on commit 9e62789

Please sign in to comment.