From 0671f0ce12e2dc5d3a6154b138ab90ba8cdf57dc Mon Sep 17 00:00:00 2001 From: "J. Holmes" <32bitkid@gmail.com> Date: Sat, 31 Dec 2016 08:25:34 -0600 Subject: [PATCH] #143 Make Kefir.combine() also accept sources as objects (#225) --- docs-src/descriptions/multiple-sources.jade | 35 +- kefir.js.flow | 3 + src/many-sources/combine.js | 49 +- test/specs/combine.coffee | 569 ++++++++++++++------ 4 files changed, 487 insertions(+), 169 deletions(-) diff --git a/docs-src/descriptions/multiple-sources.jade b/docs-src/descriptions/multiple-sources.jade index 8e20d7d1..f9f14769 100644 --- a/docs-src/descriptions/multiple-sources.jade +++ b/docs-src/descriptions/multiple-sources.jade @@ -61,6 +61,40 @@ pre(title='events in time'). result: ------•--•-•X 9 12 14 +div + +p. + Also, #[b combine] supports passing objects as both #[b obss] #[i and] #[b passiveObss]. + The #[b combinator] function will then be called with a single argument, a new object with + the latest value from each observable. If no #[b combinator] is provided, it emits + the object containing latest values. + +pre.javascript(title='example') + :escapehtml + var aStream = Kefir.sequentially(100, [1, 3]); + var bStream = Kefir.sequentially(100, [2, 4]).delay(40); + + var result = Kefir.combine({ a: aStream, b: bStream }); + result.log(); + +pre(title='console output') + :escapehtml + > [combine] { a: 1, b: 2 } + > [combine] { a: 3, b: 2 } + > [combine] { a: 3, b: 4 } + > [combine] + +pre(title='events in time'). + a: ----1----3X + b: ------2----4X + + result: ------•--•-•X + +p. + #[img(data-emoji="point_up")] If there are duplicate keys in both #[b obss] + #[i and] #[b passiveObss], only the latest values from #[b obss] will appear + in the combined object for the duplicated keys. + p. The result stream emits a value only when it has at least one value from each of source observables. Ends when all the active source observables (#[b obss] array) end. @@ -70,7 +104,6 @@ p. - +descr-method('zip', 'zip', 'Kefir.zip(sources, [combinator])', 'obs.zip(otherObs, [combinator])'). Creates a stream with values from #[b sources] lined up with each other. For example if you have two sources with values diff --git a/kefir.js.flow b/kefir.js.flow index d89c6794..b288abec 100644 --- a/kefir.js.flow +++ b/kefir.js.flow @@ -151,6 +151,9 @@ declare var Kefir: { combine(obss: Observable[], combinator?: Function): Observable; combine(obss: Observable[], passiveObss?: Observable[], combinator?: Function): Observable; + combine(obss: {[key:string]:Observable}, combinator?: Function): Observable; + combine(obss: {[key:string]:Observable}, passiveObss?: {[key:string]:Observable}, combinator?: Function): Observable; + zip(obss: Observable[]): Observable,E>; zip(obss: Observable[], combinator: Function): Observable; merge(obss: Observable[]): Observable; diff --git a/src/many-sources/combine.js b/src/many-sources/combine.js index 840fb392..1eb7025f 100644 --- a/src/many-sources/combine.js +++ b/src/many-sources/combine.js @@ -5,7 +5,14 @@ import {concat, fillArray} from '../utils/collections'; import {spread} from '../utils/functions'; import never from '../primary/never'; - +function collect(source, keys, values) { + for (var prop in source) { + if( source.hasOwnProperty( prop ) ) { + keys.push(prop); + values.push(source[prop]); + } + } +} function defaultErrorsCombinator(errors) { let latestError; @@ -23,7 +30,7 @@ function Combine(active, passive, combinator) { Stream.call(this); this._activeCount = active.length; this._sources = concat(active, passive); - this._combinator = combinator ? spread(combinator, this._sources.length) : (x => x); + this._combinator = combinator; this._aliveCount = 0; this._latestValues = new Array(this._sources.length); this._latestErrors = new Array(this._sources.length); @@ -153,11 +160,43 @@ inherit(Combine, Stream, { }); +function combineAsArray(active, passive = [], combinator) { + if (!Array.isArray(passive)) { + throw new Error('Combine can only combine active and passive collections of the same type.'); + } + + combinator = combinator ? spread(combinator, active.length + passive.length) : (x => x); + return active.length === 0 ? never() : new Combine(active, passive, combinator); +} + +function combineAsObject(active, passive = {}, combinator) { + if (typeof passive !== 'object' || Array.isArray(passive)) { + throw new Error('Combine can only combine active and passive collections of the same type.'); + } -export default function combine(active, passive = [], combinator) { + let keys = [], + activeObservables = [], + passiveObservables = []; + + collect(active, keys, activeObservables); + collect(passive, keys, passiveObservables); + + const objectify = values => { + let event = {}; + for(let i = values.length - 1; 0 <= i; i--) { + event[keys[i]] = values[i]; + } + return combinator ? combinator(event) : event; + } + + return activeObservables.length === 0 ? never() : new Combine(activeObservables, passiveObservables, objectify); +} + +export default function combine(active, passive, combinator) { if (typeof passive === 'function') { combinator = passive; - passive = []; + passive = undefined; } - return active.length === 0 ? never() : new Combine(active, passive, combinator); + + return Array.isArray(active) ? combineAsArray(active, passive, combinator) : combineAsObject(active, passive, combinator); } diff --git a/test/specs/combine.coffee b/test/specs/combine.coffee index 283edc04..c6f0c7cd 100644 --- a/test/specs/combine.coffee +++ b/test/specs/combine.coffee @@ -2,218 +2,461 @@ describe 'combine', -> - - it 'should return stream', -> - expect(Kefir.combine([])).toBeStream() - expect(Kefir.combine([stream(), prop()])).toBeStream() - expect(stream().combine(stream())).toBeStream() - expect(prop().combine(prop())).toBeStream() - - it 'should be ended if empty array provided', -> - expect(Kefir.combine([])).toEmit [''] - - it 'should be ended if array of ended observables provided', -> - a = send(stream(), ['']) - b = send(prop(), ['']) - c = send(stream(), ['']) - expect(Kefir.combine([a, b, c])).toEmit [''] - expect(a.combine(b)).toEmit [''] - - it 'should be ended and has current if array of ended properties provided and each of them has current', -> - a = send(prop(), [1, '']) - b = send(prop(), [2, '']) - c = send(prop(), [3, '']) - expect(Kefir.combine([a, b, c])).toEmit [{current: [1, 2, 3]}, ''] - expect(a.combine(b)).toEmit [{current: [1, 2]}, ''] - - it 'should activate sources', -> - a = stream() - b = prop() - c = stream() - expect(Kefir.combine([a, b, c])).toActivate(a, b, c) - expect(a.combine(b)).toActivate(a, b) - - it 'should handle events and current from observables', -> - a = stream() - b = send(prop(), [0]) - c = stream() - expect(Kefir.combine([a, b, c])).toEmit [[1, 0, 2], [1, 3, 2], [1, 4, 2], [1, 4, 5], [1, 4, 6], ''], -> - send(a, [1]) - send(c, [2]) - send(b, [3]) - send(a, ['']) - send(b, [4, '']) - send(c, [5, 6, '']) - a = stream() - b = send(prop(), [0]) - expect(a.combine(b)).toEmit [[1, 0], [1, 2], [1, 3], ''], -> - send(a, [1]) - send(b, [2]) - send(a, ['']) - send(b, [3, '']) - - it 'should accept optional combinator function', -> - a = stream() - b = send(prop(), [0]) - c = stream() - join = (args...) -> args.join('+') - expect(Kefir.combine([a, b, c], join)).toEmit ['1+0+2', '1+3+2', '1+4+2', '1+4+5', '1+4+6', ''], -> - send(a, [1]) - send(c, [2]) - send(b, [3]) - send(a, ['']) - send(b, [4, '']) - send(c, [5, 6, '']) - a = stream() - b = send(prop(), [0]) - expect(a.combine(b, join)).toEmit ['1+0', '1+2', '1+3', ''], -> - send(a, [1]) - send(b, [2]) - send(a, ['']) - send(b, [3, '']) - - it 'when activating second time and has 2+ properties in sources, should emit current value at most once', -> - a = send(prop(), [0]) - b = send(prop(), [1]) - cb = Kefir.combine([a, b]) - activate(cb) - deactivate(cb) - expect(cb).toEmit [{current: [0, 1]}] - - it 'errors should flow', -> - a = stream() - b = prop() - c = stream() - expect(Kefir.combine([a, b, c])).errorsToFlow(a) - a = stream() - b = prop() - c = stream() - expect(Kefir.combine([a, b, c])).errorsToFlow(b) - a = stream() - b = prop() - c = stream() - expect(Kefir.combine([a, b, c])).errorsToFlow(c) - - it 'should handle errors correctly', -> - # a: ---e---v---v----- - # 1 - # b: ----v---e----v--- - # 2 - # c: -----v---e--v---- - # 3 - # result: ---eee-vee-eev-- - # 111 23 32 - - a = stream() - b = stream() - c = stream() - expect(Kefir.combine([a, b, c])).toEmit [ - {error: -1}, - {error: -1}, - {error: -1}, - [3, 1, 2], - {error: -2}, - {error: -3}, - {error: -3}, - {error: -2}, - [4, 6, 5] - ], -> - send(a, [{error: -1}]) - send(b, [1]) - send(c, [2]) - send(a, [3]) - send(b, [{error: -2}]) - send(c, [{error: -3}]) - send(a, [4]) - send(c, [5]) - send(b, [6]) - - - - describe 'sampledBy functionality (3 arity combine)', -> + describe 'array', -> it 'should return stream', -> - expect(Kefir.combine([], [])).toBeStream() - expect(Kefir.combine([stream(), prop()], [stream(), prop()])).toBeStream() + expect(Kefir.combine([])).toBeStream() + expect(Kefir.combine([stream(), prop()])).toBeStream() + expect(stream().combine(stream())).toBeStream() + expect(prop().combine(prop())).toBeStream() it 'should be ended if empty array provided', -> - expect(Kefir.combine([stream(), prop()], [])).toEmit [] - expect(Kefir.combine([], [stream(), prop()])).toEmit [''] + expect(Kefir.combine([])).toEmit [''] it 'should be ended if array of ended observables provided', -> a = send(stream(), ['']) b = send(prop(), ['']) c = send(stream(), ['']) - expect(Kefir.combine([a, b, c], [stream(), prop()])).toEmit [''] + expect(Kefir.combine([a, b, c])).toEmit [''] + expect(a.combine(b)).toEmit [''] - it 'should be ended and emmit current (once) if array of ended properties provided and each of them has current', -> + it 'should be ended and has current if array of ended properties provided and each of them has current', -> a = send(prop(), [1, '']) b = send(prop(), [2, '']) c = send(prop(), [3, '']) - s1 = Kefir.combine([a, b], [c]) - expect(s1).toEmit [{current: [1, 2, 3]}, ''] - expect(s1).toEmit [''] + expect(Kefir.combine([a, b, c])).toEmit [{current: [1, 2, 3]}, ''] + expect(a.combine(b)).toEmit [{current: [1, 2]}, ''] it 'should activate sources', -> a = stream() b = prop() c = stream() - expect(Kefir.combine([a, b], [c])).toActivate(a, b, c) + expect(Kefir.combine([a, b, c])).toActivate(a, b, c) + expect(a.combine(b)).toActivate(a, b) it 'should handle events and current from observables', -> a = stream() b = send(prop(), [0]) c = stream() - d = stream() - expect(Kefir.combine([c, d], [a, b])).toEmit [[2, 3, 1, 0], [5, 3, 1, 4], [6, 3, 1, 4], [6, 7, 1, 4], ''], -> + expect(Kefir.combine([a, b, c])).toEmit [[1, 0, 2], [1, 3, 2], [1, 4, 2], [1, 4, 5], [1, 4, 6], ''], -> send(a, [1]) send(c, [2]) - send(d, [3]) + send(b, [3]) + send(a, ['']) send(b, [4, '']) send(c, [5, 6, '']) - send(d, [7, '']) - + a = stream() + b = send(prop(), [0]) + expect(a.combine(b)).toEmit [[1, 0], [1, 2], [1, 3], ''], -> + send(a, [1]) + send(b, [2]) + send(a, ['']) + send(b, [3, '']) it 'should accept optional combinator function', -> + a = stream() + b = send(prop(), [0]) + c = stream() join = (args...) -> args.join('+') + expect(Kefir.combine([a, b, c], join)).toEmit ['1+0+2', '1+3+2', '1+4+2', '1+4+5', '1+4+6', ''], -> + send(a, [1]) + send(c, [2]) + send(b, [3]) + send(a, ['']) + send(b, [4, '']) + send(c, [5, 6, '']) + a = stream() + b = send(prop(), [0]) + expect(a.combine(b, join)).toEmit ['1+0', '1+2', '1+3', ''], -> + send(a, [1]) + send(b, [2]) + send(a, ['']) + send(b, [3, '']) + + it 'when activating second time and has 2+ properties in sources, should emit current value at most once', -> + a = send(prop(), [0]) + b = send(prop(), [1]) + cb = Kefir.combine([a, b]) + activate(cb) + deactivate(cb) + expect(cb).toEmit [{current: [0, 1]}] + + it 'errors should flow', -> + a = stream() + b = prop() + c = stream() + expect(Kefir.combine([a, b, c])).errorsToFlow(a) + a = stream() + b = prop() + c = stream() + expect(Kefir.combine([a, b, c])).errorsToFlow(b) + a = stream() + b = prop() + c = stream() + expect(Kefir.combine([a, b, c])).errorsToFlow(c) + + it 'should handle errors correctly', -> + # a: ---e---v---v----- + # 1 + # b: ----v---e----v--- + # 2 + # c: -----v---e--v---- + # 3 + # result: ---eee-vee-eev-- + # 111 23 32 + + a = stream() + b = stream() + c = stream() + expect(Kefir.combine([a, b, c])).toEmit [ + {error: -1}, + {error: -1}, + {error: -1}, + [3, 1, 2], + {error: -2}, + {error: -3}, + {error: -3}, + {error: -2}, + [4, 6, 5] + ], -> + send(a, [{error: -1}]) + send(b, [1]) + send(c, [2]) + send(a, [3]) + send(b, [{error: -2}]) + send(c, [{error: -3}]) + send(a, [4]) + send(c, [5]) + send(b, [6]) + + + + describe 'sampledBy functionality (3 arity combine)', -> + + it 'should return stream', -> + expect(Kefir.combine([], [])).toBeStream() + expect(Kefir.combine([stream(), prop()], [stream(), prop()])).toBeStream() + + it 'should be ended if empty array provided', -> + expect(Kefir.combine([stream(), prop()], [])).toEmit [] + expect(Kefir.combine([], [stream(), prop()])).toEmit [''] + + it 'should be ended if array of ended observables provided', -> + a = send(stream(), ['']) + b = send(prop(), ['']) + c = send(stream(), ['']) + expect(Kefir.combine([a, b, c], [stream(), prop()])).toEmit [''] + + it 'should be ended and emmit current (once) if array of ended properties provided and each of them has current', -> + a = send(prop(), [1, '']) + b = send(prop(), [2, '']) + c = send(prop(), [3, '']) + s1 = Kefir.combine([a, b], [c]) + expect(s1).toEmit [{current: [1, 2, 3]}, ''] + expect(s1).toEmit [''] + + it 'should activate sources', -> + a = stream() + b = prop() + c = stream() + expect(Kefir.combine([a, b], [c])).toActivate(a, b, c) + + it 'should handle events and current from observables', -> + a = stream() + b = send(prop(), [0]) + c = stream() + d = stream() + expect(Kefir.combine([c, d], [a, b])).toEmit [[2, 3, 1, 0], [5, 3, 1, 4], [6, 3, 1, 4], [6, 7, 1, 4], ''], -> + send(a, [1]) + send(c, [2]) + send(d, [3]) + send(b, [4, '']) + send(c, [5, 6, '']) + send(d, [7, '']) + + + it 'should accept optional combinator function', -> + join = (args...) -> args.join('+') + a = stream() + b = send(prop(), [0]) + c = stream() + d = stream() + expect(Kefir.combine([c, d], [a, b], join)).toEmit ['2+3+1+0', '5+3+1+4', '6+3+1+4', '6+7+1+4', ''], -> + send(a, [1]) + send(c, [2]) + send(d, [3]) + send(b, [4, '']) + send(c, [5, 6, '']) + send(d, [7, '']) + + + it 'when activating second time and has 2+ properties in sources, should emit current value at most once', -> + a = send(prop(), [0]) + b = send(prop(), [1]) + c = send(prop(), [2]) + sb = Kefir.combine([a, b], [c]) + activate(sb) + deactivate(sb) + expect(sb).toEmit [{current: [0, 1, 2]}] + + it 'errors should flow', -> + a = stream() + b = prop() + c = stream() + d = prop() + expect(Kefir.combine([a, b], [c, d])).errorsToFlow(a) + a = stream() + b = prop() + c = stream() + d = prop() + expect(Kefir.combine([a, b], [c, d])).errorsToFlow(b) + + # https://github.com/rpominov/kefir/issues/98 + it 'should work nice for emitating atomic updates', -> + a = stream() + b = a.map (x) -> x + 2 + c = a.map (x) -> x * 2 + expect(Kefir.combine([b], [c])).toEmit [[3, 2], [4, 4], [5, 6]], -> + send(a, [1, 2, 3]) + + describe 'object', -> + + it 'should return stream', -> + expect(Kefir.combine({})).toBeStream() + expect(Kefir.combine({s: stream(), p: prop()})).toBeStream() + + it 'should be ended if empty array provided', -> + expect(Kefir.combine({})).toEmit [''] + + it 'should be ended if array of ended observables provided', -> + a = send(stream(), ['']) + b = send(prop(), ['']) + c = send(stream(), ['']) + expect(Kefir.combine({a, b, c})).toEmit [''] + + it 'should be ended and has current if array of ended properties provided and each of them has current', -> + a = send(prop(), [1, '']) + b = send(prop(), [2, '']) + c = send(prop(), [3, '']) + expect(Kefir.combine({a, b, c})).toEmit [{current: {a:1, b:2, c:3}}, ''] + + it 'should activate sources', -> + a = stream() + b = prop() + c = stream() + expect(Kefir.combine({a, b, c})).toActivate(a, b, c) + + it 'should handle events and current from observables', -> a = stream() b = send(prop(), [0]) c = stream() - d = stream() - expect(Kefir.combine([c, d], [a, b], join)).toEmit ['2+3+1+0', '5+3+1+4', '6+3+1+4', '6+7+1+4', ''], -> + expect(Kefir.combine({a, b, c})).toEmit [ + {a:1, b:0, c:2}, + {a:1, b:3, c:2}, + {a:1, b:4, c:2}, + {a:1, b:4, c:5}, + {a:1, b:4, c:6}, + '' + ], -> send(a, [1]) send(c, [2]) - send(d, [3]) + send(b, [3]) + send(a, ['']) send(b, [4, '']) send(c, [5, 6, '']) - send(d, [7, '']) + it 'should accept optional combinator function', -> + a = stream() + b = send(prop(), [0]) + c = stream() + join = (ev) -> ev.a + '+' + ev.b + '+' + ev.c + expect(Kefir.combine({a, b, c}, join)).toEmit ['1+0+2', '1+3+2', '1+4+2', '1+4+5', '1+4+6', ''], -> + send(a, [1]) + send(c, [2]) + send(b, [3]) + send(a, ['']) + send(b, [4, '']) + send(c, [5, 6, '']) it 'when activating second time and has 2+ properties in sources, should emit current value at most once', -> a = send(prop(), [0]) b = send(prop(), [1]) - c = send(prop(), [2]) - sb = Kefir.combine([a, b], [c]) - activate(sb) - deactivate(sb) - expect(sb).toEmit [{current: [0, 1, 2]}] + cb = Kefir.combine({a, b}) + activate(cb) + deactivate(cb) + expect(cb).toEmit [{current: {a:0, b:1}}] it 'errors should flow', -> a = stream() b = prop() c = stream() - d = prop() - expect(Kefir.combine([a, b], [c, d])).errorsToFlow(a) + expect(Kefir.combine({a, b, c})).errorsToFlow(a) + a = stream() + b = prop() + c = stream() + expect(Kefir.combine({a, b, c})).errorsToFlow(b) a = stream() b = prop() c = stream() - d = prop() - expect(Kefir.combine([a, b], [c, d])).errorsToFlow(b) + expect(Kefir.combine({a, b, c})).errorsToFlow(c) + + it 'should handle errors correctly', -> + # a: ---e---v---v----- + # 1 + # b: ----v---e----v--- + # 2 + # c: -----v---e--v---- + # 3 + # result: ---eee-vee-eev-- + # 111 23 32 + + a = stream() + b = stream() + c = stream() + expect(Kefir.combine({a, b, c})).toEmit [ + {error: -1}, + {error: -1}, + {error: -1}, + {a:3, b:1, c:2}, + {error: -2}, + {error: -3}, + {error: -3}, + {error: -2}, + {a:4, b:6, c:5} + ], -> + send(a, [{error: -1}]) + send(b, [1]) + send(c, [2]) + send(a, [3]) + send(b, [{error: -2}]) + send(c, [{error: -3}]) + send(a, [4]) + send(c, [5]) + send(b, [6]) + + + + describe 'sampledBy functionality (3 arity combine)', -> + + it 'should return stream', -> + expect(Kefir.combine({}, {})).toBeStream() + expect(Kefir.combine({s1:stream(), p1:prop()}, {s2:stream(), p2:prop()})).toBeStream() + + it 'should be ended if empty array provided', -> + expect(Kefir.combine({s1:stream(), p1:prop()}, {})).toEmit [] + expect(Kefir.combine({}, {s2:stream(), p2:prop()})).toEmit [''] + + it 'should be ended if array of ended observables provided', -> + a = send(stream(), ['']) + b = send(prop(), ['']) + c = send(stream(), ['']) + expect(Kefir.combine({a, b, c}, {d:stream(), e:prop()})).toEmit [''] + + it 'should be ended and emmit current (once) if array of ended properties provided and each of them has current', -> + a = send(prop(), [1, '']) + b = send(prop(), [2, '']) + c = send(prop(), [3, '']) + s1 = Kefir.combine({a, b}, {c}) + expect(s1).toEmit [{current: {a:1, b:2, c:3}}, ''] + expect(s1).toEmit [''] + + it 'should activate sources', -> + a = stream() + b = prop() + c = stream() + expect(Kefir.combine({a, b}, {c})).toActivate(a, b, c) + + it 'should handle events and current from observables', -> + a = stream() + b = send(prop(), [0]) + c = stream() + d = stream() + expect(Kefir.combine({c, d}, {a, b})).toEmit [ + {a:1, b:0, c:2, d:3}, + {a:1, b:4, c:5, d:3}, + {a:1, b:4, c:6, d:3}, + {a:1, b:4, c:6, d:7}, + '' + ], -> + send(a, [1]) + send(c, [2]) + send(d, [3]) + send(b, [4, '']) + send(c, [5, 6, '']) + send(d, [7, '']) + + + it 'should accept optional combinator function', -> + join = (msg) -> msg.c + "+" + msg.d + "+" + msg.a + "+" + msg.b + a = stream() + b = send(prop(), [0]) + c = stream() + d = stream() + expect(Kefir.combine({c, d}, {a, b}, join)).toEmit ['2+3+1+0', '5+3+1+4', '6+3+1+4', '6+7+1+4', ''], -> + send(a, [1]) + send(c, [2]) + send(d, [3]) + send(b, [4, '']) + send(c, [5, 6, '']) + send(d, [7, '']) + + + it 'when activating second time and has 2+ properties in sources, should emit current value at most once', -> + a = send(prop(), [0]) + b = send(prop(), [1]) + c = send(prop(), [2]) + sb = Kefir.combine({a, b}, {c}) + activate(sb) + deactivate(sb) + expect(sb).toEmit [{current: {a:0, b:1, c:2}}] + + it 'errors should flow', -> + a = stream() + b = prop() + c = stream() + d = prop() + expect(Kefir.combine({a, b}, {c, d})).errorsToFlow(a) + a = stream() + b = prop() + c = stream() + d = prop() + expect(Kefir.combine({a, b}, {c, d})).errorsToFlow(b) + + # https://github.com/rpominov/kefir/issues/98 + it 'should work nice for emitating atomic updates', -> + a = stream() + b = a.map (x) -> x + 2 + c = a.map (x) -> x * 2 + expect(Kefir.combine({b}, {c})).toEmit [ + {b:3, c:2}, + {b:4, c:4}, + {b:5, c:6} + ], -> + send(a, [1, 2, 3]) + + it 'should prefer active keys over passive keys', -> + a = stream() + b = stream() + _a = stream() + + expect(Kefir.combine({a, b}, {a: _a})).toEmit [ + {a: 1, b: 4}, + {a: 2, b: 4}, + {a: 3, b: 4} + ], -> + send(_a, [-1]) + send(a, [1]) + send(b, [4]) + send(_a, [-2]) + send(a, [2]) + send(_a, [-3]) + send(a, [3]) - # https://github.com/rpominov/kefir/issues/98 - it 'should work nice for emitating atomic updates', -> + describe 'mismatches', -> + it 'should not allow mismatched argument types', -> a = stream() - b = a.map (x) -> x + 2 - c = a.map (x) -> x * 2 - expect(Kefir.combine([b], [c])).toEmit [[3, 2], [4, 4], [5, 6]], -> - send(a, [1, 2, 3]) + b = stream() + arrayAndObject = -> Kefir.combine([a],{b}) + objectAndArray = -> Kefir.combine({a},[b]) + expect(arrayAndObject).toThrow() + expect(objectAndArray).toThrow()