Skip to content

Commit

Permalink
async_wrap: add provider types for net server
Browse files Browse the repository at this point in the history
Adds `TCPSERVERWRAP` and `PIPESERVERWRAP` as provider types. This
makes it possible to distinguish servers from connections.

Backport-PR-URL: #17621
PR-URL: #17157
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
AndreasMadsen authored and MylesBorins committed Dec 12, 2017

Verified

This commit was signed with the committer’s verified signature. The key has expired.
addaleax Anna Henningsen
1 parent f1b26be commit e0ce7cf
Showing 32 changed files with 288 additions and 168 deletions.
6 changes: 3 additions & 3 deletions benchmark/net/tcp-raw-c2s.js
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ const bench = common.createBenchmark(main, {
dur: [5]
});

const TCP = process.binding('tcp_wrap').TCP;
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const PORT = common.PORT;
@@ -36,7 +36,7 @@ function fail(err, syscall) {
}

function server() {
const serverHandle = new TCP();
const serverHandle = new TCP(TCPConstants.SERVER);
var err = serverHandle.bind('127.0.0.1', PORT);
if (err)
fail(err, 'bind');
@@ -92,7 +92,7 @@ function client() {
throw new Error(`invalid type: ${type}`);
}

const clientHandle = new TCP();
const clientHandle = new TCP(TCPConstants.SOCKET);
const connectReq = new TCPConnectWrap();
const err = clientHandle.connect(connectReq, '127.0.0.1', PORT);

6 changes: 3 additions & 3 deletions benchmark/net/tcp-raw-pipe.js
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ const bench = common.createBenchmark(main, {
dur: [5]
});

const TCP = process.binding('tcp_wrap').TCP;
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const PORT = common.PORT;
@@ -35,7 +35,7 @@ function fail(err, syscall) {
}

function server() {
const serverHandle = new TCP();
const serverHandle = new TCP(TCPConstants.SERVER);
var err = serverHandle.bind('127.0.0.1', PORT);
if (err)
fail(err, 'bind');
@@ -89,7 +89,7 @@ function client() {
throw new Error(`invalid type: ${type}`);
}

const clientHandle = new TCP();
const clientHandle = new TCP(TCPConstants.SOCKET);
const connectReq = new TCPConnectWrap();
const err = clientHandle.connect(connectReq, '127.0.0.1', PORT);
var bytes = 0;
6 changes: 3 additions & 3 deletions benchmark/net/tcp-raw-s2c.js
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ const bench = common.createBenchmark(main, {
dur: [5]
});

const TCP = process.binding('tcp_wrap').TCP;
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap;
const WriteWrap = process.binding('stream_wrap').WriteWrap;
const PORT = common.PORT;
@@ -35,7 +35,7 @@ function fail(err, syscall) {
}

function server() {
const serverHandle = new TCP();
const serverHandle = new TCP(TCPConstants.SERVER);
var err = serverHandle.bind('127.0.0.1', PORT);
if (err)
fail(err, 'bind');
@@ -107,7 +107,7 @@ function server() {
}

function client() {
const clientHandle = new TCP();
const clientHandle = new TCP(TCPConstants.SOCKET);
const connectReq = new TCPConnectWrap();
const err = clientHandle.connect(connectReq, '127.0.0.1', PORT);

12 changes: 6 additions & 6 deletions doc/api/async_hooks.md
Original file line number Diff line number Diff line change
@@ -236,7 +236,7 @@ resource's constructor.
```text
FSEVENTWRAP, FSREQWRAP, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPPARSER,
JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP, SHUTDOWNWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPWRAP, TIMERWRAP, TTYWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVER, TCPWRAP, TIMERWRAP, TTYWRAP,
UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
RANDOMBYTESREQUEST, TLSWRAP, Timeout, Immediate, TickObject
```
@@ -275,13 +275,13 @@ require('net').createServer((conn) => {}).listen(8080);
Output when hitting the server with `nc localhost 8080`:

```console
TCPWRAP(2): trigger: 1 execution: 1
TCPSERVERWRAP(2): trigger: 1 execution: 1
TCPWRAP(4): trigger: 2 execution: 0
```

The first `TCPWRAP` is the server which receives the connections.
The `TCPSERVERWRAP` is the server which receives the connections.

The second `TCPWRAP` is the new connection from the client. When a new
The `TCPWRAP` is the new connection from the client. When a new
connection is made the `TCPWrap` instance is immediately constructed. This
happens outside of any JavaScript stack (side note: a `executionAsyncId()` of `0`
means it's being executed from C++, with no JavaScript stack above it).
@@ -354,7 +354,7 @@ require('net').createServer(() => {}).listen(8080, () => {
Output from only starting the server:

```console
TCPWRAP(2): trigger: 1 execution: 1
TCPSERVERWRAP(2): trigger: 1 execution: 1
TickObject(3): trigger: 2 execution: 1
before: 3
Timeout(4): trigger: 3 execution: 3
@@ -387,7 +387,7 @@ Only using `execution` to graph resource allocation results in the following:
TTYWRAP(6) -> Timeout(4) -> TIMERWRAP(5) -> TickObject(3) -> root(1)
```

The `TCPWRAP` is not part of this graph, even though it was the reason for
The `TCPSERVERWRAP` is not part of this graph, even though it was the reason for
`console.log()` being called. This is because binding to a port without a
hostname is a *synchronous* operation, but to maintain a completely asynchronous
API the user's callback is placed in a `process.nextTick()`.
8 changes: 5 additions & 3 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
@@ -34,8 +34,8 @@ const { Buffer } = require('buffer');
const debug = util.debuglog('tls');
const { Timer } = process.binding('timer_wrap');
const tls_wrap = process.binding('tls_wrap');
const { TCP } = process.binding('tcp_wrap');
const { Pipe } = process.binding('pipe_wrap');
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const errors = require('internal/errors');
const kConnectOptions = Symbol('connect-options');
const kDisableRenegotiation = Symbol('disable-renegotiation');
@@ -398,7 +398,9 @@ TLSSocket.prototype._wrapHandle = function(wrap) {

var options = this._tlsOptions;
if (!handle) {
handle = options.pipe ? new Pipe() : new TCP();
handle = options.pipe ?
new Pipe(PipeConstants.SOCKET) :
new TCP(TCPConstants.SOCKET);
handle.owner = this;
}

4 changes: 2 additions & 2 deletions lib/child_process.js
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ const { createPromise,
promiseResolve, promiseReject } = process.binding('util');
const debug = util.debuglog('child_process');
const { Buffer } = require('buffer');
const { Pipe } = process.binding('pipe_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { errname } = process.binding('uv');
const child_process = require('internal/child_process');
const {
@@ -103,7 +103,7 @@ exports.fork = function(modulePath /*, args, options*/) {

exports._forkChild = function(fd) {
// set process.send()
var p = new Pipe(true);
var p = new Pipe(PipeConstants.IPC);
p.open(fd);
p.unref();
const control = setupChannel(process, p);
6 changes: 3 additions & 3 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ const assert = require('assert');

const { Process } = process.binding('process_wrap');
const { WriteWrap } = process.binding('stream_wrap');
const { Pipe } = process.binding('pipe_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { TTY } = process.binding('tty_wrap');
const { TCP } = process.binding('tcp_wrap');
const { UDP } = process.binding('udp_wrap');
@@ -863,7 +863,7 @@ function _validateStdio(stdio, sync) {
};

if (!sync)
a.handle = new Pipe();
a.handle = new Pipe(PipeConstants.SOCKET);

acc.push(a);
} else if (stdio === 'ipc') {
@@ -876,7 +876,7 @@ function _validateStdio(stdio, sync) {
throw new errors.Error('ERR_IPC_SYNC_FORK');
}

ipc = new Pipe(true);
ipc = new Pipe(PipeConstants.IPC);
ipcFd = i;

acc.push({
32 changes: 22 additions & 10 deletions lib/net.js
Original file line number Diff line number Diff line change
@@ -37,8 +37,8 @@ const {

const { Buffer } = require('buffer');
const TTYWrap = process.binding('tty_wrap');
const { TCP } = process.binding('tcp_wrap');
const { Pipe } = process.binding('pipe_wrap');
const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { TCPConnectWrap } = process.binding('tcp_wrap');
const { PipeConnectWrap } = process.binding('pipe_wrap');
const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap');
@@ -57,10 +57,20 @@ const exceptionWithHostPort = util._exceptionWithHostPort;

function noop() {}

function createHandle(fd) {
function createHandle(fd, is_server) {
const type = TTYWrap.guessHandleType(fd);
if (type === 'PIPE') return new Pipe();
if (type === 'TCP') return new TCP();
if (type === 'PIPE') {
return new Pipe(
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET
);
}

if (type === 'TCP') {
return new TCP(
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
);
}

throw new errors.TypeError('ERR_INVALID_FD_TYPE', type);
}

@@ -200,7 +210,7 @@ function Socket(options) {
this._handle = options.handle; // private
this[async_id_symbol] = getNewAsyncId(this._handle);
} else if (options.fd !== undefined) {
this._handle = createHandle(options.fd);
this._handle = createHandle(options.fd, false);
this._handle.open(options.fd);
this[async_id_symbol] = this._handle.getAsyncId();
// options.fd can be string (since it is user-defined),
@@ -1009,7 +1019,9 @@ Socket.prototype.connect = function(...args) {
debug('pipe', pipe, path);

if (!this._handle) {
this._handle = pipe ? new Pipe() : new TCP();
this._handle = pipe ?
new Pipe(PipeConstants.SOCKET) :
new TCP(TCPConstants.SOCKET);
initSocketHandle(this);
}

@@ -1269,7 +1281,7 @@ function createServerHandle(address, port, addressType, fd) {
var isTCP = false;
if (typeof fd === 'number' && fd >= 0) {
try {
handle = createHandle(fd);
handle = createHandle(fd, true);
} catch (e) {
// Not a fd we can listen on. This will trigger an error.
debug('listen invalid fd=%d:', fd, e.message);
@@ -1280,15 +1292,15 @@ function createServerHandle(address, port, addressType, fd) {
handle.writable = true;
assert(!address && !port);
} else if (port === -1 && addressType === -1) {
handle = new Pipe();
handle = new Pipe(PipeConstants.SERVER);
if (process.platform === 'win32') {
var instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
if (!isNaN(instances)) {
handle.setPendingInstances(instances);
}
}
} else {
handle = new TCP();
handle = new TCP(TCPConstants.SERVER);
isTCP = true;
}

2 changes: 2 additions & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ namespace node {
V(HTTPPARSER) \
V(JSSTREAM) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(PROMISE) \
@@ -54,6 +55,7 @@ namespace node {
V(SIGNALWRAP) \
V(STATWATCHER) \
V(TCPCONNECTWRAP) \
V(TCPSERVERWRAP) \
V(TCPWRAP) \
V(TIMERWRAP) \
V(TTYWRAP) \
4 changes: 3 additions & 1 deletion src/connection_wrap.cc
Original file line number Diff line number Diff line change
@@ -51,7 +51,9 @@ void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,
if (status == 0) {
env->set_init_trigger_async_id(wrap_data->get_async_id());
// Instantiate the client javascript object and handle.
Local<Object> client_obj = WrapType::Instantiate(env, wrap_data);
Local<Object> client_obj = WrapType::Instantiate(env,
wrap_data,
WrapType::SOCKET);

// Unwrap the client javascript object.
WrapType* wrap;
48 changes: 42 additions & 6 deletions src/pipe_wrap.cc
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Int32;
using v8::Local;
using v8::Object;
using v8::String;
@@ -48,14 +49,17 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks;


Local<Object> PipeWrap::Instantiate(Environment* env, AsyncWrap* parent) {
Local<Object> PipeWrap::Instantiate(Environment* env,
AsyncWrap* parent,
PipeWrap::SocketType type) {
EscapableHandleScope handle_scope(env->isolate());
AsyncHooks::InitScope init_scope(env, parent->get_async_id());
CHECK_EQ(false, env->pipe_constructor_template().IsEmpty());
Local<Function> constructor = env->pipe_constructor_template()->GetFunction();
CHECK_EQ(false, constructor.IsEmpty());
Local<Value> type_value = Int32::New(env->isolate(), type);
Local<Object> instance =
constructor->NewInstance(env->context()).ToLocalChecked();
constructor->NewInstance(env->context(), 1, &type_value).ToLocalChecked();
return handle_scope.Escape(instance);
}

@@ -107,6 +111,15 @@ void PipeWrap::Initialize(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "PipeConnectWrap");
cwt->SetClassName(wrapString);
target->Set(wrapString, cwt->GetFunction());

// Define constants
Local<Object> constants = Object::New(env->isolate());
NODE_DEFINE_CONSTANT(constants, SOCKET);
NODE_DEFINE_CONSTANT(constants, SERVER);
NODE_DEFINE_CONSTANT(constants, IPC);
target->Set(context,
FIXED_ONE_BYTE_STRING(env->isolate(), "constants"),
constants).FromJust();
}


@@ -115,17 +128,40 @@ void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {
// Therefore we assert that we are not trying to call this as a
// normal function.
CHECK(args.IsConstructCall());
CHECK(args[0]->IsInt32());
Environment* env = Environment::GetCurrent(args);
new PipeWrap(env, args.This(), args[0]->IsTrue());

int type_value = args[0].As<Int32>()->Value();
PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value);

bool ipc;
ProviderType provider;
switch (type) {
case SOCKET:
provider = PROVIDER_PIPEWRAP;
ipc = false;
break;
case SERVER:
provider = PROVIDER_PIPESERVERWRAP;
ipc = false;
break;
case IPC:
provider = PROVIDER_PIPEWRAP;
ipc = true;
break;
default:
UNREACHABLE();
}

new PipeWrap(env, args.This(), provider, ipc);
}


PipeWrap::PipeWrap(Environment* env,
Local<Object> object,
ProviderType provider,
bool ipc)
: ConnectionWrap(env,
object,
AsyncWrap::PROVIDER_PIPEWRAP) {
: ConnectionWrap(env, object, provider) {
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
Loading

0 comments on commit e0ce7cf

Please sign in to comment.