Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transduce transform. Resolves #212. #223

Merged
merged 4 commits into from
Feb 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2939,6 +2939,97 @@ Stream.prototype.scan1 = function (f) {
};
exposeMethod('scan1');

var highlandTransform = {
init: function () { },
result: function (push) {
// Don't push nil here. Otherwise, we can't catch errors from `result`
// and propagate them. The `transduce` implementation will do it.
return push;
},
step: function (push, input) {
push(null, input);
return push;
}
};

/**
* Applies the transformation defined by the the given *transducer* to the
* stream. A transducer is any function that follows the
* [Transducer Protocol](https://github.com/cognitect-labs/transducers-js#transformer-protocol).
* See
* [transduce-js](https://github.com/cognitect-labs/transducers-js#transducers-js)
* for more details on what transducers actually are.
*
* The `result` object that is passed in through the
* [Transformer Protocol](https://github.com/cognitect-labs/transducers-js#transformer-protocol)
* will be the `push` function provided by the [consume](#consume) transform.
*
* Like [scan](#scan), if the transducer throws an exception, the transform
* will stop and emit that error. Any intermediate values that were produced
* before the error will still be emitted.
*
* @id transduce
* @section Transforms
* @name Stream.transduce(xf)
* @param {Function} xf - The transducer.
* @api public
*
* var xf = require('transducer-js').map(_.add(1));
* _([1, 2, 3, 4]).transduce(xf);
* // => [2, 3, 4, 5]
*/

Stream.prototype.transduce = function transduce(xf) {
var transform = xf(highlandTransform);

return this.consume(function (err, x, push, next) {
if (err) {
// Pass through errors, like we always do.
push(err);
next();
}
else if (x === _.nil) {
runResult(push);
}
else {
var res = runStep(push, x);

if (!res) {
return;
}

if (res.__transducers_reduced__) {
runResult(res.value);
}
else {
next();
}
}
});

function runResult(push) {
try {
transform.result(push);
}
catch (e) {
push(e);
}
push(null, _.nil);
}

function runStep(push, x) {
try {
return transform.step(push, x);
}
catch (e) {
push(e);
push(null, _.nil);
return null;
}
}
};
exposeMethod('transduce');

/**
* Concatenates a Stream to the end of this Stream.
*
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"scrawl": "0.0.5",
"sinon": "~1.8.2",
"stream-array": "~1.0.1",
"through": "~2.3.4"
"through": "~2.3.4",
"transducers-js": "~0.4.135"
},
"scripts": {
"test": "nodeunit test/test.js"
Expand Down
116 changes: 116 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var EventEmitter = require('events').EventEmitter,
streamify = require('stream-array'),
concat = require('concat-stream'),
Promise = require('es6-promise').Promise,
transducers = require('transducers-js'),
_ = require('../lib/index');


Expand Down Expand Up @@ -55,6 +56,15 @@ function noValueOnErrorTest(transform, expected) {
}
}

function generatorStream(input, timeout) {
return _(function (push, next) {
for (var i = 0, len = input.length; i < len; i++) {
setTimeout(push.bind(null, null, input[i]), timeout * i);
}
setTimeout(push.bind(null, null, _.nil), timeout * len);
});
}

exports['ratelimit'] = {
setUp: function (callback) {
this.clock = sinon.useFakeTimers();
Expand Down Expand Up @@ -2107,6 +2117,112 @@ exports['collect - GeneratorStream'] = function (test) {
});
};

exports['transduce'] = {
setUp: function (cb) {
var self = this;
this.xf = transducers.map(_.add(1));
this.input = [1, 2, 3];
this.expected = [2, 3, 4];
this.tester = function (expected, test) {
return function (xs) {
test.same(xs, expected);
};
};
cb();
},
'ArrayStream': function (test) {
test.expect(1);
_(this.input)
.transduce(this.xf)
.toArray(this.tester(this.expected, test));
test.done();
},
'GeneratorStream': function (test) {
test.expect(1);
generatorStream(this.input, 10)
.transduce(this.xf)
.toArray(this.tester(this.expected, test));
setTimeout(test.done.bind(test), 10 * (this.input.length + 2));
},
'partial application': function (test) {
test.expect(1);
_.transduce(this.xf)(this.input)
.toArray(this.tester(this.expected, test));
test.done();
},
'passThroughError': function (test) {
test.expect(4);
var s = _([1, 2, 3]).map(function (x) {
if (x === 2) {
throw new Error('error');
}
return x;
}).transduce(this.xf);

s.pull(valueEquals(test, 2));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, 4));
s.pull(valueEquals(test, _.nil));
test.done();
},
'stopOnStepError': function (test) {
test.expect(3);
var s = _([1, 2, 3]).transduce(xf);

s.pull(valueEquals(test, 1));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, _.nil));
test.done();

function xf(transform) {
return {
init: transform.init.bind(transform),
result: transform.result.bind(transform),
step: function (result, x) {
if (x === 2) {
throw new Error('error');
}
result = transform.step(result, x);
return result;
}
};
}
},
'stopOnResultError': function (test) {
test.expect(5);
var s = _([1, 2, 3]).transduce(xf);

s.pull(valueEquals(test, 1));
s.pull(valueEquals(test, 2));
s.pull(valueEquals(test, 3));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, _.nil));
test.done();

function xf(transform) {
return {
init: transform.init.bind(transform),
result: function (result) {
transform.result(result);
throw new Error('error');
},
step: transform.step.bind(transform)
};
}
},
'early termination': function (test) {
test.expect(1);
var xf = transducers.take(1);
_([1, 2, 3])
.transduce(xf)
.toArray(this.tester([1], test));
test.done();
},
'noValueOnError': function (test) {
noValueOnErrorTest(_.transduce(this.xf))(test);
},
};

exports['concat'] = function (test) {
test.expect(2);
_.concat([3,4], [1,2]).toArray(function (xs) {
Expand Down