Skip to content

Commit

Permalink
net: rework autoSelectFamily implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ShogunPanda committed Feb 13, 2023
1 parent ecd385e commit c8b5f22
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 52 deletions.
11 changes: 0 additions & 11 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ const EE = require('events');
const net = require('net');
const tls = require('tls');
const common = require('_tls_common');
const { kWrapConnectedHandle } = require('internal/net');
const JSStreamSocket = require('internal/js_stream_socket');
const { Buffer } = require('buffer');
let debug = require('internal/util/debuglog').debuglog('tls', (fn) => {
Expand Down Expand Up @@ -633,16 +632,6 @@ TLSSocket.prototype._wrapHandle = function(wrap, handle) {
return res;
};

TLSSocket.prototype[kWrapConnectedHandle] = function(handle) {
this._handle = this._wrapHandle(null, handle);
this.ssl = this._handle;
this._init();

if (this._tlsOptions.enableTrace) {
this._handle.enableTrace();
}
};

// This eliminates a cyclic reference to TLSWrap
// Ref: https://github.com/nodejs/node/commit/f7620fb96d339f704932f9bb9a0dceb9952df2d4
function defineHandleReading(socket, handle) {
Expand Down
1 change: 0 additions & 1 deletion lib/internal/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ function makeSyncWrite(fd) {
}

module.exports = {
kWrapConnectedHandle: Symbol('wrapConnectedHandle'),
isIP,
isIPv4,
isIPv6,
Expand Down
46 changes: 25 additions & 21 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ let debug = require('internal/util/debuglog').debuglog('net', (fn) => {
debug = fn;
});
const {
kWrapConnectedHandle,
isIP,
isIPv4,
isIPv6,
Expand All @@ -53,7 +52,8 @@ const assert = require('internal/assert');
const {
UV_EADDRINUSE,
UV_EINVAL,
UV_ENOTCONN
UV_ENOTCONN,
UV_ECANCELED
} = internalBinding('uv');

const { Buffer } = require('buffer');
Expand Down Expand Up @@ -1064,43 +1064,54 @@ function internalConnect(
}


function internalConnectMultiple(context) {
function internalConnectMultiple(context, canceled) {
clearTimeout(context[kTimeout]);
const self = context.socket;
assert(self.connecting);

// All connections have been tried without success, destroy with error
if (context.current === context.addresses.length) {
if (canceled || context.current === context.addresses.length) {
self.destroy(aggregateErrors(context.errors));
return;
}

assert(self.connecting);

// Reset the TCP handle when trying other addresses
if (context.current > 0) {
if (self?.[kHandle]?._parent) {
self[kHandle]._parent.reinitialize();
} else {
self._handle.reinitialize();
}
}

const { localPort, port, flags } = context;
const { address, family: addressType } = context.addresses[context.current++];
const handle = new TCP(TCPConstants.SOCKET);
let localAddress;
let err;

if (localPort) {
if (addressType === 4) {
localAddress = DEFAULT_IPV4_ADDR;
err = handle.bind(localAddress, localPort);
err = self._handle.bind(localAddress, localPort);
} else { // addressType === 6
localAddress = DEFAULT_IPV6_ADDR;
err = handle.bind6(localAddress, localPort, flags);
err = self._handle.bind6(localAddress, localPort, flags);
}

debug('connect/multiple: binding to localAddress: %s and localPort: %d (addressType: %d)',
localAddress, localPort, addressType);

err = checkBindError(err, localPort, handle);
err = checkBindError(err, localPort, self._handle);
if (err) {
ArrayPrototypePush(context.errors, exceptionWithHostPort(err, 'bind', localAddress, localPort));
internalConnectMultiple(context);
return;
}
}

debug('connect/multiple: attempting to connect to %s:%d (addressType: %d)', address, port, addressType);

const req = new TCPConnectWrap();
req.oncomplete = FunctionPrototypeBind(afterConnectMultiple, undefined, context);
req.address = address;
Expand All @@ -1111,9 +1122,9 @@ function internalConnectMultiple(context) {
ArrayPrototypePush(self.autoSelectFamilyAttemptedAddresses, `${address}:${port}`);

if (addressType === 4) {
err = handle.connect(req, address, port);
err = self._handle.connect(req, address, port);
} else {
err = handle.connect6(req, address, port);
err = self._handle.connect6(req, address, port);
}

if (err) {
Expand Down Expand Up @@ -1337,6 +1348,8 @@ function lookupAndConnectMultiple(self, async_id_symbol, lookup, host, options,
if (!self.connecting) {
return;
} else if (err) {
self.emit('lookup', err, undefined, undefined, host);

// net.createConnection() creates a net.Socket object and immediately
// calls net.Socket.connect() on it (that's us). There are no event
// listeners registered yet so defer the error event to the next tick.
Expand Down Expand Up @@ -1529,7 +1542,7 @@ function afterConnectMultiple(context, status, handle, req, readable, writable)
ArrayPrototypePush(context.errors, ex);

// Try the next address
internalConnectMultiple(context);
internalConnectMultiple(context, status === UV_ECANCELED);
return;
}

Expand All @@ -1540,15 +1553,6 @@ function afterConnectMultiple(context, status, handle, req, readable, writable)
return;
}

// Perform initialization sequence on the handle, then move on with the regular callback
self._handle = handle;
initSocketHandle(self);

if (self[kWrapConnectedHandle]) {
self[kWrapConnectedHandle](handle);
initSocketHandle(self); // This is called again to initialize the TLSWrap
}

if (hasObserver('net')) {
startPerf(
self,
Expand Down
12 changes: 12 additions & 0 deletions src/tcp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ void TCPWrap::Initialize(Local<Object> target,
SetProtoMethod(isolate, t, "setNoDelay", SetNoDelay);
SetProtoMethod(isolate, t, "setKeepAlive", SetKeepAlive);
SetProtoMethod(isolate, t, "reset", Reset);
SetProtoMethod(isolate, t, "reinitialize", Reinitialize);

#ifdef _WIN32
SetProtoMethod(isolate, t, "setSimultaneousAccepts", SetSimultaneousAccepts);
Expand Down Expand Up @@ -142,6 +143,7 @@ void TCPWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(SetNoDelay);
registry->Register(SetKeepAlive);
registry->Register(Reset);
registry->Register(Reinitialize);
#ifdef _WIN32
registry->Register(SetSimultaneousAccepts);
#endif
Expand Down Expand Up @@ -181,6 +183,16 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
// Suggestion: uv_tcp_init() returns void.
}

void TCPWrap::Reinitialize(const FunctionCallbackInfo<Value>& args) {
TCPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));

Environment* env = wrap->env();
int r = uv_tcp_init(env->event_loop(), &wrap->handle_);
CHECK_EQ(r, 0);
}

void TCPWrap::SetNoDelay(const FunctionCallbackInfo<Value>& args) {
TCPWrap* wrap;
Expand Down
1 change: 1 addition & 0 deletions src/tcp_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TCPWrap : public ConnectionWrap<TCPWrap, uv_tcp_t> {
ProviderType provider);

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Reinitialize(const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetNoDelay(const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetKeepAlive(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
10 changes: 9 additions & 1 deletion test/parallel/test-http2-ping-settings-heapdump.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ const v8 = require('v8');
// after it is destroyed, either because they are detached from it or have been
// destroyed themselves.

// We use an higher autoSelectFamilyAttemptTimeout in this test as the v8.getHeapSnapshot().resume()
// will slow the connection flow and we don't want the second connection attempt to start.
let autoSelectFamilyAttemptTimeout = common.platformTimeout(1000);
if (common.isWindows) {
// Some of the windows machines in the CI need more time to establish connection
autoSelectFamilyAttemptTimeout = common.platformTimeout(10000);
}

for (const variant of ['ping', 'settings']) {
const server = http2.createServer();
server.on('session', common.mustCall((session) => {
Expand All @@ -30,7 +38,7 @@ for (const variant of ['ping', 'settings']) {
}));

server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`,
const client = http2.connect(`http://localhost:${server.address().port}`, { autoSelectFamilyAttemptTimeout },
common.mustCall());
client.on('error', (err) => {
// We destroy the session so it's possible to get ECONNRESET here.
Expand Down
19 changes: 16 additions & 3 deletions test/parallel/test-net-dns-custom-lookup.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,22 @@ function check(addressType, cb) {

function lookup(host, dnsopts, cb) {
dnsopts.family = addressType;

if (addressType === 4) {
process.nextTick(function() {
cb(null, common.localhostIPv4, 4);
if (dnsopts.all) {
cb(null, [{ address: common.localhostIPv4, family: 4 }]);
} else {
cb(null, common.localhostIPv4, 4);
}
});
} else {
process.nextTick(function() {
cb(null, '::1', 6);
if (dnsopts.all) {
cb(null, [{ address: '::1', family: 6 }]);
} else {
cb(null, '::1', 6);
}
});
}
}
Expand All @@ -48,7 +57,11 @@ check(4, function() {
host: 'localhost',
port: 80,
lookup(host, dnsopts, cb) {
cb(null, undefined, 4);
if (dnsopts.all) {
cb(null, [{ address: undefined, family: 4 }]);
} else {
cb(null, undefined, 4);
}
}
}).on('error', common.expectsError({ code: 'ERR_INVALID_IP_ADDRESS' }));
}
4 changes: 2 additions & 2 deletions test/parallel/test-net-dns-lookup.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const server = net.createServer(function(client) {

server.listen(0, common.mustCall(function() {
net.connect(this.address().port, 'localhost')
.on('lookup', common.mustCall(function(err, ip, type, host) {
.on('lookup', common.mustCallAtLeast(function(err, ip, type, host) {
assert.strictEqual(err, null);
assert.match(ip, /^(127\.0\.0\.1|::1)$/);
assert.match(type.toString(), /^(4|6)$/);
assert.strictEqual(host, 'localhost');
}));
}, 1));
}));
6 changes: 5 additions & 1 deletion test/parallel/test-net-options-lookup.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ function connectDoesNotThrow(input) {
{
// Verify that an error is emitted when an invalid address family is returned.
const s = connectDoesNotThrow((host, options, cb) => {
cb(null, '127.0.0.1', 100);
if (options.all) {
cb(null, [{ address: '127.0.0.1', family: 100 }]);
} else {
cb(null, '127.0.0.1', 100);
}
});

s.on('error', common.expectsError({
Expand Down
18 changes: 6 additions & 12 deletions test/parallel/test-net-server-reset.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@ server.on('close', common.mustCall());

assert.strictEqual(server, server.listen(0, () => {
net.createConnection(server.address().port)
.on('error', common.mustCall(
common.expectsError({
code: 'ECONNRESET',
name: 'Error'
}))
);
.on('error', common.mustCall((error) => {
assert.strictEqual(error.code, 'ECONNRESET');
}));
net.createConnection(server.address().port)
.on('error', common.mustCall(
common.expectsError({
code: 'ECONNRESET',
name: 'Error'
}))
);
.on('error', common.mustCall((error) => {
assert.strictEqual(error.code, 'ECONNRESET');
}));
}));

0 comments on commit c8b5f22

Please sign in to comment.