diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ce15f2..63ffdc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ This file does not aim to be comprehensive (you have git history for that), rather it lists changes that might impact your own code as a consumer of this library. +next +----- +### Bugfix +* `pipe` now properly unbinds its `drain` handler from the destination when it + is done. Previously, there would have been a memory leak if the destination is + long-lived (e.g., as with `process.stdout`). + [#466](https://github.com/caolan/highland/pull/466). + 2.7.2 ----- ### Bugfix diff --git a/lib/index.js b/lib/index.js index 1538843..7812650 100755 --- a/lib/index.js +++ b/lib/index.js @@ -752,9 +752,6 @@ Stream.prototype._send = function (err, x) { //console.log(['_send', this.id, err, x]); var token; - if (x === nil) { - this.ended = true; - } if (this._consumers.length) { token = err ? new StreamError(err) : x; // this._consumers may be changed from under us, @@ -784,6 +781,57 @@ Stream.prototype._send = function (err, x) { this.emit('data', x); } } + + if (x === nil) { + this._onEnd(); + } +}; + + +Stream.prototype._onEnd = function _onEnd() { + if (this.ended) { + return; + } + + this.pause(); + + this.ended = true; + + if (this.source) { + var source = this.source; + source._removeConsumer(this); + source._removeObserver(this); + } + + var i, len; + + // _removeConsumer may modify this._consumers. + var consumers = this._consumers; + for (i = 0, len = consumers.length; i < len; i++) { + this._removeConsumer(consumers[i]); + } + + // Don't use _removeObserver for efficiency reasons. + var observer; + for (i = 0, len = this._observers.length; i < len; i++) { + observer = this._observers[i]; + if (observer.source === this) { + observer.source = null; + } + } + + for (i = 0, len = this._destructors.length; i < len; i++) { + this._destructors[i].call(this); + } + + this.source = null; + this._consumers = []; + this._incoming = []; + this._outgoing = []; + this._delegate = null; + this._generator = null; + this._observers = []; + this._destructors = []; }; /** @@ -1022,23 +1070,11 @@ Stream.prototype.pipe = function (dest, options) { */ Stream.prototype.destroy = function () { - var self = this; - this.end(); - _(this._consumers).each(function (consumer) { - self._removeConsumer(consumer); - }); - _(this._observers).each(function (observer) { - self._removeObserver(observer); - }); - - if (this.source) { - var source = this.source; - source._removeConsumer(this); - source._removeObserver(this); + if (this.ended) { + return; } - _(this._destructors).each(function (destructor) { - destructor.call(self); - }); + this.end(); + this._onEnd(); }; /** diff --git a/test/test.js b/test/test.js index 4fecbb4..7000976 100755 --- a/test/test.js +++ b/test/test.js @@ -1732,6 +1732,32 @@ exports['pipe'] = { clock.restore(); test.ok(!ended, 'The destination should not have been ended.'); test.done(); + }, + 'clean up drain handler when done': function (test) { + test.expect(2); + + var dest = _(); + var boundListener = false; + var unboundListener = false; + + dest.on('newListener', function (ev) { + if (ev === 'drain') { + boundListener = true; + } + }); + + dest.on('removeListener', function (ev) { + if (ev === 'drain') { + unboundListener = true; + } + }); + + _([1, 2, 3]).pipe(dest) + .resume(); + + test.ok(boundListener, 'No drain listener was bound.'); + test.ok(unboundListener, 'No drain listener was unbound.'); + test.done(); } };