Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v6.x backport] tls: fix writeQueueSize prop, long write timeouts #16420

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,8 @@ TLSSocket.prototype._init = function(socket, wrap) {
var ssl = this._handle;

// lib/net.js expect this value to be non-zero if write hasn't been flushed
// immediately
// TODO(indutny): revise this solution, it might be 1 before handshake and
// represent real writeQueueSize during regular writes.
// immediately. After the handshake is done this will represent the actual
// write queue size
ssl.writeQueueSize = 1;

this.server = options.server;
Expand Down
10 changes: 10 additions & 0 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ Socket.prototype.setTimeout = function(msecs, callback) {


Socket.prototype._onTimeout = function() {
if (this._handle) {
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
// an active write in progress, so we suppress the timeout.
const prevWriteQueueSize = this._handle.writeQueueSize;
if (prevWriteQueueSize > 0 &&
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
this._unrefTimer();
return;
}
}
debug('_onTimeout');
this.emit('timeout');
};
Expand Down
22 changes: 18 additions & 4 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ StreamWrap::StreamWrap(Environment* env,
void StreamWrap::AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags) {
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<StreamWrap>(env, target, flags);
}
Expand Down Expand Up @@ -113,11 +114,14 @@ bool StreamWrap::IsIPCPipe() {
}


void StreamWrap::UpdateWriteQueueSize() {
uint32_t StreamWrap::UpdateWriteQueueSize() {
HandleScope scope(env()->isolate());
Local<Integer> write_queue_size =
Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
object()->Set(env()->write_queue_size_string(), write_queue_size);
uint32_t write_queue_size = stream()->write_queue_size;
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


Expand Down Expand Up @@ -242,6 +246,16 @@ void StreamWrap::OnRead(uv_stream_t* handle,
}


void StreamWrap::UpdateWriteQueueSize(
const FunctionCallbackInfo<Value>& args) {
StreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
}


void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
StreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
Expand Down
4 changes: 3 additions & 1 deletion src/stream_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ class StreamWrap : public HandleWrap, public StreamBase {
}

AsyncWrap* GetAsyncWrap() override;
void UpdateWriteQueueSize();
uint32_t UpdateWriteQueueSize();

static void AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags = StreamBase::kFlagNone);

private:
static void UpdateWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);

// Callbacks for libuv
Expand Down
34 changes: 32 additions & 2 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::String;
Expand Down Expand Up @@ -276,6 +277,7 @@ void TLSWrap::EncOut() {

// No data to write
if (BIO_pending(enc_out_) == 0) {
UpdateWriteQueueSize();
if (clear_in_->Length() == 0)
InvokeQueued(0);
return;
Expand Down Expand Up @@ -530,6 +532,18 @@ bool TLSWrap::IsClosing() {
}


uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
HandleScope scope(env()->isolate());
if (write_queue_size == 0)
write_queue_size = BIO_pending(enc_out_);
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


int TLSWrap::ReadStart() {
return stream_->ReadStart();
}
Expand Down Expand Up @@ -570,8 +584,12 @@ int TLSWrap::DoWrite(WriteWrap* w,
ClearOut();
// However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0)
if (BIO_pending(enc_out_) == 0) {
// net.js expects writeQueueSize to be > 0 if the write isn't
// immediately flushed
UpdateWriteQueueSize(1);
return stream_->DoWrite(w, bufs, count, send_handle);
}
}

// Queue callback to execute it on next tick
Expand Down Expand Up @@ -621,13 +639,15 @@ int TLSWrap::DoWrite(WriteWrap* w,

// Try writing data immediately
EncOut();
UpdateWriteQueueSize();

return 0;
}


void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
// Intentionally empty
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
wrap->UpdateWriteQueueSize();
}


Expand Down Expand Up @@ -891,6 +911,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB


void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
TLSWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
}


void TLSWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context) {
Expand All @@ -912,6 +941,7 @@ void TLSWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "destroySSL", DestroySSL);
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);

StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
SSLWrap<TLSWrap>::AddMethods(env, t);
Expand Down
5 changes: 5 additions & 0 deletions src/tls_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class TLSWrap : public AsyncWrap,

AsyncWrap* GetAsyncWrap() override;
bool IsIPCPipe() override;
uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);

// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
Expand Down Expand Up @@ -166,6 +167,10 @@ class TLSWrap : public AsyncWrap,
// If true - delivered EOF to the js-land, either after `close_notify`, or
// after the `UV_EOF` on socket.
bool eof_;

private:
static void UpdateWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& args);
};

} // namespace node
Expand Down
17 changes: 17 additions & 0 deletions test/parallel/test-net-timeout-no-handle.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const common = require('../common');
const net = require('net');
const assert = require('assert');

const socket = new net.Socket();
socket.setTimeout(common.platformTimeout(50));

socket.on('timeout', common.mustCall(() => {
assert.strictEqual(socket._handle, null);
}));

socket.on('connect', common.mustNotCall());

// since the timeout is unrefed, the code will exit without this
setTimeout(() => {}, common.platformTimeout(200));
43 changes: 43 additions & 0 deletions test/parallel/test-tls-buffersize.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const fixtures = require('../common/fixtures');
const tls = require('tls');

const iter = 10;
const overhead = 30;

const server = tls.createServer({
key: fixtures.readKey('agent2-key.pem'),
cert: fixtures.readKey('agent2-cert.pem')
}, common.mustCall((socket) => {
socket.on('readable', common.mustCallAtLeast(() => {
socket.read();
}, 1));

socket.on('end', common.mustCall(() => {
server.close();
}));
}));

server.listen(0, common.mustCall(() => {
const client = tls.connect({
port: server.address().port,
rejectUnauthorized: false
}, common.mustCall(() => {
assert.strictEqual(client.bufferSize, 0);

for (let i = 1; i < iter; i++) {
client.write('a');
assert.strictEqual(client.bufferSize, i + overhead);
}

client.on('finish', common.mustCall(() => {
assert.strictEqual(client.bufferSize, 0);
}));

client.end();
}));
}));
80 changes: 80 additions & 0 deletions test/sequential/test-http-keep-alive-large-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');

// This test assesses whether long-running writes can complete
// or timeout because the socket is not aware that the backing
// stream is still writing.
// To simulate a slow client, we write a really large chunk and
// then proceed through the following cycle:
// 1) Receive first 'data' event and record currently written size
// 2) Once we've read up to currently written size recorded above,
// we pause the stream and wait longer than the server timeout
// 3) Socket.prototype._onTimeout triggers and should confirm
// that the backing stream is still active and writing
// 4) Our timer fires, we resume the socket and start at 1)

const minReadSize = 250000;
const serverTimeout = common.platformTimeout(500);
let offsetTimeout = common.platformTimeout(100);
let serverConnectionHandle;
let writeSize = 3000000;
let didReceiveData = false;
// this represents each cycles write size, where the cycle consists
// of `write > read > _onTimeout`
let currentWriteSize = 0;

const server = http.createServer(common.mustCall((req, res) => {
const content = Buffer.alloc(writeSize, 0x44);

res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Length': content.length.toString(),
'Vary': 'Accept-Encoding'
});

serverConnectionHandle = res.socket._handle;
res.write(content);
res.end();
}));
server.setTimeout(serverTimeout);
server.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});

server.listen(0, common.mustCall(() => {
http.get({
path: '/',
port: server.address().port
}, common.mustCall((res) => {
const resume = () => res.resume();
let receivedBufferLength = 0;
let firstReceivedAt;
res.on('data', common.mustCallAtLeast((buf) => {
if (receivedBufferLength === 0) {
currentWriteSize = Math.max(
minReadSize,
writeSize - serverConnectionHandle.writeQueueSize
);
didReceiveData = false;
firstReceivedAt = Date.now();
}
receivedBufferLength += buf.length;
if (receivedBufferLength >= currentWriteSize) {
didReceiveData = true;
writeSize = serverConnectionHandle.writeQueueSize;
receivedBufferLength = 0;
res.pause();
setTimeout(
resume,
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
);
offsetTimeout = 0;
}
}, 1));
res.on('end', common.mustCall(() => {
server.close();
}));
}));
}));
Loading