Skip to content

Commit

Permalink
feat(pluck): add pluck operator
Browse files Browse the repository at this point in the history
Pretty similar to RxJS 4 implementation.

Closes ReactiveX#1134
  • Loading branch information
luisgabriel committed Jan 12, 2016
1 parent 39e6c0e commit cee0eb4
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 1 deletion.
1 change: 0 additions & 1 deletion MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ enabling "composite" subscription behavior.
|`onErrorResumeNext`|`-`|
|`pausable`|`-`|
|`pausableBuffered`|`-`|
|`pluck`|`-`|
|`shareReplay`|`-`|
|`shareValue`|`-`|
|`selectConcatObserver` or `concatMapObserver`|`-`|
Expand Down
152 changes: 152 additions & 0 deletions spec/operators/pluck-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.pluck()', function () {
it('should work for one object', function () {
var a = cold('--x--|', {x: {prop: 42}});
var asubs = '^ !';
var expected = '--y--|';

var r = a.pluck('prop');
expectObservable(r).toBe(expected, {y: 42});
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should work for multiple objects', function () {
var inputs = {
a: {prop: '1'},
b: {prop: '2'},
c: {prop: '3'},
d: {prop: '4'},
e: {prop: '5'},
};
var a = cold('--a-b--c-d---e-|', inputs);
var asubs = '^ !';
var expected = '--1-2--3-4---5-|';

var r = a.pluck('prop');
expectObservable(r).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should work with deep nested properties', function () {
var inputs = {
a: {a: {b: {c: '1'}}},
b: {a: {b: {c: '2'}}},
c: {a: {b: {c: '3'}}},
d: {a: {b: {c: '4'}}},
e: {a: {b: {c: '5'}}},
};
var a = cold('--a-b--c-d---e-|', inputs);
var asubs = '^ !';
var expected = '--1-2--3-4---5-|';

var r = a.pluck('a', 'b', 'c');
expectObservable(r).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should work with edge cases of deep nested properties', function () {
var inputs = {
a: {a: {b: {c: 1}}},
b: {a: {b: 2}},
c: {a: {c: {c: 3}}},
d: {},
e: {a: {b: {c: 5}}},
};
var a = cold('--a-b--c-d---e-|', inputs);
var asubs = '^ !';
var expected = '--r-x--y-z---w-|';
var values = {r: 1, x: undefined, y: undefined, z: undefined, w: 5};

var r = a.pluck('a', 'b', 'c');
expectObservable(r).toBe(expected, values);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should throw an error if not property is passed', function () {
expect(function () {
Observable.of({prop: 1}, {prop: 2}).pluck();
}).toThrow(new Error('List of properties cannot be empty.'));
});

it('should propagate errors from observable that emits only errors', function () {
var a = cold('#');
var asubs = '(^!)';
var expected = '#';

var r = a.pluck('whatever');
expectObservable(r).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should propagate errors from observable that emit values', function () {
var a = cold('--a--b--#', {a: {prop: '1'}, b: {prop: '2'}}, 'too bad');
var asubs = '^ !';
var expected = '--1--2--#';

var r = a.pluck('prop');
expectObservable(r).toBe(expected, undefined, 'too bad');
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should not pluck an empty observable', function () {
var a = cold('|');
var asubs = '(^!)';
var expected = '|';

var invoked = 0;
var r = a
.pluck('whatever')
.do(null, null, function () {
expect(invoked).toBe(0);
});

expectObservable(r).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should allow unsubscribing explicitly and early', function () {
var a = cold('--a--b--c--|', {a: {prop: '1'}, b: {prop: '2'}});
var unsub = ' ! ';
var asubs = '^ ! ';
var expected = '--1--2- ';

var r = a.pluck('prop');
expectObservable(r, unsub).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should pluck twice', function () {
var inputs = {
a: {a: {b: {c: '1'}}},
b: {a: {b: {c: '2'}}},
c: {a: {b: {c: '3'}}},
d: {a: {b: {c: '4'}}},
e: {a: {b: {c: '5'}}},
};
var a = cold('--a-b--c-d---e-|', inputs);
var asubs = '^ !';
var expected = '--1-2--3-4---5-|';

var r = a.pluck('a', 'b').pluck('c');
expectObservable(r).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should not break unsubscription chain when unsubscribed explicitly', function () {
var a = cold('--a--b--c--|', {a: {prop: '1'}, b: {prop: '2'}});
var unsub = ' ! ';
var asubs = '^ ! ';
var expected = '--1--2- ';

var r = a
.mergeMap(function (x) { return Observable.of(x); })
.pluck('prop')
.mergeMap(function (x) { return Observable.of(x); });

expectObservable(r, unsub).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ export class Observable<T> implements CoreOperators<T> {
multicast: (subjectOrSubjectFactory: Subject<T>|(() => Subject<T>)) => ConnectableObservable<T>;
observeOn: (scheduler: Scheduler, delay?: number) => Observable<T>;
partition: (predicate: (x: T) => boolean) => Observable<T>[];
pluck: (...properties: string[]) => Observable<any>;
publish: () => ConnectableObservable<T>;
publishBehavior: (value: any) => ConnectableObservable<T>;
publishReplay: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import './add/operator/multicast';
import './add/operator/observeOn';
import './add/operator/pairwise';
import './add/operator/partition';
import './add/operator/pluck';
import './add/operator/publish';
import './add/operator/publishBehavior';
import './add/operator/publishReplay';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import './add/operator/mergeMapTo';
import './add/operator/multicast';
import './add/operator/observeOn';
import './add/operator/partition';
import './add/operator/pluck';
import './add/operator/publish';
import './add/operator/publishBehavior';
import './add/operator/publishReplay';
Expand Down
10 changes: 10 additions & 0 deletions src/add/operator/pluck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* Everything in this file is generated by the 'tools/generate-operator-patches.ts' script.
* Any manual edits to this file will be lost next time the script is run.
**/
import {Observable} from '../../Observable';
import {pluck} from '../../operator/pluck';

Observable.prototype.pluck = pluck;

export var _void: void;
35 changes: 35 additions & 0 deletions src/operator/pluck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {Observable} from '../Observable';
import {map} from './map';

/**
* Retrieves the value of a specified nested property from all elements in
* the Observable sequence. If a property can't be resolved, it will return
* `undefined` for that value.
*
* @param {...args} properties the nested properties to pluck
* @returns {Observable} Returns a new Observable sequence of property values
*/
export function pluck(...properties: string[]): Observable<any> {
const length = properties.length;
if (length === 0) {
throw new Error('List of properties cannot be empty.');
}
return map.call(this, plucker(properties, length));
}

function plucker(props: string[], length: number): (x: string) => any {
const mapper = (x: string) => {
let currentProp = x;
for (let i = 0; i < length; i++) {
const p = currentProp[props[i]];
if (typeof p !== 'undefined') {
currentProp = p;
} else {
return undefined;
}
}
return currentProp;
};

return mapper;
}

0 comments on commit cee0eb4

Please sign in to comment.