Skip to content

Commit

Permalink
http: align with stream.Writable
Browse files Browse the repository at this point in the history
Futher aligns OutgoingMessage with stream.Writable. In particular
re-uses the construct/destroy logic from streams.

Due to a lot of subtle assumptions this PR unfortunately touches
a lot of different parts.

PR-URL: nodejs#36816
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag committed Mar 10, 2021
1 parent 38f3238 commit e2f5bb7
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 232 deletions.
52 changes: 32 additions & 20 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToHttpOptions, searchParamsSymbol } = require('internal/url');
const { kOutHeaders, kNeedDrain } = require('internal/http');
const { kOutHeaders } = require('internal/http');
const { connResetException, codes } = require('internal/errors');
const {
ERR_HTTP_HEADERS_SENT,
Expand Down Expand Up @@ -98,7 +98,7 @@ class HTTPClientAsyncResource {
}

function ClientRequest(input, options, cb) {
FunctionPrototypeCall(OutgoingMessage, this);
FunctionPrototypeCall(OutgoingMessage, this, { autoDestroy: false });

if (typeof input === 'string') {
const urlStr = input;
Expand Down Expand Up @@ -298,7 +298,7 @@ function ClientRequest(input, options, cb) {
if (typeof options.createConnection === 'function') {
const oncreate = once((err, socket) => {
if (err) {
process.nextTick(() => this.emit('error', err));
process.nextTick(() => emitError(this, err));
} else {
this.onSocket(socket);
}
Expand Down Expand Up @@ -366,8 +366,8 @@ function emitAbortNT(req) {

function ondrain() {
const msg = this._httpMessage;
if (msg && !msg.finished && msg[kNeedDrain]) {
msg[kNeedDrain] = false;
if (msg && !msg.finished && msg._writableState.needDrain) {
msg._writableState.needDrain = false;
msg.emit('drain');
}
}
Expand All @@ -393,8 +393,7 @@ function socketCloseListener() {
if (!res.complete) {
res.destroy(connResetException('aborted'));
}
req._closed = true;
req.emit('close');
emitClose(req);
if (!res.aborted && res.readable) {
res.push(null);
}
Expand All @@ -404,10 +403,9 @@ function socketCloseListener() {
// receive a response. The error needs to
// fire on the request.
req.socket._hadError = true;
req.emit('error', connResetException('socket hang up'));
emitError(req, connResetException('socket hang up'));
}
req._closed = true;
req.emit('close');
emitClose(req);
}

// Too bad. That output wasn't getting written.
Expand All @@ -431,7 +429,7 @@ function socketErrorListener(err) {
// For Safety. Some additional errors might fire later on
// and we need to make sure we don't double-fire the error event.
req.socket._hadError = true;
req.emit('error', err);
emitError(req, err);
}

const parser = socket.parser;
Expand All @@ -455,7 +453,7 @@ function socketOnEnd() {
// If we don't have a response then we know that the socket
// ended prematurely and we need to emit an error on the request.
req.socket._hadError = true;
req.emit('error', connResetException('socket hang up'));
emitError(req, connResetException('socket hang up'));
}
if (parser) {
parser.finish();
Expand All @@ -478,7 +476,7 @@ function socketOnData(d) {
freeParser(parser, req, socket);
socket.destroy();
req.socket._hadError = true;
req.emit('error', ret);
emitError(req, ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade (if status code 101) or CONNECT
const bytesParsed = ret;
Expand Down Expand Up @@ -510,9 +508,7 @@ function socketOnData(d) {
socket.readableFlowing = null;

req.emit(eventName, res, socket, bodyHead);
req.destroyed = true;
req._closed = true;
req.emit('close');
emitClose(req);
} else {
// Requested Upgrade or used CONNECT method, but have no handler.
socket.destroy();
Expand Down Expand Up @@ -697,8 +693,7 @@ function requestOnPrefinish() {
}

function emitFreeNT(req) {
req._closed = true;
req.emit('close');
emitClose(req);
if (req.socket) {
req.socket.emit('free');
}
Expand Down Expand Up @@ -779,10 +774,10 @@ function onSocketNT(req, socket, err) {
err = connResetException('socket hang up');
}
if (err) {
req.emit('error', err);
emitError(req, err);
}
req._closed = true;
req.emit('close');
emitClose(req);
}

if (socket) {
Expand Down Expand Up @@ -862,6 +857,23 @@ function setSocketTimeout(sock, msecs) {
}
}

function emitError(req, err) {
req.destroyed = true;
req._writableState.errored = err;
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack; // eslint-disable-line no-unused-expressions
req._writableState.errorEmitted = true;
req.emit('error', err);
}

function emitClose(req) {
req.destroyed = true;
req._closed = true;
req._writableState.closed = true;
req._writableState.closeEmitted = true;
req.emit('close');
}

ClientRequest.prototype.setNoDelay = function setNoDelay(noDelay) {
this._deferToConnect('setNoDelay', [noDelay]);
};
Expand Down
Loading

0 comments on commit e2f5bb7

Please sign in to comment.