Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Better stream.pipe() tracking.
Browse files Browse the repository at this point in the history
This commit does three things:

1. Uses an exposed counter rather than a hidden array for tracking dest
streams that may have multiple inputs.  This allows for significantly
faster lookups, since the counter can be checked in constant time rather
than searching an array for the dest object.  (A proper O(1) WeakMap
would be better, but that may have to wait for Harmony.)

2. Calls the 'end' event logic when there is an 'error' event on the
source object (and then throws if there are no other error listeners.)
This is important, because otherwise 'error' events would lead to
memory leaks.

3. Clean up the style a bit.  Function Declarations are not allowed
within blocks in ES strict.  Prefer Function Declarations to Function
Expressions, because hoisting allows for more expressive ordering of
logic.

Downside: It adds "_pipeCount" as part of the Stream API.  It'll work
fine if the member is missing, but if anyone tries to use it for some
other purpose, it can mess things up.
  • Loading branch information
isaacs committed Apr 28, 2011
1 parent bbffd9e commit 7c6f014
Showing 1 changed file with 43 additions and 25 deletions.
68 changes: 43 additions & 25 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,9 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;

var pipes = [];

Stream.prototype.pipe = function(dest, options) {
var source = this;

pipes.push(dest);

function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
Expand All @@ -49,31 +45,48 @@ Stream.prototype.pipe = function(dest, options) {

dest.on('drain', ondrain);

/*
* If the 'end' option is not supplied, dest.end() will be called when
* source gets the 'end' event.
*/

// If the 'end' option is not supplied, dest.end() will be called when
// source gets the 'end' or 'close' events. Only dest.end() once, and
// only when all sources have ended.
if (!options || options.end !== false) {
function onend() {
var index = pipes.indexOf(dest);
pipes.splice(index, 1);
dest._pipeCount = dest._pipeCount || 0;
dest._pipeCount++;

source.on('end', onend);
source.on('close', onend);
}

var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;

dest._pipeCount--;

if (pipes.indexOf(dest) > -1) {
return;
}
// remove the listeners
cleanup();

dest.end();
if (dest._pipeCount > 0) {
// waiting for other incoming streams to end.
return;
}

source.on('end', onend);
source.on('close', onend);
dest.end();
}

// don't leave dangling pipes when there are errors.
function onerror(er) {
cleanup();
if (this.listeners('error').length === 1) {

This comment has been minimized.

Copy link
@felixge

felixge May 22, 2011

@isaacs Shouldn't this be === 0? The cleanup() call before this is already removing the 'error' listener we attached ourselves.

throw er; // Unhandled stream error in pipe.
}
}

/*
* Questionable:
*/
source.on('error', onerror);
dest.on('error', onerror);

// guarantee that source streams can be paused and resumed, even
// if the only effect is to proxy the event back up the pipe chain.
if (!source.pause) {
source.pause = function() {
source.emit('pause');
Expand All @@ -86,27 +99,32 @@ Stream.prototype.pipe = function(dest, options) {
};
}

var onpause = function() {
function onpause() {
source.pause();
}

dest.on('pause', onpause);

var onresume = function() {
function onresume() {
if (source.readable) source.resume();
};
}

dest.on('resume', onresume);

var cleanup = function () {
// remove all the event listeners that were added.
function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);

source.removeListener('end', onend);
source.removeListener('close', onend);

dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume);

source.removeListener('error', onerror);
dest.removeListener('error', onerror);

source.removeListener('end', cleanup);
source.removeListener('close', cleanup);

Expand Down

1 comment on commit 7c6f014

@bolinfest
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that this change is responsible for this regression #728 It appears to have been introduced in v0.4.8.

Please sign in to comment.