Skip to content

Commit f3503ba

Browse files
committed
net: shutdown gracefully
Wait for the `shutdown` request completion before emitting the `finish` event and destroying the socket. While this might not be that relevant in case of plain TCP sockets, the TLS implementation sends close-notify packet during shutdown request. Destroying socket whilst this write is in progress tends to cause ECONNRESET errors in our tests.
1 parent 6e78e5f commit f3503ba

File tree

3 files changed

+91
-28
lines changed

3 files changed

+91
-28
lines changed

lib/_stream_writable.js

+20-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ function WriteReq(chunk, encoding, cb) {
2121
this.next = null;
2222
}
2323

24+
function _end(stream, cb) {
25+
cb(null);
26+
}
27+
2428
function WritableState(options, stream) {
2529
options = options || {};
2630

@@ -106,6 +110,13 @@ function WritableState(options, stream) {
106110

107111
// True if the error was already emitted and should not be thrown again
108112
this.errorEmitted = false;
113+
114+
// when `state._end()` was called
115+
this.flushed = false;
116+
117+
// NOTE: added here to not pollute the prototype of the WritableStream and to
118+
// avoid conflicts with user-land methods
119+
this._end = options._end || _end;
109120
}
110121

111122
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
@@ -447,6 +458,7 @@ function needFinish(state) {
447458
state.length === 0 &&
448459
state.bufferedRequest === null &&
449460
!state.finished &&
461+
!state.flushed &&
450462
!state.writing);
451463
}
452464

@@ -462,8 +474,14 @@ function finishMaybe(stream, state) {
462474
if (need) {
463475
if (state.pendingcb === 0) {
464476
prefinish(stream, state);
465-
state.finished = true;
466-
stream.emit('finish');
477+
state.flushed = true;
478+
state._end(stream, function(err) {
479+
if (err)
480+
stream.emit('error', err);
481+
482+
state.finished = true;
483+
stream.emit('finish');
484+
});
467485
} else {
468486
prefinish(stream, state);
469487
}

lib/net.js

+49-26
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ function Socket(options) {
119119

120120
stream.Duplex.call(this, options);
121121

122+
// NOTE: do it here to avoid copying `options`
123+
this._writableState._end = _end;
124+
122125
if (options.handle) {
123126
this._handle = options.handle; // private
124127
} else if (options.fd !== undefined) {
@@ -199,32 +202,7 @@ function onSocketFinish() {
199202
if (!this._handle || !this._handle.shutdown)
200203
return this.destroy();
201204

202-
var req = new ShutdownWrap();
203-
req.oncomplete = afterShutdown;
204-
req.handle = this._handle;
205-
var err = this._handle.shutdown(req);
206-
207-
if (err)
208-
return this._destroy(errnoException(err, 'shutdown'));
209-
}
210-
211-
212-
function afterShutdown(status, handle, req) {
213-
var self = handle.owner;
214-
215-
debug('afterShutdown destroyed=%j', self.destroyed,
216-
self._readableState);
217-
218-
// callback may come after call to destroy.
219-
if (self.destroyed)
220-
return;
221-
222-
if (self._readableState.ended) {
223-
debug('readableState ended, destroying');
224-
self.destroy();
225-
} else {
226-
self.once('_socketEnd', self.destroy);
227-
}
205+
this.once('_socketEnd', this.destroy);
228206
}
229207

230208
// the EOF has been received, and no more bytes are coming.
@@ -690,6 +668,50 @@ Socket.prototype._write = function(data, encoding, cb) {
690668
this._writeGeneric(false, data, encoding, cb);
691669
};
692670

671+
function _end(socket, cb) {
672+
debug('_end');
673+
674+
// If still connecting - defer handling 'finish' until 'connect' will happen
675+
if (socket._connecting) {
676+
debug('_end: not yet connected');
677+
return socket.once('connect', function() {
678+
_end(socket, cb);
679+
});
680+
}
681+
682+
if (!socket.readable || socket._readableState.ended) {
683+
debug('_end: not readable or ended');
684+
return cb();
685+
}
686+
687+
// otherwise, just shutdown, or destroy() if not possible
688+
if (!socket._handle || !socket._handle.shutdown) {
689+
debug('_end: no handle or handle does not support shutdown');
690+
return cb();
691+
}
692+
693+
var req = new ShutdownWrap();
694+
req.oncomplete = afterShutdown;
695+
req.handle = this._handle;
696+
req.flushCb = cb;
697+
var err = socket._handle.shutdown(req);
698+
699+
if (err) {
700+
debug('_end: errno %s', err);
701+
return cb(errnoException(err, 'shutdown'));
702+
}
703+
}
704+
705+
706+
function afterShutdown(status, handle, req) {
707+
var self = handle.owner;
708+
709+
debug('afterShutdown destroyed=%j', self.destroyed,
710+
self._readableState);
711+
712+
req.flushCb();
713+
}
714+
693715
function createWriteReq(req, handle, data, encoding) {
694716
switch (encoding) {
695717
case 'binary':
@@ -864,6 +886,7 @@ Socket.prototype.connect = function(options, cb) {
864886
this._readableState.endEmitted = false;
865887
this._writableState.ended = false;
866888
this._writableState.ending = false;
889+
this._writableState.flushed = false;
867890
this._writableState.finished = false;
868891
this._writableState.errorEmitted = false;
869892
this.destroyed = false;

test/parallel/test-stream2-writable.js

+22
Original file line numberDiff line numberDiff line change
@@ -383,3 +383,25 @@ test('finish is emitted if last chunk is empty', function(t) {
383383
w.write(Buffer(1));
384384
w.end(Buffer(0));
385385
});
386+
387+
test('finish is emitted after shutdown', function(t) {
388+
var w = new W();
389+
var shutdown = false;
390+
391+
w._writableState._end = function(stream, cb) {
392+
assert(stream === w);
393+
setTimeout(function() {
394+
shutdown = true;
395+
cb();
396+
}, 100);
397+
};
398+
w._write = function(chunk, e, cb) {
399+
process.nextTick(cb);
400+
};
401+
w.on('finish', function() {
402+
assert(shutdown);
403+
t.end();
404+
});
405+
w.write(Buffer(1));
406+
w.end(Buffer(0));
407+
});

0 commit comments

Comments
 (0)