Skip to content

Commit

Permalink
Merge pull request #223 from vqvu/transduce
Browse files Browse the repository at this point in the history
Add transduce transform. Resolves #212.
  • Loading branch information
vqvu committed Feb 13, 2015
2 parents f23493f + b465be2 commit 434d8ca
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 1 deletion.
91 changes: 91 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2943,6 +2943,97 @@ Stream.prototype.scan1 = function (f) {
};
exposeMethod('scan1');

var highlandTransform = {
init: function () { },
result: function (push) {
// Don't push nil here. Otherwise, we can't catch errors from `result`
// and propagate them. The `transduce` implementation will do it.
return push;
},
step: function (push, input) {
push(null, input);
return push;
}
};

/**
* Applies the transformation defined by the the given *transducer* to the
* stream. A transducer is any function that follows the
* [Transducer Protocol](https://github.com/cognitect-labs/transducers-js#transformer-protocol).
* See
* [transduce-js](https://github.com/cognitect-labs/transducers-js#transducers-js)
* for more details on what transducers actually are.
*
* The `result` object that is passed in through the
* [Transformer Protocol](https://github.com/cognitect-labs/transducers-js#transformer-protocol)
* will be the `push` function provided by the [consume](#consume) transform.
*
* Like [scan](#scan), if the transducer throws an exception, the transform
* will stop and emit that error. Any intermediate values that were produced
* before the error will still be emitted.
*
* @id transduce
* @section Transforms
* @name Stream.transduce(xf)
* @param {Function} xf - The transducer.
* @api public
*
* var xf = require('transducer-js').map(_.add(1));
* _([1, 2, 3, 4]).transduce(xf);
* // => [2, 3, 4, 5]
*/

Stream.prototype.transduce = function transduce(xf) {
var transform = xf(highlandTransform);

return this.consume(function (err, x, push, next) {
if (err) {
// Pass through errors, like we always do.
push(err);
next();
}
else if (x === _.nil) {
runResult(push);
}
else {
var res = runStep(push, x);

if (!res) {
return;
}

if (res.__transducers_reduced__) {
runResult(res.value);
}
else {
next();
}
}
});

function runResult(push) {
try {
transform.result(push);
}
catch (e) {
push(e);
}
push(null, _.nil);
}

function runStep(push, x) {
try {
return transform.step(push, x);
}
catch (e) {
push(e);
push(null, _.nil);
return null;
}
}
};
exposeMethod('transduce');

/**
* Concatenates a Stream to the end of this Stream.
*
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"scrawl": "0.0.5",
"sinon": "~1.8.2",
"stream-array": "~1.0.1",
"through": "~2.3.4"
"through": "~2.3.4",
"transducers-js": "~0.4.135"
},
"scripts": {
"test": "nodeunit test/test.js"
Expand Down
116 changes: 116 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var EventEmitter = require('events').EventEmitter,
streamify = require('stream-array'),
concat = require('concat-stream'),
Promise = require('es6-promise').Promise,
transducers = require('transducers-js'),
_ = require('../lib/index');


Expand Down Expand Up @@ -55,6 +56,15 @@ function noValueOnErrorTest(transform, expected) {
}
}

function generatorStream(input, timeout) {
return _(function (push, next) {
for (var i = 0, len = input.length; i < len; i++) {
setTimeout(push.bind(null, null, input[i]), timeout * i);
}
setTimeout(push.bind(null, null, _.nil), timeout * len);
});
}

exports['ratelimit'] = {
setUp: function (callback) {
this.clock = sinon.useFakeTimers();
Expand Down Expand Up @@ -2128,6 +2138,112 @@ exports['collect - GeneratorStream'] = function (test) {
});
};

exports['transduce'] = {
setUp: function (cb) {
var self = this;
this.xf = transducers.map(_.add(1));
this.input = [1, 2, 3];
this.expected = [2, 3, 4];
this.tester = function (expected, test) {
return function (xs) {
test.same(xs, expected);
};
};
cb();
},
'ArrayStream': function (test) {
test.expect(1);
_(this.input)
.transduce(this.xf)
.toArray(this.tester(this.expected, test));
test.done();
},
'GeneratorStream': function (test) {
test.expect(1);
generatorStream(this.input, 10)
.transduce(this.xf)
.toArray(this.tester(this.expected, test));
setTimeout(test.done.bind(test), 10 * (this.input.length + 2));
},
'partial application': function (test) {
test.expect(1);
_.transduce(this.xf)(this.input)
.toArray(this.tester(this.expected, test));
test.done();
},
'passThroughError': function (test) {
test.expect(4);
var s = _([1, 2, 3]).map(function (x) {
if (x === 2) {
throw new Error('error');
}
return x;
}).transduce(this.xf);

s.pull(valueEquals(test, 2));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, 4));
s.pull(valueEquals(test, _.nil));
test.done();
},
'stopOnStepError': function (test) {
test.expect(3);
var s = _([1, 2, 3]).transduce(xf);

s.pull(valueEquals(test, 1));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, _.nil));
test.done();

function xf(transform) {
return {
init: transform.init.bind(transform),
result: transform.result.bind(transform),
step: function (result, x) {
if (x === 2) {
throw new Error('error');
}
result = transform.step(result, x);
return result;
}
};
}
},
'stopOnResultError': function (test) {
test.expect(5);
var s = _([1, 2, 3]).transduce(xf);

s.pull(valueEquals(test, 1));
s.pull(valueEquals(test, 2));
s.pull(valueEquals(test, 3));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, _.nil));
test.done();

function xf(transform) {
return {
init: transform.init.bind(transform),
result: function (result) {
transform.result(result);
throw new Error('error');
},
step: transform.step.bind(transform)
};
}
},
'early termination': function (test) {
test.expect(1);
var xf = transducers.take(1);
_([1, 2, 3])
.transduce(xf)
.toArray(this.tester([1], test));
test.done();
},
'noValueOnError': function (test) {
noValueOnErrorTest(_.transduce(this.xf))(test);
},
};

exports['concat'] = function (test) {
test.expect(2);
_.concat([3,4], [1,2]).toArray(function (xs) {
Expand Down

0 comments on commit 434d8ca

Please sign in to comment.