Skip to content

Commit

Permalink
Merge pull request #261 from vqvu/update-transformer-protocol
Browse files Browse the repository at this point in the history
Update to latest transformer protocol & use init (Resolves #260).
  • Loading branch information
vqvu committed Apr 5, 2015
2 parents 8487ca3 + 10f6699 commit d34957e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 28 deletions.
55 changes: 35 additions & 20 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3150,17 +3150,23 @@ Stream.prototype.scan1 = function (f) {
};
exposeMethod('scan1');

var highlandTransform = {
init: function () { },
result: function (push) {
// Don't push nil here. Otherwise, we can't catch errors from `result`
// and propagate them. The `transduce` implementation will do it.
return push;
},
step: function (push, input) {
push(null, input);
return push;
}
function HighlandTransform(push) {
this.push = push;
}

HighlandTransform.prototype['@@transducer/init'] = function () {
return this.push;
};

HighlandTransform.prototype['@@transducer/result'] = function (push) {
// Don't push nil here. Otherwise, we can't catch errors from `result`
// and propagate them. The `transduce` implementation will do it.
return push;
};

HighlandTransform.prototype['@@transducer/step'] = function (push, input) {
push(null, input);
return push;
};

/**
Expand Down Expand Up @@ -3191,46 +3197,55 @@ var highlandTransform = {
*/

Stream.prototype.transduce = function transduce(xf) {
var transform = xf(highlandTransform);
var transform = null,
memo = null;

return this.consume(function (err, x, push, next) {
if (transform == null) {
transform = xf(new HighlandTransform(push));
memo = transform['@@transducer/init']();
}

if (err) {
// Pass through errors, like we always do.
push(err);
next();
}
else if (x === _.nil) {
runResult(push);
// Push may be different from memo depending on the transducer that
// we get.
runResult(push, memo);
}
else {
var res = runStep(push, x);
var res = runStep(push, memo, x);

if (!res) {
return;
}

if (res.__transducers_reduced__) {
runResult(res.value);
memo = res;
if (memo['@@transducer/reduced']) {
runResult(memo['@@transducer/value']);
}
else {
next();
}
}
});

function runResult(push) {
function runResult(push, memo) {
try {
transform.result(push);
transform['@@transducer/result'](memo);
}
catch (e) {
push(e);
}
push(null, _.nil);
}

function runStep(push, x) {
function runStep(push, memo, x) {
try {
return transform.step(push, x);
return transform['@@transducer/step'](memo, x);
}
catch (e) {
push(e);
Expand Down
56 changes: 48 additions & 8 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2381,13 +2381,13 @@ exports['transduce'] = {

function xf(transform) {
return {
init: transform.init.bind(transform),
result: transform.result.bind(transform),
step: function (result, x) {
'@@transducer/init': transform['@@transducer/init'].bind(transform),
'@@transducer/result': transform['@@transducer/result'].bind(transform),
'@@transducer/step': function (result, x) {
if (x === 2) {
throw new Error('error');
}
result = transform.step(result, x);
result = transform['@@transducer/step'](result, x);
return result;
}
};
Expand All @@ -2406,12 +2406,12 @@ exports['transduce'] = {

function xf(transform) {
return {
init: transform.init.bind(transform),
result: function (result) {
transform.result(result);
'@@transducer/init': transform['@@transducer/init'].bind(transform),
'@@transducer/result': function (result) {
transform['@@transducer/result'](result);
throw new Error('error');
},
step: transform.step.bind(transform)
'@@transducer/step': transform['@@transducer/step'].bind(transform)
};
}
},
Expand All @@ -2423,6 +2423,46 @@ exports['transduce'] = {
.toArray(this.tester([1], test));
test.done();
},
'wrapped memo': function (test) {
test.expect(2);
_(this.input)
.transduce(transducers.comp(this.xf, wrap))
.toArray(this.tester(this.expected, test));

_(this.input)
.transduce(transducers.comp(wrap, this.xf))
.toArray(this.tester(this.expected, test));
test.done();

function wrap(transform) {
return {
'@@transducer/init': function () {
return wrapMemo(transform['@@transducer/init']());
},
'@@transducer/result': function (result) {
return wrapMemo(transform['@@transducer/result'](result.memo));
},
'@@transducer/step': function (result, x) {
var res = transform['@@transducer/step'](result.memo, x);
if (res['@@transducer/reduced']) {
return {
'@@transducer/reduced': true,
'@@transducer/value': wrapMemo(res['@transducer/value'])
};
}
else {
return wrapMemo(res);
}
}
};
}

function wrapMemo(x) {
return {
memo: x
};
}
},
'noValueOnError': function (test) {
noValueOnErrorTest(_.transduce(this.xf))(test);
}
Expand Down

0 comments on commit d34957e

Please sign in to comment.