Skip to content

Commit

Permalink
Fix issue with takeUntil in nested stream
Browse files Browse the repository at this point in the history
This issue was caused by #180 which made sure that nested streams were
updated properly. Updating them properly caused nested takeUntil'd
streams to end too soon.
  • Loading branch information
Einar Norðfjörð committed Oct 5, 2018
1 parent 48799a9 commit 4b0340c
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 5 deletions.
232 changes: 231 additions & 1 deletion examples/drag-and-drop/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -994,8 +994,238 @@ StreamTransformer.prototype['@@transducer/step'] = function(s, v) { return v; };

var lib = flyd;

/**
* Tests whether or not an object is an array.
*
* @private
* @param {*} val The object to test.
* @return {Boolean} `true` if `val` is an array, `false` otherwise.
* @example
*
* _isArray([]); //=> true
* _isArray(null); //=> false
* _isArray({}); //=> false
*/
var _isArray = Array.isArray || function _isArray(val) {
return val != null && val.length >= 0 && Object.prototype.toString.call(val) === '[object Array]';
};

function _isTransformer(obj) {
return typeof obj['@@transducer/step'] === 'function';
}
var _isTransformer_1 = _isTransformer;

/**
* Returns a function that dispatches with different strategies based on the
* object in list position (last argument). If it is an array, executes [fn].
* Otherwise, if it has a function with one of the given method names, it will
* execute that function (functor case). Otherwise, if it is a transformer,
* uses transducer [xf] to return a new transformer (transducer case).
* Otherwise, it will default to executing [fn].
*
* @private
* @param {Array} methodNames properties to check for a custom implementation
* @param {Function} xf transducer to initialize if object is transformer
* @param {Function} fn default ramda implementation
* @return {Function} A function that dispatches on object in list position
*/


function _dispatchable(methodNames, xf, fn) {
return function () {
if (arguments.length === 0) {
return fn();
}
var args = Array.prototype.slice.call(arguments, 0);
var obj = args.pop();
if (!_isArray(obj)) {
var idx = 0;
while (idx < methodNames.length) {
if (typeof obj[methodNames[idx]] === 'function') {
return obj[methodNames[idx]].apply(obj, args);
}
idx += 1;
}
if (_isTransformer_1(obj)) {
var transducer = xf.apply(null, args);
return transducer(obj);
}
}
return fn.apply(this, arguments);
};
}
var _dispatchable_1 = _dispatchable;

var _xfBase = {
init: function () {
return this.xf['@@transducer/init']();
},
result: function (result) {
return this.xf['@@transducer/result'](result);
}
};

var XDrop = /*#__PURE__*/function () {

function XDrop(n, xf) {
this.xf = xf;
this.n = n;
}
XDrop.prototype['@@transducer/init'] = _xfBase.init;
XDrop.prototype['@@transducer/result'] = _xfBase.result;
XDrop.prototype['@@transducer/step'] = function (result, input) {
if (this.n > 0) {
this.n -= 1;
return result;
}
return this.xf['@@transducer/step'](result, input);
};

return XDrop;
}();

var _xdrop = /*#__PURE__*/_curry2_1(function _xdrop(n, xf) {
return new XDrop(n, xf);
});
var _xdrop_1 = _xdrop;

/**
* This checks whether a function has a [methodname] function. If it isn't an
* array it will execute that function otherwise it will default to the ramda
* implementation.
*
* @private
* @param {Function} fn ramda implemtation
* @param {String} methodname property to check for a custom implementation
* @return {Object} Whatever the return value of the method is.
*/


function _checkForMethod(methodname, fn) {
return function () {
var length = arguments.length;
if (length === 0) {
return fn();
}
var obj = arguments[length - 1];
return _isArray(obj) || typeof obj[methodname] !== 'function' ? fn.apply(this, arguments) : obj[methodname].apply(obj, Array.prototype.slice.call(arguments, 0, length - 1));
};
}
var _checkForMethod_1 = _checkForMethod;

/**
* Optimized internal three-arity curry function.
*
* @private
* @category Function
* @param {Function} fn The function to curry.
* @return {Function} The curried function.
*/


function _curry3(fn) {
return function f3(a, b, c) {
switch (arguments.length) {
case 0:
return f3;
case 1:
return _isPlaceholder_1(a) ? f3 : _curry2_1(function (_b, _c) {
return fn(a, _b, _c);
});
case 2:
return _isPlaceholder_1(a) && _isPlaceholder_1(b) ? f3 : _isPlaceholder_1(a) ? _curry2_1(function (_a, _c) {
return fn(_a, b, _c);
}) : _isPlaceholder_1(b) ? _curry2_1(function (_b, _c) {
return fn(a, _b, _c);
}) : _curry1_1(function (_c) {
return fn(a, b, _c);
});
default:
return _isPlaceholder_1(a) && _isPlaceholder_1(b) && _isPlaceholder_1(c) ? f3 : _isPlaceholder_1(a) && _isPlaceholder_1(b) ? _curry2_1(function (_a, _b) {
return fn(_a, _b, c);
}) : _isPlaceholder_1(a) && _isPlaceholder_1(c) ? _curry2_1(function (_a, _c) {
return fn(_a, b, _c);
}) : _isPlaceholder_1(b) && _isPlaceholder_1(c) ? _curry2_1(function (_b, _c) {
return fn(a, _b, _c);
}) : _isPlaceholder_1(a) ? _curry1_1(function (_a) {
return fn(_a, b, c);
}) : _isPlaceholder_1(b) ? _curry1_1(function (_b) {
return fn(a, _b, c);
}) : _isPlaceholder_1(c) ? _curry1_1(function (_c) {
return fn(a, b, _c);
}) : fn(a, b, c);
}
};
}
var _curry3_1 = _curry3;

/**
* Returns the elements of the given list or string (or object with a `slice`
* method) from `fromIndex` (inclusive) to `toIndex` (exclusive).
*
* Dispatches to the `slice` method of the third argument, if present.
*
* @func
* @memberOf R
* @since v0.1.4
* @category List
* @sig Number -> Number -> [a] -> [a]
* @sig Number -> Number -> String -> String
* @param {Number} fromIndex The start index (inclusive).
* @param {Number} toIndex The end index (exclusive).
* @param {*} list
* @return {*}
* @example
*
* R.slice(1, 3, ['a', 'b', 'c', 'd']); //=> ['b', 'c']
* R.slice(1, Infinity, ['a', 'b', 'c', 'd']); //=> ['b', 'c', 'd']
* R.slice(0, -1, ['a', 'b', 'c', 'd']); //=> ['a', 'b', 'c']
* R.slice(-3, -1, ['a', 'b', 'c', 'd']); //=> ['b', 'c']
* R.slice(0, 3, 'ramda'); //=> 'ram'
*/


var slice = /*#__PURE__*/_curry3_1( /*#__PURE__*/_checkForMethod_1('slice', function slice(fromIndex, toIndex, list) {
return Array.prototype.slice.call(list, fromIndex, toIndex);
}));
var slice_1 = slice;

/**
* Returns all but the first `n` elements of the given list, string, or
* transducer/transformer (or object with a `drop` method).
*
* Dispatches to the `drop` method of the second argument, if present.
*
* @func
* @memberOf R
* @since v0.1.0
* @category List
* @sig Number -> [a] -> [a]
* @sig Number -> String -> String
* @param {Number} n
* @param {*} list
* @return {*} A copy of list without the first `n` elements
* @see R.take, R.transduce, R.dropLast, R.dropWhile
* @example
*
* R.drop(1, ['foo', 'bar', 'baz']); //=> ['bar', 'baz']
* R.drop(2, ['foo', 'bar', 'baz']); //=> ['baz']
* R.drop(3, ['foo', 'bar', 'baz']); //=> []
* R.drop(4, ['foo', 'bar', 'baz']); //=> []
* R.drop(3, 'ramda'); //=> 'da'
*/


var drop = /*#__PURE__*/_curry2_1( /*#__PURE__*/_dispatchable_1(['drop'], _xdrop_1, function drop(n, xs) {
return slice_1(Math.max(0, n), Infinity, xs);
}));
var drop_1 = drop;

var dropCurrentValue = lib.transduce(drop_1(1));

var takeuntil = lib.curryN(2, function(src, term) {
return lib.endsOn(lib.merge(term, src.end), lib.combine(function(src, self) {
var end$ = term.hasVal ? dropCurrentValue(term) : term;
return lib.endsOn(lib.merge(end$, src.end), lib.combine(function(src, self) {
self(src());
}, [src]));
});
Expand Down
4 changes: 1 addition & 3 deletions module/switchlatest/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
var flyd = require('../../lib');
var takeUntil = require('../takeuntil');
var drop = require('ramda/src/drop');

var dropCurrentValue = flyd.transduce(drop(1));

module.exports = function(s) {
return flyd.combine(function(stream$, self) {
var value$ = stream$();
flyd.on(self, takeUntil(value$, dropCurrentValue(stream$)));
flyd.on(self, takeUntil(value$, stream$));
}, [s]);
};
6 changes: 5 additions & 1 deletion module/takeuntil/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
var flyd = require('../../lib');
var drop = require('ramda/src/drop');

var dropCurrentValue = flyd.transduce(drop(1));

module.exports = flyd.curryN(2, function(src, term) {
return flyd.endsOn(flyd.merge(term, src.end), flyd.combine(function(src, self) {
var end$ = term.hasVal ? dropCurrentValue(term) : term;
return flyd.endsOn(flyd.merge(end$, src.end), flyd.combine(function(src, self) {
self(src());
}, [src]));
});
20 changes: 20 additions & 0 deletions module/takeuntil/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,24 @@ describe('takeUntil', function() {
assert.deepEqual(result, [1]);
assert(s.end());
});

it('works in nested streams', function() {
var source = stream(1);
var terminator = stream(true);

var value = stream(1).chain(function() {
return takeUntil(source, terminator);
})
.map(function(val) {
return val + 1;
});

source(2)(3)(4)(5);

terminator(true);

source(6)(7)(8)(9);

assert.equal(value(), 6);
})
});

0 comments on commit 4b0340c

Please sign in to comment.