Skip to content

Commit

Permalink
#143 Make Kefir.combine() also accept sources as objects (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
32bitkid authored and rpominov committed Dec 31, 2016
1 parent 813e337 commit 0671f0c
Show file tree
Hide file tree
Showing 4 changed files with 487 additions and 169 deletions.
35 changes: 34 additions & 1 deletion docs-src/descriptions/multiple-sources.jade
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,40 @@ pre(title='events in time').
result: ------•--•-•X
9 12 14

div

p.
Also, #[b combine] supports passing objects as both #[b obss] #[i and] #[b passiveObss].
The #[b combinator] function will then be called with a single argument, a new object with
the latest value from each observable. If no #[b combinator] is provided, it emits
the object containing latest values.

pre.javascript(title='example')
:escapehtml
var aStream = Kefir.sequentially(100, [1, 3]);
var bStream = Kefir.sequentially(100, [2, 4]).delay(40);

var result = Kefir.combine({ a: aStream, b: bStream });
result.log();

pre(title='console output')
:escapehtml
> [combine] <value> { a: 1, b: 2 }
> [combine] <value> { a: 3, b: 2 }
> [combine] <value> { a: 3, b: 4 }
> [combine] <end>

pre(title='events in time').
a: ----1----3X
b: ------2----4X

result: ------•--•-•X

p.
#[img(data-emoji="point_up")] If there are duplicate keys in both #[b obss]
#[i and] #[b passiveObss], only the latest values from #[b obss] will appear
in the combined object for the duplicated keys.

p.
The result stream emits a value only when it has at least one value from each of source observables.
Ends when all the active source observables (#[b obss] array) end.
Expand All @@ -70,7 +104,6 @@ p.




+descr-method('zip', 'zip', 'Kefir.zip(sources, [combinator])', 'obs.zip(otherObs, [combinator])').
Creates a stream with values from #[b sources]
lined up with each other. For example if you have two sources with values
Expand Down
3 changes: 3 additions & 0 deletions kefir.js.flow
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ declare var Kefir: {

combine<E>(obss: Observable<any,E>[], combinator?: Function): Observable<any,E>;
combine<E>(obss: Observable<any,E>[], passiveObss?: Observable<any,E>[], combinator?: Function): Observable<any,E>;
combine<E>(obss: {[key:string]:Observable<any,E>}, combinator?: Function): Observable<any,E>;
combine<E>(obss: {[key:string]:Observable<any,E>}, passiveObss?: {[key:string]:Observable<any,E>}, combinator?: Function): Observable<any,E>;

zip<V,E>(obss: Observable<V,E>[]): Observable<Array<V>,E>;
zip<E>(obss: Observable<any,E>[], combinator: Function): Observable<any,E>;
merge<V,E>(obss: Observable<V,E>[]): Observable<V,E>;
Expand Down
49 changes: 44 additions & 5 deletions src/many-sources/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ import {concat, fillArray} from '../utils/collections';
import {spread} from '../utils/functions';
import never from '../primary/never';


function collect(source, keys, values) {
for (var prop in source) {
if( source.hasOwnProperty( prop ) ) {
keys.push(prop);
values.push(source[prop]);
}
}
}

function defaultErrorsCombinator(errors) {
let latestError;
Expand All @@ -23,7 +30,7 @@ function Combine(active, passive, combinator) {
Stream.call(this);
this._activeCount = active.length;
this._sources = concat(active, passive);
this._combinator = combinator ? spread(combinator, this._sources.length) : (x => x);
this._combinator = combinator;
this._aliveCount = 0;
this._latestValues = new Array(this._sources.length);
this._latestErrors = new Array(this._sources.length);
Expand Down Expand Up @@ -153,11 +160,43 @@ inherit(Combine, Stream, {

});

function combineAsArray(active, passive = [], combinator) {
if (!Array.isArray(passive)) {
throw new Error('Combine can only combine active and passive collections of the same type.');
}

combinator = combinator ? spread(combinator, active.length + passive.length) : (x => x);
return active.length === 0 ? never() : new Combine(active, passive, combinator);
}

function combineAsObject(active, passive = {}, combinator) {
if (typeof passive !== 'object' || Array.isArray(passive)) {
throw new Error('Combine can only combine active and passive collections of the same type.');
}

export default function combine(active, passive = [], combinator) {
let keys = [],
activeObservables = [],
passiveObservables = [];

collect(active, keys, activeObservables);
collect(passive, keys, passiveObservables);

const objectify = values => {
let event = {};
for(let i = values.length - 1; 0 <= i; i--) {
event[keys[i]] = values[i];
}
return combinator ? combinator(event) : event;
}

return activeObservables.length === 0 ? never() : new Combine(activeObservables, passiveObservables, objectify);
}

export default function combine(active, passive, combinator) {
if (typeof passive === 'function') {
combinator = passive;
passive = [];
passive = undefined;
}
return active.length === 0 ? never() : new Combine(active, passive, combinator);

return Array.isArray(active) ? combineAsArray(active, passive, combinator) : combineAsObject(active, passive, combinator);
}
Loading

0 comments on commit 0671f0c

Please sign in to comment.