Skip to content

Commit

Permalink
feat(operator): add last operator
Browse files Browse the repository at this point in the history
- adds EmptyError error type
- adds ability to group values in marble tests
- adds last operator

closes #304
closes #306
  • Loading branch information
benlesh committed Sep 14, 2015
1 parent 13ada1f commit d841b11
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 3 deletions.
22 changes: 22 additions & 0 deletions spec/operators/last-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* globals describe, it, expect, expectObservable, hot, cold */
var Rx = require('../../dist/cjs/Rx');

describe('Observable.prototype.last()', function(){
it('should take the last value of an observable', function(){
var e1 = hot('--a--^--b--c--|');
var expected = '---------(c|)';
expectObservable(e1.last()).toBe(expected)
});

it('should error on empty', function() {
var e1 = hot('--a--^----|');
var expected = '-----#';
expectObservable(e1.last()).toBe(expected, null, new Rx.EmptyError());
});

it('should go on forever on never', function() {
var e2 = hot('--^---');
var expected = '----';
expectObservable(e2.last()).toBe(expected);
});
});
19 changes: 19 additions & 0 deletions spec/schedulers/TestScheduler-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ describe('TestScheduler', function() {
{ frame: 150, notification: Notification.createError('omg error!') }
]);
});

it('should default in the letter for the value if no value hash was passed', function(){
var result = TestScheduler.parseMarbles('--a--b--c--');
expect(result).toDeepEqual([
{ frame: 20, notification: Notification.createNext('a') },
{ frame: 50, notification: Notification.createNext('b') },
{ frame: 80, notification: Notification.createNext('c') },
])
});

it('should handle grouped values', function() {
var result = TestScheduler.parseMarbles('---(abc)---');
expect(result).toDeepEqual([
{ frame: 30, notification: Notification.createNext('a') },
{ frame: 30, notification: Notification.createNext('b') },
{ frame: 30, notification: Notification.createNext('c') }
]);
});

});

describe('createColdObservable()', function () {
Expand Down
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ export default class Observable<T> {
startWith: <T>(x: T) => Observable<T>;
debounce: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;

last: (predicate?: (value: T, index:number) => boolean, thisArg?: any, defaultValue?: any) => Observable<T>;

filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
distinctUntilKeyChanged: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable<T>;
Expand Down
8 changes: 7 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Observable from './Observable';
import Subscriber from './Subscriber';
import Subscription from './Subscription';
import Notification from './Notification';
import EmptyError from './util/EmptyError';

import ReplaySubject from './subjects/ReplaySubject';
import BehaviorSubject from './subjects/BehaviorSubject';
Expand Down Expand Up @@ -215,6 +216,10 @@ import sampleTime from './operators/sampleTime';
observableProto.sample = sample;
observableProto.sampleTime = sampleTime;

import last from './operators/last';

observableProto.last = last;

var Scheduler = {
nextTick,
immediate
Expand All @@ -231,5 +236,6 @@ export {
ConnectableObservable,
Notification,
VirtualTimeScheduler,
TestScheduler
TestScheduler,
EmptyError
};
68 changes: 68 additions & 0 deletions src/operators/last.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import Observable from '../Observable';
import Operator from '../Operator';
import Subscriber from '../Subscriber';
import Observer from '../Observer';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';
import EmptyError from '../util/EmptyError';

export default function last<T>(predicate?: (value: T, index: number, source:Observable<T>) => boolean, thisArg?: any, defaultValue?: any) : Observable<T> {
return this.lift(new LastOperator(predicate, thisArg, defaultValue, this));
}

class LastOperator<T, R> implements Operator<T, R> {
constructor(private predicate?: (value: T, index: number, source:Observable<T>) => boolean, private thisArg?: any, private defaultValue?: any, private source?: Observable<T>) {

}

call(observer: Subscriber<R>): Subscriber<T> {
return new LastSubscriber(observer, this.predicate, this.thisArg, this.defaultValue, this.source);
}
}

class LastSubscriber<T> extends Subscriber<T> {
private lastValue: T;
private hasValue: boolean = false;
private predicate: Function;
private index: number = 0;

constructor(destination: Observer<T>, predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any, private defaultValue?: any, private source?: Observable<T>) {
super(destination);
if(typeof defaultValue !== 'undefined') {
this.lastValue = defaultValue;
this.hasValue = true;
}
if(typeof predicate === 'function') {
this.predicate = bindCallback(predicate, thisArg, 3);
}
}

_next(value: T) {
const predicate = this.predicate;
if(predicate) {
let result = tryCatch(predicate)(value, this.index++, this.source);
if(result === errorObject) {
this.destination.error(result.e);
} else if (result) {
this.lastValue = result;
this.hasValue = true;
}
} else {
this.lastValue = value;
this.hasValue = true;
}
}

_complete() {
const destination = this.destination;
if(this.hasValue) {
destination.next(this.lastValue);
destination.complete();
} else {
destination.error(new EmptyError);
}
}
}
13 changes: 11 additions & 2 deletions src/schedulers/TestScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ export default class TestScheduler extends VirtualTimeScheduler {
let results: ({ notification: Notification<any>, frame: number })[] = [];
let subIndex = marbles.indexOf('^');
let frameOffset = subIndex === -1 ? 0 : (subIndex * -10);
let getValue = typeof values !== 'object' ? (x) => x : (x) => values[x];
let groupStart = -1;

for (let i = 0; i < len; i++) {
let frame = i * 10;
Expand All @@ -84,6 +86,12 @@ export default class TestScheduler extends VirtualTimeScheduler {
switch (c) {
case '-':
break;
case '(':
groupStart = frame;
break;
case ')':
groupStart = -1;
break;
case '|':
notification = Notification.createComplete();
break;
Expand All @@ -93,14 +101,15 @@ export default class TestScheduler extends VirtualTimeScheduler {
notification = Notification.createError(errorValue || 'error');
break;
default:
notification = Notification.createNext(values[c]);
notification = Notification.createNext(getValue(c));
break;
}


frame += frameOffset;

if (notification) {
results.push({ notification, frame });
results.push({ notification, frame: groupStart > -1 ? groupStart : frame });
}
}
return results;
Expand Down
4 changes: 4 additions & 0 deletions src/util/EmptyError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export default class EmptyError implements Error {
name = 'EmptyError'
message = 'no elements in sequence';
}

0 comments on commit d841b11

Please sign in to comment.