From f9a17f529ebc27351a53eed58d9315a2a3e9b802 Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Mon, 11 Dec 2017 17:55:17 -0500 Subject: [PATCH] net,src: refactor writeQueueSize tracking Currently, writeQueueSize is never used in C++ and barely used within JS. Instead of constantly updating the value on the JS object, create a getter that will retrieve the most up-to-date value from C++. For the vast majority of cases though, create a new prop on Socket.prototype[kLastWriteQueueSize] using a Symbol. Use this to track the current write size, entirely in JS land. PR-URL: https://github.com/nodejs/node/pull/17650 --- lib/_tls_wrap.js | 5 -- lib/net.js | 33 +++++---- src/pipe_wrap.cc | 1 - src/stream_base.cc | 13 +++- src/stream_base.h | 1 + src/stream_wrap.cc | 48 +++++++------ src/stream_wrap.h | 6 +- src/tcp_wrap.cc | 1 - src/tls_wrap.cc | 45 ++++++------ src/tls_wrap.h | 5 +- src/tty_wrap.cc | 6 +- test/parallel/test-tls-buffersize.js | 10 +-- .../test-http-keep-alive-large-write.js | 67 +++++------------- .../test-https-keep-alive-large-write.js | 70 ++++--------------- 14 files changed, 126 insertions(+), 185 deletions(-) diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index d888d777353a76..e17ea3f5948e82 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -460,11 +460,6 @@ TLSSocket.prototype._init = function(socket, wrap) { var options = this._tlsOptions; var ssl = this._handle; - // lib/net.js expect this value to be non-zero if write hasn't been flushed - // immediately. After the handshake is done this will represent the actual - // write queue size - ssl.writeQueueSize = 1; - this.server = options.server; // For clients, we will always have either a given ca list or be using diff --git a/lib/net.js b/lib/net.js index 540de753641e88..7558afaf01e48b 100644 --- a/lib/net.js +++ b/lib/net.js @@ -48,6 +48,8 @@ const { nextTick } = require('internal/process/next_tick'); const errors = require('internal/errors'); const dns = require('dns'); +const kLastWriteQueueSize = Symbol('lastWriteQueueSize'); + // `cluster` is only used by `listenInCluster` so for startup performance // reasons it's lazy loaded. var cluster = null; @@ -198,6 +200,7 @@ function Socket(options) { this._handle = null; this._parent = null; this._host = null; + this[kLastWriteQueueSize] = 0; if (typeof options === 'number') options = { fd: options }; // Legacy interface. @@ -398,12 +401,14 @@ Socket.prototype.setTimeout = function(msecs, callback) { Socket.prototype._onTimeout = function() { - if (this._handle) { - // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is + const handle = this._handle; + const lastWriteQueueSize = this[kLastWriteQueueSize]; + if (lastWriteQueueSize > 0 && handle) { + // `lastWriteQueueSize !== writeQueueSize` means there is // an active write in progress, so we suppress the timeout. - const prevWriteQueueSize = this._handle.writeQueueSize; - if (prevWriteQueueSize > 0 && - prevWriteQueueSize !== this._handle.updateWriteQueueSize()) { + const writeQueueSize = handle.writeQueueSize; + if (lastWriteQueueSize !== writeQueueSize) { + this[kLastWriteQueueSize] = writeQueueSize; this._unrefTimer(); return; } @@ -473,7 +478,7 @@ Object.defineProperty(Socket.prototype, 'readyState', { Object.defineProperty(Socket.prototype, 'bufferSize', { get: function() { if (this._handle) { - return this._handle.writeQueueSize + this.writableLength; + return this[kLastWriteQueueSize] + this.writableLength; } } }); @@ -764,12 +769,13 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { this._bytesDispatched += req.bytes; - // If it was entirely flushed, we can write some more right now. - // However, if more is left in the queue, then wait until that clears. - if (req.async && this._handle.writeQueueSize !== 0) - req.cb = cb; - else + if (!req.async) { cb(); + return; + } + + req.cb = cb; + this[kLastWriteQueueSize] = req.bytes; }; @@ -853,6 +859,9 @@ function afterWrite(status, handle, req, err) { if (self !== process.stderr && self !== process.stdout) debug('afterWrite', status); + if (req.async) + self[kLastWriteQueueSize] = 0; + // callback may come after call to destroy. if (self.destroyed) { debug('afterWrite destroyed'); @@ -872,7 +881,7 @@ function afterWrite(status, handle, req, err) { debug('afterWrite call cb'); if (req.cb) - req.cb.call(self); + req.cb.call(undefined); } diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 76280f0ce77e86..c6dce756cea829 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -165,7 +165,6 @@ PipeWrap::PipeWrap(Environment* env, int r = uv_pipe_init(env->event_loop(), &handle_, ipc); CHECK_EQ(r, 0); // How do we proxy this error up to javascript? // Suggestion: uv_pipe_init() returns void. - UpdateWriteQueueSize(); } diff --git a/src/stream_base.cc b/src/stream_base.cc index bb25fc1cff0e9c..a48e77063ef451 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -193,7 +193,8 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { } err = DoWrite(req_wrap, buf_list, count, nullptr); - req_wrap_obj->Set(env->async(), True(env->isolate())); + if (HasWriteQueue()) + req_wrap_obj->Set(env->async(), True(env->isolate())); if (err) req_wrap->Dispose(); @@ -249,7 +250,8 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { req_wrap = WriteWrap::New(env, req_wrap_obj, this); err = DoWrite(req_wrap, bufs, count, nullptr); - req_wrap_obj->Set(env->async(), True(env->isolate())); + if (HasWriteQueue()) + req_wrap_obj->Set(env->async(), True(env->isolate())); req_wrap_obj->Set(env->buffer_string(), args[1]); if (err) @@ -373,7 +375,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { reinterpret_cast(send_handle)); } - req_wrap_obj->Set(env->async(), True(env->isolate())); + if (HasWriteQueue()) + req_wrap_obj->Set(env->async(), True(env->isolate())); if (err) req_wrap->Dispose(); @@ -467,6 +470,10 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { return 0; } +bool StreamResource::HasWriteQueue() { + return true; +} + const char* StreamResource::Error() const { return nullptr; diff --git a/src/stream_base.h b/src/stream_base.h index d063176b04a4db..071627f3bf2a67 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -162,6 +162,7 @@ class StreamResource { uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) = 0; + virtual bool HasWriteQueue(); virtual const char* Error() const; virtual void ClearError(); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index f6cfba84c28a55..094991107ba7aa 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -40,13 +40,15 @@ namespace node { using v8::Context; +using v8::DontDelete; using v8::EscapableHandleScope; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::HandleScope; -using v8::Integer; using v8::Local; using v8::Object; +using v8::ReadOnly; +using v8::Signature; using v8::Value; @@ -99,7 +101,16 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, void LibuvStreamWrap::AddMethods(Environment* env, v8::Local target, int flags) { - env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize); + Local get_write_queue_size = + FunctionTemplate::New(env->isolate(), + GetWriteQueueSize, + env->as_external(), + Signature::New(env->isolate(), target)); + target->PrototypeTemplate()->SetAccessorProperty( + env->write_queue_size_string(), + get_write_queue_size, + Local(), + static_cast(ReadOnly | DontDelete)); env->SetProtoMethod(target, "setBlocking", SetBlocking); StreamBase::AddMethods(env, target, flags); } @@ -135,17 +146,6 @@ bool LibuvStreamWrap::IsIPCPipe() { } -uint32_t LibuvStreamWrap::UpdateWriteQueueSize() { - HandleScope scope(env()->isolate()); - uint32_t write_queue_size = stream()->write_queue_size; - object()->Set(env()->context(), - env()->write_queue_size_string(), - Integer::NewFromUnsigned(env()->isolate(), - write_queue_size)).FromJust(); - return write_queue_size; -} - - int LibuvStreamWrap::ReadStart() { return uv_read_start(stream(), OnAlloc, OnRead); } @@ -267,13 +267,18 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle, } -void LibuvStreamWrap::UpdateWriteQueueSize( - const FunctionCallbackInfo& args) { +void LibuvStreamWrap::GetWriteQueueSize( + const FunctionCallbackInfo& info) { LibuvStreamWrap* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This()); + + if (wrap->stream() == nullptr) { + info.GetReturnValue().Set(0); + return; + } - uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); - args.GetReturnValue().Set(write_queue_size); + uint32_t write_queue_size = wrap->stream()->write_queue_size; + info.GetReturnValue().Set(write_queue_size); } @@ -370,12 +375,16 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w, } w->Dispatched(); - UpdateWriteQueueSize(); return r; } +bool LibuvStreamWrap::HasWriteQueue() { + return stream()->write_queue_size > 0; +} + + void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { WriteWrap* req_wrap = WriteWrap::from_req(req); CHECK_NE(req_wrap, nullptr); @@ -387,7 +396,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) { StreamBase::AfterWrite(w, status); - UpdateWriteQueueSize(); } } // namespace node diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 414bad393fa9b1..a695f9a08a7729 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -55,6 +55,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) override; + bool HasWriteQueue() override; inline uv_stream_t* stream() const { return stream_; @@ -83,15 +84,14 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { } AsyncWrap* GetAsyncWrap() override; - uint32_t UpdateWriteQueueSize(); static void AddMethods(Environment* env, v8::Local target, int flags = StreamBase::kFlagNone); private: - static void UpdateWriteQueueSize( - const v8::FunctionCallbackInfo& args); + static void GetWriteQueueSize( + const v8::FunctionCallbackInfo& info); static void SetBlocking(const v8::FunctionCallbackInfo& args); // Callbacks for libuv diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 8dd14e2e16c18b..aa0cb7ed1789b5 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -169,7 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local object, ProviderType provider) int r = uv_tcp_init(env->event_loop(), &handle_); CHECK_EQ(r, 0); // How do we proxy this error up to javascript? // Suggestion: uv_tcp_init() returns void. - UpdateWriteQueueSize(); } diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index c661a0ac32ab2b..b2ef5184e077b8 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -35,14 +35,16 @@ namespace node { using crypto::SecureContext; using crypto::SSLWrap; using v8::Context; +using v8::DontDelete; using v8::EscapableHandleScope; using v8::Exception; using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; -using v8::Integer; using v8::Local; using v8::Object; +using v8::ReadOnly; +using v8::Signature; using v8::String; using v8::Value; @@ -309,7 +311,6 @@ void TLSWrap::EncOut() { // No data to write if (BIO_pending(enc_out_) == 0) { - UpdateWriteQueueSize(); if (clear_in_->Length() == 0) InvokeQueued(0); return; @@ -555,17 +556,6 @@ bool TLSWrap::IsClosing() { } -uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) { - HandleScope scope(env()->isolate()); - if (write_queue_size == 0) - write_queue_size = BIO_pending(enc_out_); - object()->Set(env()->context(), - env()->write_queue_size_string(), - Integer::NewFromUnsigned(env()->isolate(), - write_queue_size)).FromJust(); - return write_queue_size; -} - int TLSWrap::ReadStart() { if (stream_ != nullptr) @@ -612,9 +602,6 @@ int TLSWrap::DoWrite(WriteWrap* w, // However, if there is any data that should be written to the socket, // the callback should not be invoked immediately if (BIO_pending(enc_out_) == 0) { - // net.js expects writeQueueSize to be > 0 if the write isn't - // immediately flushed - UpdateWriteQueueSize(1); return stream_->DoWrite(w, bufs, count, send_handle); } } @@ -666,7 +653,6 @@ int TLSWrap::DoWrite(WriteWrap* w, // Try writing data immediately EncOut(); - UpdateWriteQueueSize(); return 0; } @@ -938,12 +924,17 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB -void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo& args) { +void TLSWrap::GetWriteQueueSize(const FunctionCallbackInfo& info) { TLSWrap* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This()); - uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); - args.GetReturnValue().Set(write_queue_size); + if (wrap->clear_in_ == nullptr) { + info.GetReturnValue().Set(0); + return; + } + + uint32_t write_queue_size = BIO_pending(wrap->enc_out_); + info.GetReturnValue().Set(write_queue_size); } @@ -966,6 +957,17 @@ void TLSWrap::Initialize(Local target, t->InstanceTemplate()->SetInternalFieldCount(1); t->SetClassName(tlsWrapString); + Local get_write_queue_size = + FunctionTemplate::New(env->isolate(), + GetWriteQueueSize, + env->as_external(), + Signature::New(env->isolate(), t)); + t->PrototypeTemplate()->SetAccessorProperty( + env->write_queue_size_string(), + get_write_queue_size, + Local(), + static_cast(ReadOnly | DontDelete)); + AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset); env->SetProtoMethod(t, "receive", Receive); env->SetProtoMethod(t, "start", Start); @@ -973,7 +975,6 @@ void TLSWrap::Initialize(Local target, env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks); env->SetProtoMethod(t, "destroySSL", DestroySSL); env->SetProtoMethod(t, "enableCertCb", EnableCertCb); - env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize); StreamBase::AddMethods(env, t, StreamBase::kFlagHasWritev); SSLWrap::AddMethods(env, t); diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 56bdda03fc0d5b..ed047797c2e969 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -131,7 +131,6 @@ class TLSWrap : public AsyncWrap, AsyncWrap* GetAsyncWrap() override; bool IsIPCPipe() override; - uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0); // Resource implementation static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx); @@ -188,8 +187,8 @@ class TLSWrap : public AsyncWrap, bool eof_; private: - static void UpdateWriteQueueSize( - const v8::FunctionCallbackInfo& args); + static void GetWriteQueueSize( + const v8::FunctionCallbackInfo& info); }; } // namespace node diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index 18f9feca57a4c2..111c568cb52a8a 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -153,11 +153,9 @@ void TTYWrap::New(const FunctionCallbackInfo& args) { CHECK_GE(fd, 0); int err = 0; - TTYWrap* wrap = new TTYWrap(env, args.This(), fd, args[1]->IsTrue(), &err); + new TTYWrap(env, args.This(), fd, args[1]->IsTrue(), &err); if (err != 0) - return env->ThrowUVException(err, "uv_tty_init"); - - wrap->UpdateWriteQueueSize(); + env->ThrowUVException(err, "uv_tty_init"); } diff --git a/test/parallel/test-tls-buffersize.js b/test/parallel/test-tls-buffersize.js index 49848cd865aca5..c94b95d7b32d31 100644 --- a/test/parallel/test-tls-buffersize.js +++ b/test/parallel/test-tls-buffersize.js @@ -7,17 +7,17 @@ const fixtures = require('../common/fixtures'); const tls = require('tls'); const iter = 10; -const overhead = 30; const server = tls.createServer({ key: fixtures.readKey('agent2-key.pem'), cert: fixtures.readKey('agent2-cert.pem') }, common.mustCall((socket) => { - socket.on('readable', common.mustCallAtLeast(() => { - socket.read(); - }, 1)); + let str = ''; + socket.setEncoding('utf-8'); + socket.on('data', (chunk) => { str += chunk; }); socket.on('end', common.mustCall(() => { + assert.strictEqual(str, 'a'.repeat(iter - 1)); server.close(); })); })); @@ -31,7 +31,7 @@ server.listen(0, common.mustCall(() => { for (let i = 1; i < iter; i++) { client.write('a'); - assert.strictEqual(client.bufferSize, i + overhead); + assert.strictEqual(client.bufferSize, i + 1); } client.on('finish', common.mustCall(() => { diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js index 2cdf539e76b2fc..4119c2353daa53 100644 --- a/test/sequential/test-http-keep-alive-large-write.js +++ b/test/sequential/test-http-keep-alive-large-write.js @@ -6,26 +6,12 @@ const http = require('http'); // This test assesses whether long-running writes can complete // or timeout because the socket is not aware that the backing // stream is still writing. -// To simulate a slow client, we write a really large chunk and -// then proceed through the following cycle: -// 1) Receive first 'data' event and record currently written size -// 2) Once we've read up to currently written size recorded above, -// we pause the stream and wait longer than the server timeout -// 3) Socket.prototype._onTimeout triggers and should confirm -// that the backing stream is still active and writing -// 4) Our timer fires, we resume the socket and start at 1) -const minReadSize = 250000; -const serverTimeout = common.platformTimeout(500); -let offsetTimeout = common.platformTimeout(100); -let serverConnectionHandle; -let writeSize = 3000000; -let didReceiveData = false; -// this represents each cycles write size, where the cycle consists -// of `write > read > _onTimeout` -let currentWriteSize = 0; +const writeSize = 3000000; +let socket; const server = http.createServer(common.mustCall((req, res) => { + server.close(); const content = Buffer.alloc(writeSize, 0x44); res.writeHead(200, { @@ -34,47 +20,28 @@ const server = http.createServer(common.mustCall((req, res) => { 'Vary': 'Accept-Encoding' }); - serverConnectionHandle = res.socket._handle; + socket = res.socket; + const onTimeout = socket._onTimeout; + socket._onTimeout = common.mustCallAtLeast(() => onTimeout.call(socket), 1); res.write(content); res.end(); })); -server.setTimeout(serverTimeout); server.on('timeout', () => { - assert.strictEqual(didReceiveData, false, 'Should not timeout'); + // TODO(apapirovski): This test is faulty on certain Windows systems + // as no queue is ever created + assert(!socket._handle || socket._handle.writeQueueSize === 0, + 'Should not timeout'); }); server.listen(0, common.mustCall(() => { http.get({ path: '/', port: server.address().port - }, common.mustCall((res) => { - const resume = () => res.resume(); - let receivedBufferLength = 0; - let firstReceivedAt; - res.on('data', common.mustCallAtLeast((buf) => { - if (receivedBufferLength === 0) { - currentWriteSize = Math.max( - minReadSize, - writeSize - serverConnectionHandle.writeQueueSize - ); - didReceiveData = false; - firstReceivedAt = Date.now(); - } - receivedBufferLength += buf.length; - if (receivedBufferLength >= currentWriteSize) { - didReceiveData = true; - writeSize = serverConnectionHandle.writeQueueSize; - receivedBufferLength = 0; - res.pause(); - setTimeout( - resume, - serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) - ); - offsetTimeout = 0; - } - }, 1)); - res.on('end', common.mustCall(() => { - server.close(); - })); - })); + }, (res) => { + res.once('data', () => { + socket._onTimeout(); + res.on('data', () => {}); + }); + res.on('end', () => server.close()); + }); })); diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js index 5048f4f9519449..79381ba8735756 100644 --- a/test/sequential/test-https-keep-alive-large-write.js +++ b/test/sequential/test-https-keep-alive-large-write.js @@ -2,31 +2,15 @@ const common = require('../common'); if (!common.hasCrypto) common.skip('missing crypto'); -const assert = require('assert'); const fixtures = require('../common/fixtures'); const https = require('https'); // This test assesses whether long-running writes can complete // or timeout because the socket is not aware that the backing // stream is still writing. -// To simulate a slow client, we write a really large chunk and -// then proceed through the following cycle: -// 1) Receive first 'data' event and record currently written size -// 2) Once we've read up to currently written size recorded above, -// we pause the stream and wait longer than the server timeout -// 3) Socket.prototype._onTimeout triggers and should confirm -// that the backing stream is still active and writing -// 4) Our timer fires, we resume the socket and start at 1) -const minReadSize = 250000; -const serverTimeout = common.platformTimeout(500); -let offsetTimeout = common.platformTimeout(100); -let serverConnectionHandle; -let writeSize = 2000000; -let didReceiveData = false; -// this represents each cycles write size, where the cycle consists -// of `write > read > _onTimeout` -let currentWriteSize = 0; +const writeSize = 30000000; +let socket; const server = https.createServer({ key: fixtures.readKey('agent1-key.pem'), @@ -40,50 +24,24 @@ const server = https.createServer({ 'Vary': 'Accept-Encoding' }); - serverConnectionHandle = res.socket._handle; - res.write(content, () => { - assert.strictEqual(serverConnectionHandle.writeQueueSize, 0); - }); + socket = res.socket; + const onTimeout = socket._onTimeout; + socket._onTimeout = common.mustCallAtLeast(() => onTimeout.call(socket), 1); + res.write(content); res.end(); })); -server.setTimeout(serverTimeout); -server.on('timeout', () => { - assert.strictEqual(didReceiveData, false, 'Should not timeout'); -}); +server.on('timeout', common.mustNotCall()); server.listen(0, common.mustCall(() => { https.get({ path: '/', port: server.address().port, rejectUnauthorized: false - }, common.mustCall((res) => { - const resume = () => res.resume(); - let receivedBufferLength = 0; - let firstReceivedAt; - res.on('data', common.mustCallAtLeast((buf) => { - if (receivedBufferLength === 0) { - currentWriteSize = Math.max( - minReadSize, - writeSize - serverConnectionHandle.writeQueueSize - ); - didReceiveData = false; - firstReceivedAt = Date.now(); - } - receivedBufferLength += buf.length; - if (receivedBufferLength >= currentWriteSize) { - didReceiveData = true; - writeSize = serverConnectionHandle.writeQueueSize; - receivedBufferLength = 0; - res.pause(); - setTimeout( - resume, - serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) - ); - offsetTimeout = 0; - } - }, 1)); - res.on('end', common.mustCall(() => { - server.close(); - })); - })); + }, (res) => { + res.once('data', () => { + socket._onTimeout(); + res.on('data', () => {}); + }); + res.on('end', () => server.close()); + }); }));