Skip to content

Commit

Permalink
Merge pull request #466 from vqvu/pipe-memory-leak
Browse files Browse the repository at this point in the history
Fix possible memory leak in pipe.
  • Loading branch information
vqvu committed Mar 31, 2016
2 parents 7454109 + 63dfb17 commit e7b1bf8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 19 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ This file does not aim to be comprehensive (you have git history for that),
rather it lists changes that might impact your own code as a consumer of
this library.

next
-----
### Bugfix
* `pipe` now properly unbinds its `drain` handler from the destination when it
is done. Previously, there would have been a memory leak if the destination is
long-lived (e.g., as with `process.stdout`).
[#466](https://github.com/caolan/highland/pull/466).

2.7.2
-----
### Bugfix
Expand Down
74 changes: 55 additions & 19 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,6 @@ Stream.prototype._send = function (err, x) {
//console.log(['_send', this.id, err, x]);
var token;

if (x === nil) {
this.ended = true;
}
if (this._consumers.length) {
token = err ? new StreamError(err) : x;
// this._consumers may be changed from under us,
Expand Down Expand Up @@ -784,6 +781,57 @@ Stream.prototype._send = function (err, x) {
this.emit('data', x);
}
}

if (x === nil) {
this._onEnd();
}
};


Stream.prototype._onEnd = function _onEnd() {
if (this.ended) {
return;
}

this.pause();

this.ended = true;

if (this.source) {
var source = this.source;
source._removeConsumer(this);
source._removeObserver(this);
}

var i, len;

// _removeConsumer may modify this._consumers.
var consumers = this._consumers;
for (i = 0, len = consumers.length; i < len; i++) {
this._removeConsumer(consumers[i]);
}

// Don't use _removeObserver for efficiency reasons.
var observer;
for (i = 0, len = this._observers.length; i < len; i++) {
observer = this._observers[i];
if (observer.source === this) {
observer.source = null;
}
}

for (i = 0, len = this._destructors.length; i < len; i++) {
this._destructors[i].call(this);
}

this.source = null;
this._consumers = [];
this._incoming = [];
this._outgoing = [];
this._delegate = null;
this._generator = null;
this._observers = [];
this._destructors = [];
};

/**
Expand Down Expand Up @@ -1022,23 +1070,11 @@ Stream.prototype.pipe = function (dest, options) {
*/

Stream.prototype.destroy = function () {
var self = this;
this.end();
_(this._consumers).each(function (consumer) {
self._removeConsumer(consumer);
});
_(this._observers).each(function (observer) {
self._removeObserver(observer);
});

if (this.source) {
var source = this.source;
source._removeConsumer(this);
source._removeObserver(this);
if (this.ended) {
return;
}
_(this._destructors).each(function (destructor) {
destructor.call(self);
});
this.end();
this._onEnd();
};

/**
Expand Down
26 changes: 26 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,32 @@ exports['pipe'] = {
clock.restore();
test.ok(!ended, 'The destination should not have been ended.');
test.done();
},
'clean up drain handler when done': function (test) {
test.expect(2);

var dest = _();
var boundListener = false;
var unboundListener = false;

dest.on('newListener', function (ev) {
if (ev === 'drain') {
boundListener = true;
}
});

dest.on('removeListener', function (ev) {
if (ev === 'drain') {
unboundListener = true;
}
});

_([1, 2, 3]).pipe(dest)
.resume();

test.ok(boundListener, 'No drain listener was bound.');
test.ok(unboundListener, 'No drain listener was unbound.');
test.done();
}
};

Expand Down

0 comments on commit e7b1bf8

Please sign in to comment.