Skip to content

Commit

Permalink
Unbind/unpipe from readable asap for laziness.
Browse files Browse the repository at this point in the history
  • Loading branch information
vqvu committed Jul 5, 2016
1 parent af3cfe5 commit 84eade6
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 15 deletions.
42 changes: 29 additions & 13 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -432,31 +432,42 @@ function defaultReadableOnFinish(readable, callback) {

function pipeReadable(xs, onFinish, stream) {
var cleanup = onFinish(xs, streamEndCb);
var unbound = false;

xs.pipe(stream);

// TODO: Replace with onDestroy in v3.
stream._destructors.push(function () {
if (cleanup) {
cleanup();
}

if (xs.unpipe) {
xs.unpipe(stream);
}
});
stream._destructors.push(unbind);

function streamEndCb(error) {
if (stream._nil_pushed) {
return;
}

unbind();

if (error) {
stream.write(new StreamError(error));
}

stream.end();
}

function unbind() {
if (unbound) {
return;
}

unbound = true;

if (cleanup) {
cleanup();
}

if (xs.unpipe) {
xs.unpipe(stream);
}
}
}

function promiseStream(promise) {
Expand Down Expand Up @@ -605,10 +616,6 @@ function generatorPush(stream, write) {
}

return function (err, x) {
if (stream._nil_pushed) {
throw new Error('Cannot write to stream after nil');
}

// This will set _nil_pushed if necessary.
write.call(stream, err ? new StreamError(err) : x);
};
Expand Down Expand Up @@ -1078,6 +1085,11 @@ Stream.prototype.resume = function () {
*/

Stream.prototype.end = function () {
if (this._nil_pushed) {
// Allow ending multiple times.
return;
}

this.write(nil);
};

Expand Down Expand Up @@ -1444,6 +1456,10 @@ Stream.prototype.pull = function (f) {
*/

Stream.prototype.write = function (x) {
if (this._nil_pushed) {
throw new Error('Cannot write to stream after nil');
}

// The check for _is_consumer is kind of a hack. Not
// needed in v3.0.
if (x === _.nil && !this._is_consumer) {
Expand Down
34 changes: 32 additions & 2 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,38 @@ exports.constructor = {
s.pull(valueEquals(test, _.nil));
test.done();
},
'from Readable - unbind and unpipe as soon as possible': function (test) {
var rs = new Stream.Readable();
rs._read = function (size) {
this.push('1');
this.push(null);
};

var error = new Error('error');
var s = _(rs);
var rsPipeDest = getReadablePipeDest(s);
var oldWrite = rsPipeDest.write;

// We need to catch the exception here
// since pipe uses process.nextTick which
// isn't mocked by sinon.
rsPipeDest.write = function (x) {
try {
oldWrite.call(this, x);
} catch (e) {
}
};

var write = sinon.spy(rsPipeDest, 'write');
rs.emit('error', error);

_.setImmediate(function () {
test.strictEqual(write.callCount, 2);
test.strictEqual(write.args[0][0].error, error);
test.strictEqual(write.args[1][0], _.nil);
test.done();
});
},
'from Readable - custom onFinish handler': function (test) {
test.expect(2);
var clock = sinon.useFakeTimers();
Expand All @@ -666,7 +698,6 @@ exports.constructor = {
};

var cleanup = sinon.spy();
var cleanUpCalled = false;
var s = _(rs, function (_rs, callback) {
setTimeout(callback, 1000);
return cleanup;
Expand Down Expand Up @@ -712,7 +743,6 @@ exports.constructor = {
};

var cleanup = sinon.spy();
var cleanUpCalled = false;
var error = new Error('error');
var s = _(rs, function (_rs, callback) {
setTimeout(function () {
Expand Down

0 comments on commit 84eade6

Please sign in to comment.