diff --git a/lib/index.js b/lib/index.js index 4c1a8ec..5db316b 100755 --- a/lib/index.js +++ b/lib/index.js @@ -432,31 +432,42 @@ function defaultReadableOnFinish(readable, callback) { function pipeReadable(xs, onFinish, stream) { var cleanup = onFinish(xs, streamEndCb); + var unbound = false; xs.pipe(stream); // TODO: Replace with onDestroy in v3. - stream._destructors.push(function () { - if (cleanup) { - cleanup(); - } - - if (xs.unpipe) { - xs.unpipe(stream); - } - }); + stream._destructors.push(unbind); function streamEndCb(error) { if (stream._nil_pushed) { return; } + unbind(); + if (error) { stream.write(new StreamError(error)); } stream.end(); } + + function unbind() { + if (unbound) { + return; + } + + unbound = true; + + if (cleanup) { + cleanup(); + } + + if (xs.unpipe) { + xs.unpipe(stream); + } + } } function promiseStream(promise) { @@ -605,10 +616,6 @@ function generatorPush(stream, write) { } return function (err, x) { - if (stream._nil_pushed) { - throw new Error('Cannot write to stream after nil'); - } - // This will set _nil_pushed if necessary. write.call(stream, err ? new StreamError(err) : x); }; @@ -1078,6 +1085,11 @@ Stream.prototype.resume = function () { */ Stream.prototype.end = function () { + if (this._nil_pushed) { + // Allow ending multiple times. + return; + } + this.write(nil); }; @@ -1444,6 +1456,10 @@ Stream.prototype.pull = function (f) { */ Stream.prototype.write = function (x) { + if (this._nil_pushed) { + throw new Error('Cannot write to stream after nil'); + } + // The check for _is_consumer is kind of a hack. Not // needed in v3.0. if (x === _.nil && !this._is_consumer) { diff --git a/test/test.js b/test/test.js index 4d4ce79..331c03a 100755 --- a/test/test.js +++ b/test/test.js @@ -657,6 +657,38 @@ exports.constructor = { s.pull(valueEquals(test, _.nil)); test.done(); }, + 'from Readable - unbind and unpipe as soon as possible': function (test) { + var rs = new Stream.Readable(); + rs._read = function (size) { + this.push('1'); + this.push(null); + }; + + var error = new Error('error'); + var s = _(rs); + var rsPipeDest = getReadablePipeDest(s); + var oldWrite = rsPipeDest.write; + + // We need to catch the exception here + // since pipe uses process.nextTick which + // isn't mocked by sinon. + rsPipeDest.write = function (x) { + try { + oldWrite.call(this, x); + } catch (e) { + } + }; + + var write = sinon.spy(rsPipeDest, 'write'); + rs.emit('error', error); + + _.setImmediate(function () { + test.strictEqual(write.callCount, 2); + test.strictEqual(write.args[0][0].error, error); + test.strictEqual(write.args[1][0], _.nil); + test.done(); + }); + }, 'from Readable - custom onFinish handler': function (test) { test.expect(2); var clock = sinon.useFakeTimers(); @@ -666,7 +698,6 @@ exports.constructor = { }; var cleanup = sinon.spy(); - var cleanUpCalled = false; var s = _(rs, function (_rs, callback) { setTimeout(callback, 1000); return cleanup; @@ -712,7 +743,6 @@ exports.constructor = { }; var cleanup = sinon.spy(); - var cleanUpCalled = false; var error = new Error('error'); var s = _(rs, function (_rs, callback) { setTimeout(function () {