Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v9.x] net,src: refactor writeQueueSize tracking #18084

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,6 @@ TLSSocket.prototype._init = function(socket, wrap) {
var options = this._tlsOptions;
var ssl = this._handle;

// lib/net.js expect this value to be non-zero if write hasn't been flushed
// immediately. After the handshake is done this will represent the actual
// write queue size
ssl.writeQueueSize = 1;

this.server = options.server;

// For clients, we will always have either a given ca list or be using
Expand Down
33 changes: 21 additions & 12 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const { nextTick } = require('internal/process/next_tick');
const errors = require('internal/errors');
const dns = require('dns');

const kLastWriteQueueSize = Symbol('lastWriteQueueSize');

// `cluster` is only used by `listenInCluster` so for startup performance
// reasons it's lazy loaded.
var cluster = null;
Expand Down Expand Up @@ -198,6 +200,7 @@ function Socket(options) {
this._handle = null;
this._parent = null;
this._host = null;
this[kLastWriteQueueSize] = 0;

if (typeof options === 'number')
options = { fd: options }; // Legacy interface.
Expand Down Expand Up @@ -398,12 +401,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {


Socket.prototype._onTimeout = function() {
if (this._handle) {
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
const handle = this._handle;
const lastWriteQueueSize = this[kLastWriteQueueSize];
if (lastWriteQueueSize > 0 && handle) {
// `lastWriteQueueSize !== writeQueueSize` means there is
// an active write in progress, so we suppress the timeout.
const prevWriteQueueSize = this._handle.writeQueueSize;
if (prevWriteQueueSize > 0 &&
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
const writeQueueSize = handle.writeQueueSize;
if (lastWriteQueueSize !== writeQueueSize) {
this[kLastWriteQueueSize] = writeQueueSize;
this._unrefTimer();
return;
}
Expand Down Expand Up @@ -473,7 +478,7 @@ Object.defineProperty(Socket.prototype, 'readyState', {
Object.defineProperty(Socket.prototype, 'bufferSize', {
get: function() {
if (this._handle) {
return this._handle.writeQueueSize + this.writableLength;
return this[kLastWriteQueueSize] + this.writableLength;
}
}
});
Expand Down Expand Up @@ -764,12 +769,13 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {

this._bytesDispatched += req.bytes;

// If it was entirely flushed, we can write some more right now.
// However, if more is left in the queue, then wait until that clears.
if (req.async && this._handle.writeQueueSize !== 0)
req.cb = cb;
else
if (!req.async) {
cb();
return;
}

req.cb = cb;
this[kLastWriteQueueSize] = req.bytes;
};


Expand Down Expand Up @@ -853,6 +859,9 @@ function afterWrite(status, handle, req, err) {
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status);

if (req.async)
self[kLastWriteQueueSize] = 0;

// callback may come after call to destroy.
if (self.destroyed) {
debug('afterWrite destroyed');
Expand All @@ -872,7 +881,7 @@ function afterWrite(status, handle, req, err) {
debug('afterWrite call cb');

if (req.cb)
req.cb.call(self);
req.cb.call(undefined);
}


Expand Down
1 change: 0 additions & 1 deletion src/pipe_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ PipeWrap::PipeWrap(Environment* env,
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
UpdateWriteQueueSize();
}


Expand Down
13 changes: 10 additions & 3 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}

err = DoWrite(req_wrap, buf_list, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (HasWriteQueue())
req_wrap_obj->Set(env->async(), True(env->isolate()));

if (err)
req_wrap->Dispose();
Expand Down Expand Up @@ -249,7 +250,8 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
req_wrap = WriteWrap::New(env, req_wrap_obj, this);

err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (HasWriteQueue())
req_wrap_obj->Set(env->async(), True(env->isolate()));
req_wrap_obj->Set(env->buffer_string(), args[1]);

if (err)
Expand Down Expand Up @@ -373,7 +375,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
reinterpret_cast<uv_stream_t*>(send_handle));
}

req_wrap_obj->Set(env->async(), True(env->isolate()));
if (HasWriteQueue())
req_wrap_obj->Set(env->async(), True(env->isolate()));

if (err)
req_wrap->Dispose();
Expand Down Expand Up @@ -467,6 +470,10 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
return 0;
}

bool StreamResource::HasWriteQueue() {
return true;
}


const char* StreamResource::Error() const {
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class StreamResource {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
virtual bool HasWriteQueue();
virtual const char* Error() const;
virtual void ClearError();

Expand Down
48 changes: 28 additions & 20 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@
namespace node {

using v8::Context;
using v8::DontDelete;
using v8::EscapableHandleScope;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::ReadOnly;
using v8::Signature;
using v8::Value;


Expand Down Expand Up @@ -99,7 +101,16 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
void LibuvStreamWrap::AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags) {
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
Local<FunctionTemplate> get_write_queue_size =
FunctionTemplate::New(env->isolate(),
GetWriteQueueSize,
env->as_external(),
Signature::New(env->isolate(), target));
target->PrototypeTemplate()->SetAccessorProperty(
env->write_queue_size_string(),
get_write_queue_size,
Local<FunctionTemplate>(),
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
}
Expand Down Expand Up @@ -135,17 +146,6 @@ bool LibuvStreamWrap::IsIPCPipe() {
}


uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
HandleScope scope(env()->isolate());
uint32_t write_queue_size = stream()->write_queue_size;
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


int LibuvStreamWrap::ReadStart() {
return uv_read_start(stream(), OnAlloc, OnRead);
}
Expand Down Expand Up @@ -267,13 +267,18 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}


void LibuvStreamWrap::UpdateWriteQueueSize(
const FunctionCallbackInfo<Value>& args) {
void LibuvStreamWrap::GetWriteQueueSize(
const FunctionCallbackInfo<Value>& info) {
LibuvStreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());

if (wrap->stream() == nullptr) {
info.GetReturnValue().Set(0);
return;
}

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
uint32_t write_queue_size = wrap->stream()->write_queue_size;
info.GetReturnValue().Set(write_queue_size);
}


Expand Down Expand Up @@ -370,12 +375,16 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
}

w->Dispatched();
UpdateWriteQueueSize();

return r;
}


bool LibuvStreamWrap::HasWriteQueue() {
return stream()->write_queue_size > 0;
}


void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = WriteWrap::from_req(req);
CHECK_NE(req_wrap, nullptr);
Expand All @@ -387,7 +396,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {

void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
StreamBase::AfterWrite(w, status);
UpdateWriteQueueSize();
}

} // namespace node
Expand Down
6 changes: 3 additions & 3 deletions src/stream_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
bool HasWriteQueue() override;

inline uv_stream_t* stream() const {
return stream_;
Expand Down Expand Up @@ -83,15 +84,14 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
}

AsyncWrap* GetAsyncWrap() override;
uint32_t UpdateWriteQueueSize();

static void AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags = StreamBase::kFlagNone);

private:
static void UpdateWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& info);
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);

// Callbacks for libuv
Expand Down
1 change: 0 additions & 1 deletion src/tcp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
int r = uv_tcp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
UpdateWriteQueueSize();
}


Expand Down
45 changes: 23 additions & 22 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ namespace node {
using crypto::SecureContext;
using crypto::SSLWrap;
using v8::Context;
using v8::DontDelete;
using v8::EscapableHandleScope;
using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::ReadOnly;
using v8::Signature;
using v8::String;
using v8::Value;

Expand Down Expand Up @@ -309,7 +311,6 @@ void TLSWrap::EncOut() {

// No data to write
if (BIO_pending(enc_out_) == 0) {
UpdateWriteQueueSize();
if (clear_in_->Length() == 0)
InvokeQueued(0);
return;
Expand Down Expand Up @@ -555,17 +556,6 @@ bool TLSWrap::IsClosing() {
}


uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
HandleScope scope(env()->isolate());
if (write_queue_size == 0)
write_queue_size = BIO_pending(enc_out_);
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


int TLSWrap::ReadStart() {
if (stream_ != nullptr)
Expand Down Expand Up @@ -612,9 +602,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
// However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0) {
// net.js expects writeQueueSize to be > 0 if the write isn't
// immediately flushed
UpdateWriteQueueSize(1);
return stream_->DoWrite(w, bufs, count, send_handle);
}
}
Expand Down Expand Up @@ -666,7 +653,6 @@ int TLSWrap::DoWrite(WriteWrap* w,

// Try writing data immediately
EncOut();
UpdateWriteQueueSize();

return 0;
}
Expand Down Expand Up @@ -938,12 +924,17 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB


void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
void TLSWrap::GetWriteQueueSize(const FunctionCallbackInfo<Value>& info) {
TLSWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
if (wrap->clear_in_ == nullptr) {
info.GetReturnValue().Set(0);
return;
}

uint32_t write_queue_size = BIO_pending(wrap->enc_out_);
info.GetReturnValue().Set(write_queue_size);
}


Expand All @@ -966,14 +957,24 @@ void TLSWrap::Initialize(Local<Object> target,
t->InstanceTemplate()->SetInternalFieldCount(1);
t->SetClassName(tlsWrapString);

Local<FunctionTemplate> get_write_queue_size =
FunctionTemplate::New(env->isolate(),
GetWriteQueueSize,
env->as_external(),
Signature::New(env->isolate(), t));
t->PrototypeTemplate()->SetAccessorProperty(
env->write_queue_size_string(),
get_write_queue_size,
Local<FunctionTemplate>(),
static_cast<PropertyAttribute>(ReadOnly | DontDelete));

AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset);
env->SetProtoMethod(t, "receive", Receive);
env->SetProtoMethod(t, "start", Start);
env->SetProtoMethod(t, "setVerifyMode", SetVerifyMode);
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "destroySSL", DestroySSL);
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);

StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
SSLWrap<TLSWrap>::AddMethods(env, t);
Expand Down
Loading