diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 06bde5561683a3..ebc31660cbcd59 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -21,6 +21,10 @@ function WriteReq(chunk, encoding, cb) { this.next = null; } +function _end(stream, cb) { + cb(null); +} + function WritableState(options, stream) { options = options || {}; @@ -106,6 +110,13 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + + // when `state._end()` was called + this.flushed = false; + + // NOTE: added here to not pollute the prototype of the WritableStream and to + // avoid conflicts with user-land methods + this._end = options._end || _end; } WritableState.prototype.getBuffer = function writableStateGetBuffer() { @@ -447,6 +458,7 @@ function needFinish(state) { state.length === 0 && state.bufferedRequest === null && !state.finished && + !state.flushed && !state.writing); } @@ -462,8 +474,14 @@ function finishMaybe(stream, state) { if (need) { if (state.pendingcb === 0) { prefinish(stream, state); - state.finished = true; - stream.emit('finish'); + state.flushed = true; + state._end(stream, function(err) { + if (err) + stream.emit('error', err); + + state.finished = true; + stream.emit('finish'); + }); } else { prefinish(stream, state); } diff --git a/lib/net.js b/lib/net.js index 53095210fdd808..8d144c01097341 100644 --- a/lib/net.js +++ b/lib/net.js @@ -119,6 +119,9 @@ function Socket(options) { stream.Duplex.call(this, options); + // NOTE: do it here to avoid copying `options` + this._writableState._end = _end; + if (options.handle) { this._handle = options.handle; // private } else if (options.fd !== undefined) { @@ -199,32 +202,7 @@ function onSocketFinish() { if (!this._handle || !this._handle.shutdown) return this.destroy(); - var req = new ShutdownWrap(); - req.oncomplete = afterShutdown; - req.handle = this._handle; - var err = this._handle.shutdown(req); - - if (err) - return this._destroy(errnoException(err, 'shutdown')); -} - - -function afterShutdown(status, handle, req) { - var self = handle.owner; - - debug('afterShutdown destroyed=%j', self.destroyed, - self._readableState); - - // callback may come after call to destroy. - if (self.destroyed) - return; - - if (self._readableState.ended) { - debug('readableState ended, destroying'); - self.destroy(); - } else { - self.once('_socketEnd', self.destroy); - } + this.once('_socketEnd', this.destroy); } // the EOF has been received, and no more bytes are coming. @@ -690,6 +668,50 @@ Socket.prototype._write = function(data, encoding, cb) { this._writeGeneric(false, data, encoding, cb); }; +function _end(socket, cb) { + debug('_end'); + + // If still connecting - defer handling 'finish' until 'connect' will happen + if (socket._connecting) { + debug('_end: not yet connected'); + return socket.once('connect', function() { + _end(socket, cb); + }); + } + + if (!socket.readable || socket._readableState.ended) { + debug('_end: not readable or ended'); + return cb(); + } + + // otherwise, just shutdown, or destroy() if not possible + if (!socket._handle || !socket._handle.shutdown) { + debug('_end: no handle or handle does not support shutdown'); + return cb(); + } + + var req = new ShutdownWrap(); + req.oncomplete = afterShutdown; + req.handle = this._handle; + req.flushCb = cb; + var err = socket._handle.shutdown(req); + + if (err) { + debug('_end: errno %s', err); + return cb(errnoException(err, 'shutdown')); + } +} + + +function afterShutdown(status, handle, req) { + var self = handle.owner; + + debug('afterShutdown destroyed=%j', self.destroyed, + self._readableState); + + req.flushCb(); +} + function createWriteReq(req, handle, data, encoding) { switch (encoding) { case 'binary': @@ -864,6 +886,7 @@ Socket.prototype.connect = function(options, cb) { this._readableState.endEmitted = false; this._writableState.ended = false; this._writableState.ending = false; + this._writableState.flushed = false; this._writableState.finished = false; this._writableState.errorEmitted = false; this.destroyed = false; diff --git a/test/parallel/test-stream2-writable.js b/test/parallel/test-stream2-writable.js index 1d87d7f920c052..c1115ccdcbc100 100644 --- a/test/parallel/test-stream2-writable.js +++ b/test/parallel/test-stream2-writable.js @@ -383,3 +383,25 @@ test('finish is emitted if last chunk is empty', function(t) { w.write(Buffer(1)); w.end(Buffer(0)); }); + +test('finish is emitted after shutdown', function(t) { + var w = new W(); + var shutdown = false; + + w._writableState._end = function(stream, cb) { + assert(stream === w); + setTimeout(function() { + shutdown = true; + cb(); + }, 100); + }; + w._write = function(chunk, e, cb) { + process.nextTick(cb); + }; + w.on('finish', function() { + assert(shutdown); + t.end(); + }); + w.write(Buffer(1)); + w.end(Buffer(0)); +});