Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: fixes and refactors
Browse files Browse the repository at this point in the history
PR-URL: #31
  • Loading branch information
jasnell committed Aug 19, 2019
1 parent 5317024 commit 8ea3356
Show file tree
Hide file tree
Showing 10 changed files with 458 additions and 433 deletions.
164 changes: 95 additions & 69 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ function onStreamReady(streamHandle, id) {
assert(!session.closing);

// TODO(@jasnell): Get default options from session
const stream = new QuicStream({ /* options */ }, session, id, streamHandle);
const uni = id & 0b10;
const stream = new QuicStream({ writable: !uni }, session, id, streamHandle);
if (uni)
stream.end();
session[kAddStream](id, stream);
process.nextTick(emit.bind(session, 'stream', stream));
}
Expand Down Expand Up @@ -368,6 +371,67 @@ function connectAfterBind(session, lookup, address, type) {
connectAfterLookup.bind(session, type));
}

function createSecureContext(options, init_cb) {
const {
ca,
cert,
ciphers = DEFAULT_QUIC_CIPHERS,
clientCertEngine,
crl,
dhparam,
ecdhCurve,
groups = DEFAULT_GROUPS,
honorCipherOrder,
key,
passphrase,
pfx,
sessionIdContext,
secureProtocol
} = { ...options };

if (typeof ciphers !== 'string')
throw new ERR_INVALID_ARG_TYPE('option.ciphers', 'string', ciphers);
if (typeof groups !== 'string')
throw new ERR_INVALID_ARG_TYPE('option.groups', 'string', groups);

const sc = _createSecureContext({
secureProtocol,
ca,
cert,
ciphers: ciphers || DEFAULT_QUIC_CIPHERS,
clientCertEngine,
crl,
dhparam,
ecdhCurve,
honorCipherOrder,
key,
passphrase,
pfx,
sessionIdContext
});
// Perform additional QUIC specific initialization on the SecureContext
init_cb(sc.context, groups || DEFAULT_GROUPS);
return sc;
}

function onNewKeylogListener(event) {
if (event !== 'keylog' ||
this[kHandle] === undefined ||
this.listenerCount('keylog') !== 0) {
return;
}
this[kHandle].state[IDX_QUIC_SESSION_STATE_KEYLOG_ENABLED] = 1;
}

function onRemoveKeylogListener(event) {
if (event !== 'keylog' ||
this[kHandle] === undefined ||
this.listenerCount('keylog') !== 0) {
return;
}
this[kHandle].state[IDX_QUIC_SESSION_STATE_KEYLOG_ENABLED] = 0;
}

// QuicSocket wraps a UDP socket plus the associated TLS context and QUIC
// Protocol state. There may be *multiple* QUIC connections (QuicSession)
// associated with a single QuicSocket.
Expand Down Expand Up @@ -640,6 +704,10 @@ class QuicSocket extends EventEmitter {
return;
this.#state = kSocketClosing;

// Otherwise, gracefully close each QuicSession, with
// [kMaybeDestroy]() being called after each closes.
const maybeDestroy = this[kMaybeDestroy].bind(this);

// If there are no sessions, call [kMaybeDestroy]()
// immediately to destroy the QuicSocket
if (this.#sessions.size === 0) {
Expand All @@ -648,9 +716,6 @@ class QuicSocket extends EventEmitter {
return;
}

// Otherwise, gracefully close each QuicSession, with
// [kMaybeDestroy]() being called after each closes.
const maybeDestroy = this[kMaybeDestroy].bind(this);
for (const session of this.#sessions)
session.close(maybeDestroy);
}
Expand Down Expand Up @@ -802,67 +867,6 @@ class QuicSocket extends EventEmitter {
}
}

function createSecureContext(options, init_cb) {
const {
ca,
cert,
ciphers = DEFAULT_QUIC_CIPHERS,
clientCertEngine,
crl,
dhparam,
ecdhCurve,
groups = DEFAULT_GROUPS,
honorCipherOrder,
key,
passphrase,
pfx,
sessionIdContext,
secureProtocol
} = { ...options };

if (typeof ciphers !== 'string')
throw new ERR_INVALID_ARG_TYPE('option.ciphers', 'string', ciphers);
if (typeof groups !== 'string')
throw new ERR_INVALID_ARG_TYPE('option.groups', 'string', groups);

const sc = _createSecureContext({
secureProtocol,
ca,
cert,
ciphers: ciphers || DEFAULT_QUIC_CIPHERS,
clientCertEngine,
crl,
dhparam,
ecdhCurve,
honorCipherOrder,
key,
passphrase,
pfx,
sessionIdContext
});
// Perform additional QUIC specific initialization on the SecureContext
init_cb(sc.context, groups || DEFAULT_GROUPS);
return sc;
}

function onNewKeylogListener(event) {
if (event !== 'keylog' ||
this[kHandle] === undefined ||
this.listenerCount('keylog') !== 0) {
return;
}
this[kHandle].state[IDX_QUIC_SESSION_STATE_KEYLOG_ENABLED] = 1;
}

function onRemoveKeylogListener(event) {
if (event !== 'keylog' ||
this[kHandle] === undefined ||
this.listenerCount('keylog') !== 0) {
return;
}
this[kHandle].state[IDX_QUIC_SESSION_STATE_KEYLOG_ENABLED] = 0;
}

class QuicSession extends EventEmitter {
#alpn = undefined;
#cipher = undefined;
Expand All @@ -887,6 +891,18 @@ class QuicSession extends EventEmitter {
this.#servername = servername;
}

[kInspect]() {
const obj = {
alpn: this.#alpn,
cipher: this.cipher,
closing: this.closing,
destroyed: this.destroyed,
servername: this.servername,
streams: this.#streams.size,
};
return `${this.constructor.name} ${util.format(obj)}`;
}

[kSetSocket](socket) {
this.#socket = socket;
}
Expand Down Expand Up @@ -1104,6 +1120,10 @@ class QuicSession extends EventEmitter {
this,
id,
handle);
if (halfOpen) {
stream.push(null);
stream.read();
}
this.#streams.set(id, stream);
return stream;
}
Expand Down Expand Up @@ -1318,15 +1338,15 @@ class QuicStream extends Duplex {
...options,
allowHalfOpen: true,
decodeStrings: true,
emitClose: true
emitClose: true,
autoDestroy: true,
});
handle.onread = onStreamRead;
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this[kHandle] = handle;
this.#id = id;
this.#session = session;
this._readableState.readingMore = true;
}

get serverInitiated() {
Expand Down Expand Up @@ -1361,8 +1381,14 @@ class QuicStream extends Duplex {
}

[kInspect]() {
const direction = this.bidirectional ? 'bidirectional' : 'unidirectional';
const initiated = this.serverInitiated ? 'server' : 'client';
const obj = {
id: this.#id
id: this.#id,
direction,
initiated,
writableState: this._writableState,
readableState: this._readableState,
};
return `QuicStream ${util.format(obj)}`;
}
Expand Down Expand Up @@ -1444,8 +1470,8 @@ class QuicStream extends Duplex {
const handle = this[kHandle];
if (handle !== undefined) {
this[kHandle] = undefined;
handle[owner_symbol] = undefined;
handle.destroy();
handle[owner_symbol] = undefined;
}
callback(error);
}
Expand Down
20 changes: 9 additions & 11 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,21 @@ function onWriteComplete(status) {
this.callback(null);
}

function createWriteWrap(handle) {
function createWriteWrap(handle, callback) {
const req = new WriteWrap();

req.handle = handle;
req.oncomplete = onWriteComplete;
req.async = false;
req.bytes = 0;
req.buffer = null;
req.callback = callback;

return req;
}

function writevGeneric(self, data, cb) {
const req = createWriteWrap(self[kHandle]);
const req = createWriteWrap(self[kHandle], cb);
const allBuffers = data.allBuffers;
var chunks;
var i;
Expand All @@ -126,29 +127,26 @@ function writevGeneric(self, data, cb) {
// Retain chunks
if (err === 0) req._chunks = chunks;

afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(self, req, err);
return req;
}

function writeGeneric(self, data, encoding, cb) {
const req = createWriteWrap(self[kHandle]);
const req = createWriteWrap(self[kHandle], cb);
const err = handleWriteReq(req, data, encoding);

afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(self, req, err);
return req;
}

function afterWriteDispatched(self, req, err, cb) {
function afterWriteDispatched(self, req, err) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];

if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb);

if (!req.async) {
cb();
} else {
req.callback = cb;
if (!req.async && typeof req.callback === 'function') {
req.callback();
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/node_quic_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,19 @@ class QuicBuffer : public MemoryRetainer {
CHECK_EQ(length_, 0);
}

inline uint64_t Copy(
uv_buf_t* bufs,
size_t nbufs) {
uint64_t total = 0;
for (size_t n = 0; n < nbufs; n++) {
MallocedBuffer<uint8_t> data(bufs[n].len);
memcpy(data.data, bufs[n].base, bufs[n].len);
total += bufs[n].len;
Push(std::move(data));
}
return total;
}

// Push one or more uv_buf_t instances into the buffer.
// the done_cb callback will be invoked when the last
// uv_buf_t in the bufs array is consumed and popped out
Expand Down
Loading

0 comments on commit 8ea3356

Please sign in to comment.