diff --git a/lib/index.js b/lib/index.js index a3e40b3..e3283a6 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2939,6 +2939,97 @@ Stream.prototype.scan1 = function (f) { }; exposeMethod('scan1'); +var highlandTransform = { + init: function () { }, + result: function (push) { + // Don't push nil here. Otherwise, we can't catch errors from `result` + // and propagate them. The `transduce` implementation will do it. + return push; + }, + step: function (push, input) { + push(null, input); + return push; + } +}; + +/** + * Applies the transformation defined by the the given *transducer* to the + * stream. A transducer is any function that follows the + * [Transducer Protocol](https://github.com/cognitect-labs/transducers-js#transformer-protocol). + * See + * [transduce-js](https://github.com/cognitect-labs/transducers-js#transducers-js) + * for more details on what transducers actually are. + * + * The `result` object that is passed in through the + * [Transformer Protocol](https://github.com/cognitect-labs/transducers-js#transformer-protocol) + * will be the `push` function provided by the [consume](#consume) transform. + * + * Like [scan](#scan), if the transducer throws an exception, the transform + * will stop and emit that error. Any intermediate values that were produced + * before the error will still be emitted. + * + * @id transduce + * @section Transforms + * @name Stream.transduce(xf) + * @param {Function} xf - The transducer. + * @api public + * + * var xf = require('transducer-js').map(_.add(1)); + * _([1, 2, 3, 4]).transduce(xf); + * // => [2, 3, 4, 5] + */ + +Stream.prototype.transduce = function transduce(xf) { + var transform = xf(highlandTransform); + + return this.consume(function (err, x, push, next) { + if (err) { + // Pass through errors, like we always do. + push(err); + next(); + } + else if (x === _.nil) { + runResult(push); + } + else { + var res = runStep(push, x); + + if (!res) { + return; + } + + if (res.__transducers_reduced__) { + runResult(res.value); + } + else { + next(); + } + } + }); + + function runResult(push) { + try { + transform.result(push); + } + catch (e) { + push(e); + } + push(null, _.nil); + } + + function runStep(push, x) { + try { + return transform.step(push, x); + } + catch (e) { + push(e); + push(null, _.nil); + return null; + } + } +}; +exposeMethod('transduce'); + /** * Concatenates a Stream to the end of this Stream. * diff --git a/package.json b/package.json index 7ea2160..ff8d936 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,8 @@ "scrawl": "0.0.5", "sinon": "~1.8.2", "stream-array": "~1.0.1", - "through": "~2.3.4" + "through": "~2.3.4", + "transducers-js": "~0.4.135" }, "scripts": { "test": "nodeunit test/test.js" diff --git a/test/test.js b/test/test.js index c8e8a4f..62b037e 100755 --- a/test/test.js +++ b/test/test.js @@ -5,6 +5,7 @@ var EventEmitter = require('events').EventEmitter, streamify = require('stream-array'), concat = require('concat-stream'), Promise = require('es6-promise').Promise, + transducers = require('transducers-js'), _ = require('../lib/index'); @@ -55,6 +56,15 @@ function noValueOnErrorTest(transform, expected) { } } +function generatorStream(input, timeout) { + return _(function (push, next) { + for (var i = 0, len = input.length; i < len; i++) { + setTimeout(push.bind(null, null, input[i]), timeout * i); + } + setTimeout(push.bind(null, null, _.nil), timeout * len); + }); +} + exports['ratelimit'] = { setUp: function (callback) { this.clock = sinon.useFakeTimers(); @@ -2107,6 +2117,112 @@ exports['collect - GeneratorStream'] = function (test) { }); }; +exports['transduce'] = { + setUp: function (cb) { + var self = this; + this.xf = transducers.map(_.add(1)); + this.input = [1, 2, 3]; + this.expected = [2, 3, 4]; + this.tester = function (expected, test) { + return function (xs) { + test.same(xs, expected); + }; + }; + cb(); + }, + 'ArrayStream': function (test) { + test.expect(1); + _(this.input) + .transduce(this.xf) + .toArray(this.tester(this.expected, test)); + test.done(); + }, + 'GeneratorStream': function (test) { + test.expect(1); + generatorStream(this.input, 10) + .transduce(this.xf) + .toArray(this.tester(this.expected, test)); + setTimeout(test.done.bind(test), 10 * (this.input.length + 2)); + }, + 'partial application': function (test) { + test.expect(1); + _.transduce(this.xf)(this.input) + .toArray(this.tester(this.expected, test)); + test.done(); + }, + 'passThroughError': function (test) { + test.expect(4); + var s = _([1, 2, 3]).map(function (x) { + if (x === 2) { + throw new Error('error'); + } + return x; + }).transduce(this.xf); + + s.pull(valueEquals(test, 2)); + s.pull(errorEquals(test, 'error')); + s.pull(valueEquals(test, 4)); + s.pull(valueEquals(test, _.nil)); + test.done(); + }, + 'stopOnStepError': function (test) { + test.expect(3); + var s = _([1, 2, 3]).transduce(xf); + + s.pull(valueEquals(test, 1)); + s.pull(errorEquals(test, 'error')); + s.pull(valueEquals(test, _.nil)); + test.done(); + + function xf(transform) { + return { + init: transform.init.bind(transform), + result: transform.result.bind(transform), + step: function (result, x) { + if (x === 2) { + throw new Error('error'); + } + result = transform.step(result, x); + return result; + } + }; + } + }, + 'stopOnResultError': function (test) { + test.expect(5); + var s = _([1, 2, 3]).transduce(xf); + + s.pull(valueEquals(test, 1)); + s.pull(valueEquals(test, 2)); + s.pull(valueEquals(test, 3)); + s.pull(errorEquals(test, 'error')); + s.pull(valueEquals(test, _.nil)); + test.done(); + + function xf(transform) { + return { + init: transform.init.bind(transform), + result: function (result) { + transform.result(result); + throw new Error('error'); + }, + step: transform.step.bind(transform) + }; + } + }, + 'early termination': function (test) { + test.expect(1); + var xf = transducers.take(1); + _([1, 2, 3]) + .transduce(xf) + .toArray(this.tester([1], test)); + test.done(); + }, + 'noValueOnError': function (test) { + noValueOnErrorTest(_.transduce(this.xf))(test); + }, +}; + exports['concat'] = function (test) { test.expect(2); _.concat([3,4], [1,2]).toArray(function (xs) {