From b03723236b711ac90629561050f86c9e62f9526a Mon Sep 17 00:00:00 2001 From: Ashok Date: Sun, 18 Mar 2018 13:05:47 +0530 Subject: [PATCH 01/14] lib: merge stream handling code for http2 streams & net.Socket --- lib/internal/http2/core.js | 36 +++----------- lib/internal/streams/stream_base.js | 74 +++++++++++++++++++++++++++++ lib/net.js | 66 +++---------------------- node.gyp | 1 + 4 files changed, 87 insertions(+), 90 deletions(-) create mode 100644 lib/internal/streams/stream_base.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 9978df87d6d58d..287d9b9f023e1c 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -12,7 +12,6 @@ const binding = process.binding('http2'); const { FileHandle } = process.binding('fs'); const { StreamPipe } = internalBinding('stream_pipe'); const assert = require('assert'); -const { Buffer } = require('buffer'); const EventEmitter = require('events'); const net = require('net'); const tls = require('tls'); @@ -107,6 +106,7 @@ const { validateTimerDuration, refreshFnSymbol } = require('internal/timers'); +const StreamBase = require('internal/streams/stream_base'); const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); const { constants, nameForErrorCode } = binding; @@ -1426,28 +1426,6 @@ class ClientHttp2Session extends Http2Session { } } -function createWriteReq(req, handle, data, encoding) { - switch (encoding) { - case 'utf8': - case 'utf-8': - return handle.writeUtf8String(req, data); - case 'ascii': - return handle.writeAsciiString(req, data); - case 'ucs2': - case 'ucs-2': - case 'utf16le': - case 'utf-16le': - return handle.writeUcs2String(req, data); - case 'latin1': - case 'binary': - return handle.writeLatin1String(req, data); - case 'buffer': - return handle.writeBuffer(req, data); - default: - return handle.writeBuffer(req, Buffer.from(data, encoding)); - } -} - function trackWriteState(stream, bytes) { const session = stream[kSession]; stream[kState].writeQueueSize += bytes; @@ -1671,16 +1649,13 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); + const streamId = this[kID]; const handle = this[kHandle]; const req = new WriteWrap(); - req.stream = this[kID]; - req.handle = handle; req.callback = cb; - req.oncomplete = afterDoStreamWrite; - req.async = false; - const err = createWriteReq(req, handle, data, encoding); - if (err) - return this.destroy(errnoException(err, 'write', req.error), cb); + this.createWriteWrap(req, streamId, handle, afterDoStreamWrite); + this.writeGeneric(req, null, data, encoding, cb); + trackWriteState(this, req.bytes); } @@ -1908,6 +1883,7 @@ class Http2Stream extends Duplex { } } } +util._extend(Http2Stream.prototype, StreamBase); function processHeaders(headers) { assertIsObject(headers, 'headers'); diff --git a/lib/internal/streams/stream_base.js b/lib/internal/streams/stream_base.js new file mode 100644 index 00000000000000..331f5f7c4bb009 --- /dev/null +++ b/lib/internal/streams/stream_base.js @@ -0,0 +1,74 @@ +'use strict'; + +const { Buffer } = require('buffer'); +const errors = require('internal/errors'); + +const errnoException = errors.errnoException; + +const StreamBaseMethods = { + createWriteWrap(req, streamId, handle, oncomplete) { + if (streamId) + req.stream = streamId; + + req.handle = handle; + req.oncomplete = oncomplete; + req.async = false; + }, + + writeGeneric(req, writev, data, encoding, cb) { + const { handle, error } = req; + let err; + + if (writev) { + const allBuffers = data.allBuffers; + let chunks; + let i; + if (allBuffers) { + chunks = data; + for (i = 0; i < data.length; i++) + data[i] = data[i].chunk; + } else { + chunks = new Array(data.length << 1); + for (i = 0; i < data.length; i++) { + const entry = data[i]; + chunks[i * 2] = entry.chunk; + chunks[i * 2 + 1] = entry.encoding; + } + } + err = this._handle.writev(req, chunks, allBuffers); + + // Retain chunks + if (err === 0) req._chunks = chunks; + } else { + err = this._handleWriteReq(req, handle, data, encoding); + } + + if (err) { + return this.destroy(errnoException(err, 'write', error), cb); + } + }, + + _handleWriteReq(req, handle, data, encoding) { + switch (encoding) { + case 'buffer': + return handle.writeBuffer(req, data); + case 'latin1': + case 'binary': + return handle.writeLatin1String(req, data); + case 'utf8': + case 'utf-8': + return handle.writeUtf8String(req, data); + case 'ascii': + return handle.writeAsciiString(req, data); + case 'ucs2': + case 'ucs-2': + case 'utf16le': + case 'utf-16le': + return handle.writeUcs2String(req, data); + default: + return handle.writeBuffer(req, Buffer.from(data, encoding)); + } + } +}; + +module.exports = StreamBaseMethods; diff --git a/lib/net.js b/lib/net.js index e3cd8559b98e90..e63ae98821456f 100644 --- a/lib/net.js +++ b/lib/net.js @@ -52,6 +52,7 @@ const { defaultTriggerAsyncIdScope, symbols: { async_id_symbol } } = require('internal/async_hooks'); +const StreamBase = require('internal/streams/stream_base'); const errors = require('internal/errors'); const { ERR_INVALID_ADDRESS_FAMILY, @@ -308,6 +309,7 @@ function Socket(options) { this[BYTES_READ] = 0; } util.inherits(Socket, stream.Duplex); +util._extend(Socket.prototype, StreamBase); // Refresh existing timeouts. Socket.prototype._unrefTimer = function _unrefTimer() { @@ -740,38 +742,10 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { return false; } - var req = new WriteWrap(); - req.handle = this._handle; - req.oncomplete = afterWrite; - req.async = false; - var err; - - if (writev) { - var allBuffers = data.allBuffers; - var chunks; - var i; - if (allBuffers) { - chunks = data; - for (i = 0; i < data.length; i++) - data[i] = data[i].chunk; - } else { - chunks = new Array(data.length << 1); - for (i = 0; i < data.length; i++) { - var entry = data[i]; - chunks[i * 2] = entry.chunk; - chunks[i * 2 + 1] = entry.encoding; - } - } - err = this._handle.writev(req, chunks, allBuffers); - - // Retain chunks - if (err === 0) req._chunks = chunks; - } else { - err = createWriteReq(req, this._handle, data, encoding); - } - - if (err) - return this.destroy(errnoException(err, 'write', req.error), cb); + const req = new WriteWrap(); + const handle = this._handle; + this.createWriteWrap(req, null, handle, afterWrite); + this.writeGeneric(req, writev, data, encoding, cb); this._bytesDispatched += req.bytes; @@ -794,34 +768,6 @@ Socket.prototype._write = function(data, encoding, cb) { this._writeGeneric(false, data, encoding, cb); }; -function createWriteReq(req, handle, data, encoding) { - switch (encoding) { - case 'latin1': - case 'binary': - return handle.writeLatin1String(req, data); - - case 'buffer': - return handle.writeBuffer(req, data); - - case 'utf8': - case 'utf-8': - return handle.writeUtf8String(req, data); - - case 'ascii': - return handle.writeAsciiString(req, data); - - case 'ucs2': - case 'ucs-2': - case 'utf16le': - case 'utf-16le': - return handle.writeUcs2String(req, data); - - default: - return handle.writeBuffer(req, Buffer.from(data, encoding)); - } -} - - protoGetter('bytesWritten', function bytesWritten() { var bytes = this._bytesDispatched; const state = this._writableState; diff --git a/node.gyp b/node.gyp index 91d57631d6d55e..3585e4c4db26b8 100644 --- a/node.gyp +++ b/node.gyp @@ -145,6 +145,7 @@ 'lib/internal/v8_prof_processor.js', 'lib/internal/vm/Module.js', 'lib/internal/streams/lazy_transform.js', + 'lib/internal/streams/stream_base.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/BufferList.js', 'lib/internal/streams/duplex_base.js', From efa0f4f81d907a9e3ae09d9185ac9297bfbbf385 Mon Sep 17 00:00:00 2001 From: Ashok Date: Wed, 21 Mar 2018 15:51:46 +0530 Subject: [PATCH 02/14] lib: separate writev responsibilities from writeGeneric --- lib/internal/http2/core.js | 2 +- lib/internal/streams/stream_base.js | 52 ++++++++++++++++------------- lib/net.js | 3 +- 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 287d9b9f023e1c..423410d3da78c7 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1654,7 +1654,7 @@ class Http2Stream extends Duplex { const req = new WriteWrap(); req.callback = cb; this.createWriteWrap(req, streamId, handle, afterDoStreamWrite); - this.writeGeneric(req, null, data, encoding, cb); + this.writeGeneric(req, data, encoding, cb); trackWriteState(this, req.bytes); } diff --git a/lib/internal/streams/stream_base.js b/lib/internal/streams/stream_base.js index 331f5f7c4bb009..5e45ccb52b6eea 100644 --- a/lib/internal/streams/stream_base.js +++ b/lib/internal/streams/stream_base.js @@ -15,33 +15,37 @@ const StreamBaseMethods = { req.async = false; }, - writeGeneric(req, writev, data, encoding, cb) { - const { handle, error } = req; - let err; - - if (writev) { - const allBuffers = data.allBuffers; - let chunks; - let i; - if (allBuffers) { - chunks = data; - for (i = 0; i < data.length; i++) - data[i] = data[i].chunk; - } else { - chunks = new Array(data.length << 1); - for (i = 0; i < data.length; i++) { - const entry = data[i]; - chunks[i * 2] = entry.chunk; - chunks[i * 2 + 1] = entry.encoding; - } + writevGeneric(req, data, cb) { + const { error } = req; + const allBuffers = data.allBuffers; + let chunks; + let i; + if (allBuffers) { + chunks = data; + for (i = 0; i < data.length; i++) + data[i] = data[i].chunk; + } else { + chunks = new Array(data.length << 1); + for (i = 0; i < data.length; i++) { + const entry = data[i]; + chunks[i * 2] = entry.chunk; + chunks[i * 2 + 1] = entry.encoding; } - err = this._handle.writev(req, chunks, allBuffers); + } + const err = this._handle.writev(req, chunks, allBuffers); - // Retain chunks - if (err === 0) req._chunks = chunks; - } else { - err = this._handleWriteReq(req, handle, data, encoding); + // Retain chunks + if (err === 0) req._chunks = chunks; + + if (err) { + return this.destroy(errnoException(err, 'write', error), cb); } + }, + + writeGeneric(req, data, encoding, cb) { + const { handle, error } = req; + + const err = this._handleWriteReq(req, handle, data, encoding); if (err) { return this.destroy(errnoException(err, 'write', error), cb); diff --git a/lib/net.js b/lib/net.js index e63ae98821456f..b898c5b192aca9 100644 --- a/lib/net.js +++ b/lib/net.js @@ -745,7 +745,8 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { const req = new WriteWrap(); const handle = this._handle; this.createWriteWrap(req, null, handle, afterWrite); - this.writeGeneric(req, writev, data, encoding, cb); + if (writev) this.writevGeneric(req, data, cb); + else this.writeGeneric(req, data, encoding, cb); this._bytesDispatched += req.bytes; From 19aedde5c34a7fe01bf8255ba54930ed7fe4c5f2 Mon Sep 17 00:00:00 2001 From: Ashok Date: Wed, 21 Mar 2018 21:18:21 +0530 Subject: [PATCH 03/14] lib: fix calling of cb twice --- lib/internal/streams/stream_base.js | 2 ++ lib/net.js | 7 +++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/stream_base.js b/lib/internal/streams/stream_base.js index 5e45ccb52b6eea..070719550022f3 100644 --- a/lib/internal/streams/stream_base.js +++ b/lib/internal/streams/stream_base.js @@ -40,6 +40,7 @@ const StreamBaseMethods = { if (err) { return this.destroy(errnoException(err, 'write', error), cb); } + return null; }, writeGeneric(req, data, encoding, cb) { @@ -50,6 +51,7 @@ const StreamBaseMethods = { if (err) { return this.destroy(errnoException(err, 'write', error), cb); } + return null; }, _handleWriteReq(req, handle, data, encoding) { diff --git a/lib/net.js b/lib/net.js index b898c5b192aca9..c114ed46a1c25e 100644 --- a/lib/net.js +++ b/lib/net.js @@ -742,11 +742,14 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { return false; } + let ret; const req = new WriteWrap(); const handle = this._handle; this.createWriteWrap(req, null, handle, afterWrite); - if (writev) this.writevGeneric(req, data, cb); - else this.writeGeneric(req, data, encoding, cb); + if (writev) ret = this.writevGeneric(req, data, cb); + else ret = this.writeGeneric(req, data, encoding, cb); + + if (ret) return ret; this._bytesDispatched += req.bytes; From 5c8eb7506b86857d102e5f5dcf5651855b2694e0 Mon Sep 17 00:00:00 2001 From: Ashok Date: Thu, 22 Mar 2018 18:18:45 +0530 Subject: [PATCH 04/14] lib: extract streamId out of stream_base to caller --- lib/internal/http2/core.js | 4 ++-- lib/internal/streams/stream_base.js | 5 +---- lib/net.js | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 423410d3da78c7..92b28d725484e4 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1649,11 +1649,11 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const streamId = this[kID]; const handle = this[kHandle]; const req = new WriteWrap(); + req.stream = this[kID]; req.callback = cb; - this.createWriteWrap(req, streamId, handle, afterDoStreamWrite); + this.createWriteWrap(req, handle, afterDoStreamWrite); this.writeGeneric(req, data, encoding, cb); trackWriteState(this, req.bytes); diff --git a/lib/internal/streams/stream_base.js b/lib/internal/streams/stream_base.js index 070719550022f3..33c39eed505531 100644 --- a/lib/internal/streams/stream_base.js +++ b/lib/internal/streams/stream_base.js @@ -6,10 +6,7 @@ const errors = require('internal/errors'); const errnoException = errors.errnoException; const StreamBaseMethods = { - createWriteWrap(req, streamId, handle, oncomplete) { - if (streamId) - req.stream = streamId; - + createWriteWrap(req, handle, oncomplete) { req.handle = handle; req.oncomplete = oncomplete; req.async = false; diff --git a/lib/net.js b/lib/net.js index c114ed46a1c25e..3b4fe0cd9d4f56 100644 --- a/lib/net.js +++ b/lib/net.js @@ -745,7 +745,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { let ret; const req = new WriteWrap(); const handle = this._handle; - this.createWriteWrap(req, null, handle, afterWrite); + this.createWriteWrap(req, handle, afterWrite); if (writev) ret = this.writevGeneric(req, data, cb); else ret = this.writeGeneric(req, data, encoding, cb); From 92e4f71768311c790eff811da738bb1b7972b177 Mon Sep 17 00:00:00 2001 From: Ashok Date: Thu, 22 Mar 2018 20:34:36 +0530 Subject: [PATCH 05/14] lib: add symbols instead of methods to hide impl details --- lib/internal/http2/core.js | 12 ++-- lib/internal/stream_base.js | 96 +++++++++++++++++++++++++++++ lib/internal/streams/stream_base.js | 77 ----------------------- lib/net.js | 17 +++-- node.gyp | 2 +- 5 files changed, 117 insertions(+), 87 deletions(-) create mode 100644 lib/internal/stream_base.js delete mode 100644 lib/internal/streams/stream_base.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 92b28d725484e4..707be11c356e6f 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -106,7 +106,11 @@ const { validateTimerDuration, refreshFnSymbol } = require('internal/timers'); -const StreamBase = require('internal/streams/stream_base'); +const { + kCreateWriteWrap, + kWriteGeneric, + StreamBase +} = require('internal/stream_base'); const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); const { constants, nameForErrorCode } = binding; @@ -1653,8 +1657,8 @@ class Http2Stream extends Duplex { const req = new WriteWrap(); req.stream = this[kID]; req.callback = cb; - this.createWriteWrap(req, handle, afterDoStreamWrite); - this.writeGeneric(req, data, encoding, cb); + this[kCreateWriteWrap](req, handle, afterDoStreamWrite); + this[kWriteGeneric](req, data, encoding, cb); trackWriteState(this, req.bytes); } @@ -1883,7 +1887,7 @@ class Http2Stream extends Duplex { } } } -util._extend(Http2Stream.prototype, StreamBase); +StreamBase.apply(Http2Stream.prototype); function processHeaders(headers) { assertIsObject(headers, 'headers'); diff --git a/lib/internal/stream_base.js b/lib/internal/stream_base.js new file mode 100644 index 00000000000000..cabc4903af28a4 --- /dev/null +++ b/lib/internal/stream_base.js @@ -0,0 +1,96 @@ +'use strict'; + +const { Buffer } = require('buffer'); +const errors = require('internal/errors'); + +const errnoException = errors.errnoException; + +const kCreateWriteWrap = Symbol('createWriteWrap'); +const kWritevGeneric = Symbol('writevGeneric'); +const kWriteGeneric = Symbol('writeGeneric'); + +function _handleWriteReq(req, handle, data, encoding) { + switch (encoding) { + case 'buffer': + return handle.writeBuffer(req, data); + case 'latin1': + case 'binary': + return handle.writeLatin1String(req, data); + case 'utf8': + case 'utf-8': + return handle.writeUtf8String(req, data); + case 'ascii': + return handle.writeAsciiString(req, data); + case 'ucs2': + case 'ucs-2': + case 'utf16le': + case 'utf-16le': + return handle.writeUcs2String(req, data); + default: + return handle.writeBuffer(req, Buffer.from(data, encoding)); + } +} + +const StreamBase = { + [kCreateWriteWrap](req, handle, oncomplete) { + req.handle = handle; + req.oncomplete = oncomplete; + req.async = false; + }, + + [kWritevGeneric](req, data, cb) { + const { error } = req; + const allBuffers = data.allBuffers; + let chunks; + let i; + if (allBuffers) { + chunks = data; + for (i = 0; i < data.length; i++) + data[i] = data[i].chunk; + } else { + chunks = new Array(data.length << 1); + for (i = 0; i < data.length; i++) { + const entry = data[i]; + chunks[i * 2] = entry.chunk; + chunks[i * 2 + 1] = entry.encoding; + } + } + const err = this._handle.writev(req, chunks, allBuffers); + + // Retain chunks + if (err === 0) req._chunks = chunks; + + if (err) { + return this.destroy(errnoException(err, 'write', error), cb); + } + return null; + }, + + [kWriteGeneric](req, data, encoding, cb) { + const { handle, error } = req; + + const err = _handleWriteReq(req, handle, data, encoding); + + if (err) { + return this.destroy(errnoException(err, 'write', error), cb); + } + return null; + }, + + apply(obj) { + [ + kCreateWriteWrap, + kWritevGeneric, + kWriteGeneric + ].forEach(function(sym) { + obj[sym] = StreamBase[sym]; + }); + } +}; + +module.exports = { + kCreateWriteWrap, + kWritevGeneric, + kWriteGeneric, + StreamBase +}; diff --git a/lib/internal/streams/stream_base.js b/lib/internal/streams/stream_base.js deleted file mode 100644 index 33c39eed505531..00000000000000 --- a/lib/internal/streams/stream_base.js +++ /dev/null @@ -1,77 +0,0 @@ -'use strict'; - -const { Buffer } = require('buffer'); -const errors = require('internal/errors'); - -const errnoException = errors.errnoException; - -const StreamBaseMethods = { - createWriteWrap(req, handle, oncomplete) { - req.handle = handle; - req.oncomplete = oncomplete; - req.async = false; - }, - - writevGeneric(req, data, cb) { - const { error } = req; - const allBuffers = data.allBuffers; - let chunks; - let i; - if (allBuffers) { - chunks = data; - for (i = 0; i < data.length; i++) - data[i] = data[i].chunk; - } else { - chunks = new Array(data.length << 1); - for (i = 0; i < data.length; i++) { - const entry = data[i]; - chunks[i * 2] = entry.chunk; - chunks[i * 2 + 1] = entry.encoding; - } - } - const err = this._handle.writev(req, chunks, allBuffers); - - // Retain chunks - if (err === 0) req._chunks = chunks; - - if (err) { - return this.destroy(errnoException(err, 'write', error), cb); - } - return null; - }, - - writeGeneric(req, data, encoding, cb) { - const { handle, error } = req; - - const err = this._handleWriteReq(req, handle, data, encoding); - - if (err) { - return this.destroy(errnoException(err, 'write', error), cb); - } - return null; - }, - - _handleWriteReq(req, handle, data, encoding) { - switch (encoding) { - case 'buffer': - return handle.writeBuffer(req, data); - case 'latin1': - case 'binary': - return handle.writeLatin1String(req, data); - case 'utf8': - case 'utf-8': - return handle.writeUtf8String(req, data); - case 'ascii': - return handle.writeAsciiString(req, data); - case 'ucs2': - case 'ucs-2': - case 'utf16le': - case 'utf-16le': - return handle.writeUcs2String(req, data); - default: - return handle.writeBuffer(req, Buffer.from(data, encoding)); - } - } -}; - -module.exports = StreamBaseMethods; diff --git a/lib/net.js b/lib/net.js index 3b4fe0cd9d4f56..35197f1359c51a 100644 --- a/lib/net.js +++ b/lib/net.js @@ -52,7 +52,12 @@ const { defaultTriggerAsyncIdScope, symbols: { async_id_symbol } } = require('internal/async_hooks'); -const StreamBase = require('internal/streams/stream_base'); +const { + kCreateWriteWrap, + kWritevGeneric, + kWriteGeneric, + StreamBase +} = require('internal/stream_base'); const errors = require('internal/errors'); const { ERR_INVALID_ADDRESS_FAMILY, @@ -309,7 +314,7 @@ function Socket(options) { this[BYTES_READ] = 0; } util.inherits(Socket, stream.Duplex); -util._extend(Socket.prototype, StreamBase); +StreamBase.apply(Socket.prototype); // Refresh existing timeouts. Socket.prototype._unrefTimer = function _unrefTimer() { @@ -745,9 +750,11 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { let ret; const req = new WriteWrap(); const handle = this._handle; - this.createWriteWrap(req, handle, afterWrite); - if (writev) ret = this.writevGeneric(req, data, cb); - else ret = this.writeGeneric(req, data, encoding, cb); + this[kCreateWriteWrap](req, handle, afterWrite); + if (writev) + ret = this[kWritevGeneric](req, data, cb); + else + ret = this[kWriteGeneric](req, data, encoding, cb); if (ret) return ret; diff --git a/node.gyp b/node.gyp index 3585e4c4db26b8..dcdd010cf4d86b 100644 --- a/node.gyp +++ b/node.gyp @@ -144,8 +144,8 @@ 'lib/internal/v8_prof_polyfill.js', 'lib/internal/v8_prof_processor.js', 'lib/internal/vm/Module.js', + 'lib/internal/stream_base.js', 'lib/internal/streams/lazy_transform.js', - 'lib/internal/streams/stream_base.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/BufferList.js', 'lib/internal/streams/duplex_base.js', From babe8584d317548d9ea821bb93be3be5cbc550a1 Mon Sep 17 00:00:00 2001 From: Ashok Date: Fri, 23 Mar 2018 09:50:54 +0530 Subject: [PATCH 06/14] lib: remove unneeded lines --- lib/internal/stream_base.js | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/internal/stream_base.js b/lib/internal/stream_base.js index cabc4903af28a4..f0ac0a4e50ecb9 100644 --- a/lib/internal/stream_base.js +++ b/lib/internal/stream_base.js @@ -60,10 +60,8 @@ const StreamBase = { // Retain chunks if (err === 0) req._chunks = chunks; - if (err) { + if (err) return this.destroy(errnoException(err, 'write', error), cb); - } - return null; }, [kWriteGeneric](req, data, encoding, cb) { @@ -71,10 +69,8 @@ const StreamBase = { const err = _handleWriteReq(req, handle, data, encoding); - if (err) { + if (err) return this.destroy(errnoException(err, 'write', error), cb); - } - return null; }, apply(obj) { From 27818c4a4dd040577ada5ddabb153f513e287135 Mon Sep 17 00:00:00 2001 From: Ashok Date: Fri, 23 Mar 2018 10:15:24 +0530 Subject: [PATCH 07/14] lib: use Object.assign instead of apply --- lib/internal/http2/core.js | 2 +- lib/internal/stream_base.js | 10 ---------- lib/net.js | 2 +- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 707be11c356e6f..223f1dbc4bdded 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1887,7 +1887,7 @@ class Http2Stream extends Duplex { } } } -StreamBase.apply(Http2Stream.prototype); +Object.assign(Http2Stream.prototype, StreamBase); function processHeaders(headers) { assertIsObject(headers, 'headers'); diff --git a/lib/internal/stream_base.js b/lib/internal/stream_base.js index f0ac0a4e50ecb9..37e1a303dcf02b 100644 --- a/lib/internal/stream_base.js +++ b/lib/internal/stream_base.js @@ -71,16 +71,6 @@ const StreamBase = { if (err) return this.destroy(errnoException(err, 'write', error), cb); - }, - - apply(obj) { - [ - kCreateWriteWrap, - kWritevGeneric, - kWriteGeneric - ].forEach(function(sym) { - obj[sym] = StreamBase[sym]; - }); } }; diff --git a/lib/net.js b/lib/net.js index 35197f1359c51a..cf4a53f6292969 100644 --- a/lib/net.js +++ b/lib/net.js @@ -314,7 +314,7 @@ function Socket(options) { this[BYTES_READ] = 0; } util.inherits(Socket, stream.Duplex); -StreamBase.apply(Socket.prototype); +Object.assign(Socket.prototype, StreamBase); // Refresh existing timeouts. Socket.prototype._unrefTimer = function _unrefTimer() { From 0144de48bf2963b60adc3af653fd7d2c4308af4e Mon Sep 17 00:00:00 2001 From: Ashok Date: Fri, 23 Mar 2018 10:19:13 +0530 Subject: [PATCH 08/14] lib: rename mixin StreamBase to StreamSharedMethods --- lib/internal/http2/core.js | 6 +++--- lib/internal/{stream_base.js => stream_shared_methods.js} | 4 ++-- lib/net.js | 6 +++--- node.gyp | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) rename lib/internal/{stream_base.js => stream_shared_methods.js} (97%) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 223f1dbc4bdded..2aae1661cbc842 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -109,8 +109,8 @@ const { const { kCreateWriteWrap, kWriteGeneric, - StreamBase -} = require('internal/stream_base'); + StreamSharedMethods +} = require('internal/stream_shared_methods'); const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); const { constants, nameForErrorCode } = binding; @@ -1887,7 +1887,7 @@ class Http2Stream extends Duplex { } } } -Object.assign(Http2Stream.prototype, StreamBase); +Object.assign(Http2Stream.prototype, StreamSharedMethods); function processHeaders(headers) { assertIsObject(headers, 'headers'); diff --git a/lib/internal/stream_base.js b/lib/internal/stream_shared_methods.js similarity index 97% rename from lib/internal/stream_base.js rename to lib/internal/stream_shared_methods.js index 37e1a303dcf02b..34ca4d968bf480 100644 --- a/lib/internal/stream_base.js +++ b/lib/internal/stream_shared_methods.js @@ -31,7 +31,7 @@ function _handleWriteReq(req, handle, data, encoding) { } } -const StreamBase = { +const StreamSharedMethods = { [kCreateWriteWrap](req, handle, oncomplete) { req.handle = handle; req.oncomplete = oncomplete; @@ -78,5 +78,5 @@ module.exports = { kCreateWriteWrap, kWritevGeneric, kWriteGeneric, - StreamBase + StreamSharedMethods }; diff --git a/lib/net.js b/lib/net.js index cf4a53f6292969..cff1a332104e7e 100644 --- a/lib/net.js +++ b/lib/net.js @@ -56,8 +56,8 @@ const { kCreateWriteWrap, kWritevGeneric, kWriteGeneric, - StreamBase -} = require('internal/stream_base'); + StreamSharedMethods +} = require('internal/stream_shared_methods'); const errors = require('internal/errors'); const { ERR_INVALID_ADDRESS_FAMILY, @@ -314,7 +314,7 @@ function Socket(options) { this[BYTES_READ] = 0; } util.inherits(Socket, stream.Duplex); -Object.assign(Socket.prototype, StreamBase); +Object.assign(Socket.prototype, StreamSharedMethods); // Refresh existing timeouts. Socket.prototype._unrefTimer = function _unrefTimer() { diff --git a/node.gyp b/node.gyp index dcdd010cf4d86b..45743675ff115f 100644 --- a/node.gyp +++ b/node.gyp @@ -144,7 +144,7 @@ 'lib/internal/v8_prof_polyfill.js', 'lib/internal/v8_prof_processor.js', 'lib/internal/vm/Module.js', - 'lib/internal/stream_base.js', + 'lib/internal/stream_shared_methods.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/BufferList.js', From e4365d60321b96067336fb22228514dfeaee3e10 Mon Sep 17 00:00:00 2001 From: Ashok Date: Fri, 23 Mar 2018 17:45:19 +0530 Subject: [PATCH 09/14] lib: use stream shared funcs as top level instead of properties of prototypes --- lib/internal/http2/core.js | 13 ++-- lib/internal/stream_shared_methods.js | 85 +++++++++++++-------------- lib/net.js | 18 +++--- 3 files changed, 53 insertions(+), 63 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 2aae1661cbc842..213fd481e6cbcd 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -107,9 +107,8 @@ const { refreshFnSymbol } = require('internal/timers'); const { - kCreateWriteWrap, - kWriteGeneric, - StreamSharedMethods + createWriteWrap, + writeGeneric } = require('internal/stream_shared_methods'); const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); @@ -1653,12 +1652,11 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const handle = this[kHandle]; - const req = new WriteWrap(); + const req = createWriteWrap(this[kHandle], afterDoStreamWrite); req.stream = this[kID]; req.callback = cb; - this[kCreateWriteWrap](req, handle, afterDoStreamWrite); - this[kWriteGeneric](req, data, encoding, cb); + + writeGeneric(this, req, data, encoding, cb); trackWriteState(this, req.bytes); } @@ -1887,7 +1885,6 @@ class Http2Stream extends Duplex { } } } -Object.assign(Http2Stream.prototype, StreamSharedMethods); function processHeaders(headers) { assertIsObject(headers, 'headers'); diff --git a/lib/internal/stream_shared_methods.js b/lib/internal/stream_shared_methods.js index 34ca4d968bf480..a4b4ef098dab9c 100644 --- a/lib/internal/stream_shared_methods.js +++ b/lib/internal/stream_shared_methods.js @@ -2,14 +2,13 @@ const { Buffer } = require('buffer'); const errors = require('internal/errors'); +const { WriteWrap } = process.binding('stream_wrap'); const errnoException = errors.errnoException; -const kCreateWriteWrap = Symbol('createWriteWrap'); -const kWritevGeneric = Symbol('writevGeneric'); -const kWriteGeneric = Symbol('writeGeneric'); +function handleWriteReq(req, data, encoding) { + const { handle } = req; -function _handleWriteReq(req, handle, data, encoding) { switch (encoding) { case 'buffer': return handle.writeBuffer(req, data); @@ -31,52 +30,50 @@ function _handleWriteReq(req, handle, data, encoding) { } } -const StreamSharedMethods = { - [kCreateWriteWrap](req, handle, oncomplete) { - req.handle = handle; - req.oncomplete = oncomplete; - req.async = false; - }, +function createWriteWrap(handle, oncomplete) { + const req = new WriteWrap(); - [kWritevGeneric](req, data, cb) { - const { error } = req; - const allBuffers = data.allBuffers; - let chunks; - let i; - if (allBuffers) { - chunks = data; - for (i = 0; i < data.length; i++) - data[i] = data[i].chunk; - } else { - chunks = new Array(data.length << 1); - for (i = 0; i < data.length; i++) { - const entry = data[i]; - chunks[i * 2] = entry.chunk; - chunks[i * 2 + 1] = entry.encoding; - } - } - const err = this._handle.writev(req, chunks, allBuffers); + req.handle = handle; + req.oncomplete = oncomplete; + req.async = false; - // Retain chunks - if (err === 0) req._chunks = chunks; + return req; +} - if (err) - return this.destroy(errnoException(err, 'write', error), cb); - }, +function writevGeneric(self, req, data, cb) { + const allBuffers = data.allBuffers; + let chunks; + let i; + if (allBuffers) { + chunks = data; + for (i = 0; i < data.length; i++) + data[i] = data[i].chunk; + } else { + chunks = new Array(data.length << 1); + for (i = 0; i < data.length; i++) { + const entry = data[i]; + chunks[i * 2] = entry.chunk; + chunks[i * 2 + 1] = entry.encoding; + } + } + const err = self._handle.writev(req, chunks, allBuffers); - [kWriteGeneric](req, data, encoding, cb) { - const { handle, error } = req; + // Retain chunks + if (err === 0) req._chunks = chunks; - const err = _handleWriteReq(req, handle, data, encoding); + if (err) + return self.destroy(errnoException(err, 'write', req.error), cb); +} - if (err) - return this.destroy(errnoException(err, 'write', error), cb); - } -}; +function writeGeneric(self, req, data, encoding, cb) { + const err = handleWriteReq(req, data, encoding); + + if (err) + return self.destroy(errnoException(err, 'write', req.error), cb); +} module.exports = { - kCreateWriteWrap, - kWritevGeneric, - kWriteGeneric, - StreamSharedMethods + createWriteWrap, + writevGeneric, + writeGeneric }; diff --git a/lib/net.js b/lib/net.js index cff1a332104e7e..a65d8694ad71ed 100644 --- a/lib/net.js +++ b/lib/net.js @@ -46,17 +46,16 @@ const { TCP, constants: TCPConstants } = process.binding('tcp_wrap'); const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap'); const { TCPConnectWrap } = process.binding('tcp_wrap'); const { PipeConnectWrap } = process.binding('pipe_wrap'); -const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); +const { ShutdownWrap } = process.binding('stream_wrap'); const { newAsyncId, defaultTriggerAsyncIdScope, symbols: { async_id_symbol } } = require('internal/async_hooks'); const { - kCreateWriteWrap, - kWritevGeneric, - kWriteGeneric, - StreamSharedMethods + createWriteWrap, + writevGeneric, + writeGeneric } = require('internal/stream_shared_methods'); const errors = require('internal/errors'); const { @@ -314,7 +313,6 @@ function Socket(options) { this[BYTES_READ] = 0; } util.inherits(Socket, stream.Duplex); -Object.assign(Socket.prototype, StreamSharedMethods); // Refresh existing timeouts. Socket.prototype._unrefTimer = function _unrefTimer() { @@ -748,13 +746,11 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { } let ret; - const req = new WriteWrap(); - const handle = this._handle; - this[kCreateWriteWrap](req, handle, afterWrite); + const req = createWriteWrap(this._handle, afterWrite); if (writev) - ret = this[kWritevGeneric](req, data, cb); + ret = writevGeneric(this, req, data, cb); else - ret = this[kWriteGeneric](req, data, encoding, cb); + ret = writeGeneric(this, req, data, encoding, cb); if (ret) return ret; From 560635d627b1d79fc739b9953b786e916620fa8b Mon Sep 17 00:00:00 2001 From: Ashok Date: Fri, 23 Mar 2018 17:53:39 +0530 Subject: [PATCH 10/14] lib: mv lib/internal/stream_shared_methods.js lib/internal/stream_base_commons.js --- lib/internal/http2/core.js | 2 +- .../{stream_shared_methods.js => stream_base_commons.js} | 0 lib/net.js | 2 +- node.gyp | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) rename lib/internal/{stream_shared_methods.js => stream_base_commons.js} (100%) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 213fd481e6cbcd..3012063a090e93 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -109,7 +109,7 @@ const { const { createWriteWrap, writeGeneric -} = require('internal/stream_shared_methods'); +} = require('internal/stream_base_commons'); const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); const { constants, nameForErrorCode } = binding; diff --git a/lib/internal/stream_shared_methods.js b/lib/internal/stream_base_commons.js similarity index 100% rename from lib/internal/stream_shared_methods.js rename to lib/internal/stream_base_commons.js diff --git a/lib/net.js b/lib/net.js index a65d8694ad71ed..de051275965a54 100644 --- a/lib/net.js +++ b/lib/net.js @@ -56,7 +56,7 @@ const { createWriteWrap, writevGeneric, writeGeneric -} = require('internal/stream_shared_methods'); +} = require('internal/stream_base_commons'); const errors = require('internal/errors'); const { ERR_INVALID_ADDRESS_FAMILY, diff --git a/node.gyp b/node.gyp index 45743675ff115f..23c6e339856ea9 100644 --- a/node.gyp +++ b/node.gyp @@ -144,7 +144,7 @@ 'lib/internal/v8_prof_polyfill.js', 'lib/internal/v8_prof_processor.js', 'lib/internal/vm/Module.js', - 'lib/internal/stream_shared_methods.js', + 'lib/internal/stream_base_commons.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/BufferList.js', From 833c52a15b3d8c9e7779242b1bb8805156d267a3 Mon Sep 17 00:00:00 2001 From: Ashok Date: Fri, 23 Mar 2018 20:22:33 +0530 Subject: [PATCH 11/14] lib: add comment for readability --- lib/net.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/net.js b/lib/net.js index de051275965a54..fad41e318653a2 100644 --- a/lib/net.js +++ b/lib/net.js @@ -752,6 +752,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { else ret = writeGeneric(this, req, data, encoding, cb); + // this only runs when writeGeneric returns error. if (ret) return ret; this._bytesDispatched += req.bytes; From fcfd8d332a6113dda34f6d8d70d29ebd71a5e9c1 Mon Sep 17 00:00:00 2001 From: Ashok Date: Fri, 23 Mar 2018 22:06:00 +0530 Subject: [PATCH 12/14] lib: refactor _writev in Http2Stream --- lib/internal/http2/core.js | 24 +++++++----------------- lib/internal/stream_base_commons.js | 2 +- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 3012063a090e93..46159518106ef0 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -18,7 +18,6 @@ const tls = require('tls'); const util = require('util'); const fs = require('fs'); const { - errnoException, codes: { ERR_HTTP2_ALTSVC_INVALID_ORIGIN, ERR_HTTP2_ALTSVC_LENGTH, @@ -108,10 +107,11 @@ const { } = require('internal/timers'); const { createWriteWrap, - writeGeneric + writeGeneric, + writevGeneric } = require('internal/stream_base_commons'); -const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap'); +const { ShutdownWrap } = process.binding('stream_wrap'); const { constants, nameForErrorCode } = binding; const NETServer = net.Server; @@ -1685,22 +1685,12 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const handle = this[kHandle]; - const req = new WriteWrap(); + const req = createWriteWrap(this[kHandle], afterDoStreamWrite); req.stream = this[kID]; - req.handle = handle; req.callback = cb; - req.oncomplete = afterDoStreamWrite; - req.async = false; - const chunks = new Array(data.length << 1); - for (var i = 0; i < data.length; i++) { - const entry = data[i]; - chunks[i * 2] = entry.chunk; - chunks[i * 2 + 1] = entry.encoding; - } - const err = handle.writev(req, chunks); - if (err) - return this.destroy(errnoException(err, 'write', req.error), cb); + + writevGeneric(this, req, data, cb); + trackWriteState(this, req.bytes); } diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index a4b4ef098dab9c..829c80fb125616 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -56,7 +56,7 @@ function writevGeneric(self, req, data, cb) { chunks[i * 2 + 1] = entry.encoding; } } - const err = self._handle.writev(req, chunks, allBuffers); + const err = req.handle.writev(req, chunks, allBuffers); // Retain chunks if (err === 0) req._chunks = chunks; From 43b912d815567fa9fcc1cd6ba0331bebbc92d07f Mon Sep 17 00:00:00 2001 From: Ashok Date: Sat, 24 Mar 2018 23:11:42 +0530 Subject: [PATCH 13/14] lib: rephrase comment --- lib/net.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/net.js b/lib/net.js index fad41e318653a2..4e994340d126dc 100644 --- a/lib/net.js +++ b/lib/net.js @@ -752,7 +752,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { else ret = writeGeneric(this, req, data, encoding, cb); - // this only runs when writeGeneric returns error. + // Bail out if handle.write* returned an error if (ret) return ret; this._bytesDispatched += req.bytes; From 5055714b4b88e868522f26f250289df88a7eac5d Mon Sep 17 00:00:00 2001 From: Ashok Date: Mon, 26 Mar 2018 18:18:04 +0530 Subject: [PATCH 14/14] lib: revert usage of const,let for perf reasons --- lib/internal/http2/core.js | 2 +- lib/internal/stream_base_commons.js | 14 +++++++------- lib/net.js | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 46159518106ef0..d79daf829d7a6b 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1685,7 +1685,7 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const req = createWriteWrap(this[kHandle], afterDoStreamWrite); + var req = createWriteWrap(this[kHandle], afterDoStreamWrite); req.stream = this[kID]; req.callback = cb; diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 829c80fb125616..d902a501524791 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -31,7 +31,7 @@ function handleWriteReq(req, data, encoding) { } function createWriteWrap(handle, oncomplete) { - const req = new WriteWrap(); + var req = new WriteWrap(); req.handle = handle; req.oncomplete = oncomplete; @@ -41,9 +41,9 @@ function createWriteWrap(handle, oncomplete) { } function writevGeneric(self, req, data, cb) { - const allBuffers = data.allBuffers; - let chunks; - let i; + var allBuffers = data.allBuffers; + var chunks; + var i; if (allBuffers) { chunks = data; for (i = 0; i < data.length; i++) @@ -51,12 +51,12 @@ function writevGeneric(self, req, data, cb) { } else { chunks = new Array(data.length << 1); for (i = 0; i < data.length; i++) { - const entry = data[i]; + var entry = data[i]; chunks[i * 2] = entry.chunk; chunks[i * 2 + 1] = entry.encoding; } } - const err = req.handle.writev(req, chunks, allBuffers); + var err = req.handle.writev(req, chunks, allBuffers); // Retain chunks if (err === 0) req._chunks = chunks; @@ -66,7 +66,7 @@ function writevGeneric(self, req, data, cb) { } function writeGeneric(self, req, data, encoding, cb) { - const err = handleWriteReq(req, data, encoding); + var err = handleWriteReq(req, data, encoding); if (err) return self.destroy(errnoException(err, 'write', req.error), cb); diff --git a/lib/net.js b/lib/net.js index 4e994340d126dc..c9116fb1a80a81 100644 --- a/lib/net.js +++ b/lib/net.js @@ -745,14 +745,14 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { return false; } - let ret; - const req = createWriteWrap(this._handle, afterWrite); + var ret; + var req = createWriteWrap(this._handle, afterWrite); if (writev) ret = writevGeneric(this, req, data, cb); else ret = writeGeneric(this, req, data, encoding, cb); - // Bail out if handle.write* returned an error + // Bail out if handle.write* returned an error if (ret) return ret; this._bytesDispatched += req.bytes;