From 0f657ab0ca10135c75bd25e345bb3e4ca308c104 Mon Sep 17 00:00:00 2001
From: James M Snell
Date: Mon, 20 May 2019 17:11:07 -0700
Subject: [PATCH] quic: fixes and refactors

---
 lib/internal/quic/core.js                |   9 +-
 lib/internal/stream_base_commons.js      |  20 +-
 src/node_quic_buffer.h                   |  13 +
 src/node_quic_session.cc                 | 337 +++++++++++------------
 src/node_quic_session.h                  |  38 ++-
 src/node_quic_socket.cc                  |  36 +--
 src/node_quic_socket.h                   |   2 -
 src/node_quic_stream.cc                  | 162 +++++------
 src/node_quic_stream.h                   |  70 ++---
 test/parallel/test-quic-client-server.js |  20 +-
 10 files changed, 346 insertions(+), 361 deletions(-)

diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js
index 85da6e8c99..40154605fd 100644
--- a/lib/internal/quic/core.js
+++ b/lib/internal/quic/core.js
@@ -640,6 +640,10 @@ class QuicSocket extends EventEmitter {
       return;
     this.#state = kSocketClosing;
 
+    // Otherwise, gracefully close each QuicSession, with
+    // [kMaybeDestroy]() being called after each closes.
+    const maybeDestroy = this[kMaybeDestroy].bind(this);
+
     // If there are no sessions, call [kMaybeDestroy]()
     // immediately to destroy the QuicSocket
     if (this.#sessions.size === 0) {
@@ -648,9 +652,6 @@ class QuicSocket extends EventEmitter {
       return;
     }
 
-    // Otherwise, gracefully close each QuicSession, with
-    // [kMaybeDestroy]() being called after each closes.
-    const maybeDestroy = this[kMaybeDestroy].bind(this);
     for (const session of this.#sessions)
       session.close(maybeDestroy);
   }
@@ -1444,8 +1445,8 @@ class QuicStream extends Duplex {
     const handle = this[kHandle];
     if (handle !== undefined) {
       this[kHandle] = undefined;
-      handle[owner_symbol] = undefined;
       handle.destroy();
+      handle[owner_symbol] = undefined;
     }
     callback(error);
   }
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js
index 88896083f1..cc0c0fbd28 100644
--- a/lib/internal/stream_base_commons.js
+++ b/lib/internal/stream_base_commons.js
@@ -92,7 +92,7 @@ function onWriteComplete(status) {
     this.callback(null);
 }
 
-function createWriteWrap(handle) {
+function createWriteWrap(handle, callback) {
   const req = new WriteWrap();
 
   req.handle = handle;
@@ -100,12 +100,13 @@ function createWriteWrap(handle, callback) {
   req.async = false;
   req.bytes = 0;
   req.buffer = null;
+  req.callback = callback;
 
   return req;
 }
 
 function writevGeneric(self, data, cb) {
-  const req = createWriteWrap(self[kHandle]);
+  const req = createWriteWrap(self[kHandle], cb);
   const allBuffers = data.allBuffers;
   var chunks;
   var i;
@@ -126,29 +127,26 @@ function writevGeneric(self, data, cb) {
   // Retain chunks
   if (err === 0)
     req._chunks = chunks;
 
-  afterWriteDispatched(self, req, err, cb);
+  afterWriteDispatched(self, req, err);
   return req;
 }
 
 function writeGeneric(self, data, encoding, cb) {
-  const req = createWriteWrap(self[kHandle]);
+  const req = createWriteWrap(self[kHandle], cb);
   const err = handleWriteReq(req, data, encoding);
-
-  afterWriteDispatched(self, req, err, cb);
+  afterWriteDispatched(self, req, err);
   return req;
 }
 
-function afterWriteDispatched(self, req, err, cb) {
+function afterWriteDispatched(self, req, err) {
   req.bytes = streamBaseState[kBytesWritten];
   req.async = !!streamBaseState[kLastWriteWasAsync];
 
   if (err !== 0)
-    return self.destroy(errnoException(err, 'write', req.error), cb);
+    return self.destroy(errnoException(err, 'write', req.error), req.callback);
 
-  if (!req.async) {
-    cb();
-  } else {
-    req.callback = cb;
+  if (!req.async && typeof req.callback === 'function') {
+    req.callback();
   }
 }
diff --git a/src/node_quic_buffer.h b/src/node_quic_buffer.h
index 3993cec103..5b86e6271f 100644
--- a/src/node_quic_buffer.h
+++ b/src/node_quic_buffer.h
@@ -187,6 +187,19 @@ class QuicBuffer : public MemoryRetainer {
     CHECK_EQ(length_, 0);
   }
 
+  inline uint64_t Copy(
+      uv_buf_t* bufs,
+      size_t nbufs) {
+    uint64_t total = 0;
+    for (size_t n = 0; n < nbufs; n++) {
+      MallocedBuffer<uint8_t> data(bufs[n].len);
+      memcpy(data.data, bufs[n].base, bufs[n].len);
+      total += bufs[n].len;
+      Push(std::move(data));
+    }
+    return total;
+  }
+
   // Push one or more uv_buf_t instances into the buffer.
   // the done_cb callback will be invoked when the last
   // uv_buf_t in the bufs array is consumed and popped out
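The new `QuicBuffer::Copy` enables a copy-then-complete write path (used by the `QuicStream::DoWrite` changes later in this patch). A minimal sketch of the intended call pattern; `BufferForSend` is a hypothetical helper, not part of the patch:

```cc
// Sketch: copy-then-complete. Copy() duplicates each uv_buf_t into an
// internally owned MallocedBuffer<uint8_t>, so the caller's buffers can
// be released as soon as this returns; the copies are retained until
// Consume() (on acknowledgement) or Cancel() (on stream reset).
uint64_t BufferForSend(QuicBuffer* streambuf, uv_buf_t* bufs, size_t nbufs) {
  return streambuf->Copy(bufs, nbufs);
}
```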
diff --git a/src/node_quic_session.cc b/src/node_quic_session.cc
index ad66db3bbf..6360a28b0d 100644
--- a/src/node_quic_session.cc
+++ b/src/node_quic_session.cc
@@ -239,6 +239,17 @@ int QuicSession::OnExtendMaxStreamsUni(
   return 0;
 }
 
+int QuicSession::OnExtendMaxStreamData(
+    ngtcp2_conn* conn,
+    int64_t stream_id,
+    uint64_t max_data,
+    void* user_data,
+    void* stream_user_data) {
+  QuicSession* session = static_cast<QuicSession*>(user_data);
+  session->ExtendMaxStreamData(stream_id, max_data);
+  return 0;
+}
+
 // Called by ngtcp2 for both client and server connections
 // when ngtcp2 has determined that the TLS handshake has
 // been completed.
@@ -415,7 +426,7 @@ int QuicSession::OnReceiveStreamData(
     void* stream_user_data) {
   QuicSession* session = static_cast<QuicSession*>(user_data);
   RETURN_IF_FAIL(
-      session->ReceiveStreamData(stream_id, fin, offset, data, datalen), 0,
+      session->ReceiveStreamData(stream_id, fin, data, datalen), 0,
       NGTCP2_ERR_CALLBACK_FAILURE);
   return 0;
 }
@@ -826,6 +837,12 @@ void QuicSession::AddStream(QuicStream* stream) {
   streams_.emplace(stream->GetID(), stream);
 }
 
+void QuicSession::ExtendMaxStreamData(
+    int64_t stream_id,
+    uint64_t max_data) {
+  // TODO(@jasnell): Extend max stream data
+}
+
 // Forwards detailed debugging information from ngtcp2.
 void QuicSession::DebugLog(
     void* user_data,
@@ -1128,6 +1145,11 @@ bool QuicSession::IsInClosingPeriod() {
   return ngtcp2_conn_is_in_closing_period(connection_);
 }
 
+bool QuicSession::IsInDrainingPeriod() {
+  CHECK(!IsDestroyed());
+  return ngtcp2_conn_is_in_draining_period(connection_);
+}
+
 void QuicSession::OnIdleTimeout(
     uv_timer_t* timer) {
   QuicSession* session = static_cast<QuicSession*>(timer->data);
@@ -1305,21 +1327,17 @@ const ngtcp2_cid* QuicSession::scid() const {
 int QuicSession::ReceiveStreamData(
     int64_t stream_id,
     int fin,
-    uint64_t offset,
     const uint8_t* data,
     size_t datalen) {
   CHECK(!IsDestroyed());
-  // Locate the QuicStream to receive this data. If
-  // one does not exist, create it and notify the JS side...
-  // then pass on the received data
+
   HandleScope scope(env()->isolate());
   Local<Context> context = env()->context();
   Context::Scope context_scope(context);
+
   QuicStream* stream = FindStream(stream_id);
   if (stream == nullptr) {
     if (IsClosing()) {
-      // If the QuicSession is closing, reject and shutdown any
-      // new streams that are received from the peer.
       return ngtcp2_conn_shutdown_stream(
           connection_,
           stream_id,
@@ -1327,19 +1345,11 @@ int QuicSession::ReceiveStreamData(
     }
     stream = CreateStream(stream_id);
   }
+  CHECK_NOT_NULL(stream);
+  stream->ReceiveData(fin, data, datalen);
 
-  ngtcp2_conn_extend_max_stream_offset(
-      connection_,
-      stream_id,
-      datalen);
-  ngtcp2_conn_extend_max_offset(
-      connection_,
-      datalen);
-
-  if (stream->ReceiveData(fin, data, datalen) != 0)
-    return -1;
-
-  StartIdleTimer(-1);
+  ngtcp2_conn_extend_max_stream_offset(connection_, stream_id, datalen);
+  ngtcp2_conn_extend_max_offset(connection_, datalen);
 
   return 0;
 }
@@ -1363,6 +1373,27 @@ void QuicSession::RemoveStream(
   streams_.erase(stream_id);
 }
 
+// Write any packets currently pending for the ngtcp2 connection
+int QuicSession::WritePackets() {
+  QuicPathStorage path;
+  for ( ;; ) {
+    MallocedBuffer<uint8_t> data(max_pktlen_);
+    ssize_t nwrite =
+        ngtcp2_conn_write_pkt(
+            connection_,
+            &path.path,
+            data.data,
+            max_pktlen_,
+            uv_hrtime());
+    if (nwrite <= 0)
+      return nwrite;
+    data.Realloc(nwrite);
+    remote_address_.Update(&path.path.remote);
+    sendbuf_.Push(std::move(data));
+    RETURN_RET_IF_FAIL(SendPacket(), 0);
+  }
+}
+
 namespace {
 void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
   ngtcp2_vec* v = *pvec;
@@ -1391,15 +1422,13 @@ int Empty(const ngtcp2_vec* vec, size_t cnt) {
 // Sends 0RTT stream data.
 int QuicSession::Send0RTTStreamData(
     QuicStream* stream,
-    int fin,
-    QuicBuffer* data,
     QuicBuffer::drain_from from) {
   CHECK(!IsDestroyed());
   ssize_t ndatalen = 0;
-  int err;
 
   std::vector<ngtcp2_vec> vec;
-  size_t count = data->DrainInto(&vec, from);
+  uint8_t fin = stream->IsShutdown() ? 1 : 0;
+  size_t count = stream->DrainInto(&vec, from);
   size_t c = count;
   ngtcp2_vec* v = vec.data();
 
@@ -1440,15 +1469,13 @@ int QuicSession::Send0RTTStreamData(
     dest.Realloc(nwrite);
     sendbuf_.Push(std::move(dest));
 
-    err = SendPacket();
-    if (err != 0)
-      return err;
+    RETURN_RET_IF_FAIL(SendPacket(), 0);
     if (Empty(v, c))
       break;
   }
 
   // Advance the read head of the source buffer
-  data->SeekHead(count);
+  stream->Commit(count);
 
   return 0;
 }
@@ -1456,19 +1483,19 @@ int QuicSession::Send0RTTStreamData(
 // Sends buffered stream data.
 int QuicSession::SendStreamData(
     QuicStream* stream,
-    int should_send_fin,
-    QuicBuffer* data,
     QuicBuffer::drain_from from) {
   CHECK(!IsDestroyed());
   ssize_t ndatalen = 0;
   QuicPathStorage path;
-  int err;
 
   std::vector<ngtcp2_vec> vec;
-  size_t count = data->DrainInto(&vec, from);
-  size_t c = count;
+  size_t count = stream->DrainInto(&vec, from);
+
+  size_t c = vec.size();
   ngtcp2_vec* v = vec.data();
 
+  // Even if there's no data to write, we iterate through just in case
+  // ngtcp2 has other frames that it needs to encode.
   for (;;) {
     MallocedBuffer<uint8_t> dest(max_pktlen_);
     ssize_t nwrite =
         ngtcp2_conn_writev_stream(
             connection_,
             &path.path,
             dest.data,
             max_pktlen_,
             &ndatalen,
             stream->GetID(),
-            should_send_fin,
+            stream->IsShutdown() ? 1 : 0,
             reinterpret_cast<const ngtcp2_vec*>(v),
             c,
             uv_hrtime());
 
     if (nwrite < 0) {
-      auto should_break = false;
-      switch (nwrite) {
-        case NGTCP2_ERR_STREAM_DATA_BLOCKED:
-        case NGTCP2_ERR_EARLY_DATA_REJECTED:
-        case NGTCP2_ERR_STREAM_SHUT_WR:
-        case NGTCP2_ERR_STREAM_NOT_FOUND:
-          should_break = true;
-          break;
-      }
-      if (should_break)
+      if (nwrite == NGTCP2_ERR_STREAM_DATA_BLOCKED ||
+          nwrite == NGTCP2_ERR_EARLY_DATA_REJECTED ||
+          nwrite == NGTCP2_ERR_STREAM_SHUT_WR ||
+          nwrite == NGTCP2_ERR_STREAM_NOT_FOUND) {
         break;
+      }
       return HandleError(nwrite);
     }
 
@@ -1509,28 +1531,26 @@ int QuicSession::SendStreamData(
     sendbuf_.Push(std::move(dest));
     remote_address_.Update(&path.path.remote);
 
-    err = SendPacket();
-    if (err != 0)
-      return err;
+    RETURN_RET_IF_FAIL(SendPacket(), 0);
     if (Empty(v, c))
       break;
   }
 
   // Advance the read head of the source buffer
-  data->SeekHead(count);
+  stream->Commit(count);
 
   return 0;
 }
 
-// Transmits the current contents of the internal sendbuf to the peer
-// By default, SendPacket will drain from the txbuf_ read head.
+// Transmits the current contents of the internal sendbuf_ to the peer
+// By default, SendPacket will drain from the txbuf_ read head. If
+// retransmit is true, the entire contents of txbuf_ will be drained.
 int QuicSession::SendPacket(bool retransmit) {
   CHECK(!IsDestroyed());
   // Move the contents of sendbuf_ to the tail of txbuf_ and reset sendbuf_
   if (sendbuf_.Length() > 0)
     *txbuf_ += std::move(sendbuf_);
-  // Then pass the txbuf_ off to the socket for transmission
-  Debug(this, "There are %llu bytes in txbuf_", txbuf_->Length());
+  Debug(this, "There are %llu bytes in txbuf_ to send", txbuf_->Length());
   return Socket()->SendPacket(
       &remote_address_,
       txbuf_,
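The send paths above now follow a peek-then-commit contract with `QuicStream`: `DrainInto()` exposes the buffered chunks without consuming them, and `Commit()` advances the read head only after ngtcp2 has accepted the data. A sketch of the contract under those assumptions (`SendOnce` is a hypothetical helper):

```cc
// Sketch: peek-then-commit. Committed data is still retained in the
// buffer; only a later AckedDataOffset()/Consume() actually frees it.
int SendOnce(QuicStream* stream) {
  std::vector<ngtcp2_vec> vec;
  size_t count = stream->DrainInto(&vec, QuicBuffer::DRAIN_FROM_HEAD);
  // ... hand vec.data()/vec.size() to ngtcp2_conn_writev_stream() ...
  stream->Commit(count);  // mark as sent, but not yet acknowledged
  return 0;
}
```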
@@ -2106,55 +2126,57 @@ int QuicServerSession::Receive(
     const struct sockaddr* addr,
     unsigned int flags) {
   CHECK(!IsDestroyed());
-  Debug(this, "Received packet. nread = %d bytes", nread);
+
+  SendScope scope(this);
   int err;
 
   // Closing period starts once ngtcp2 has detected that the session
-  // is being shutdown locally.
-  if (IsInClosingPeriod()) {
-    Debug(this, "In closing period");
-    // If we receive anything while we're shutting down, just repeat
-    // the connection close. It's possible that the peer either hasn't
-    // received it yet or it got lost. To be a bit nicer, we could
-    // implement this with an exponential backoff but this strategy
-    // works for now.
+  // is being shut down locally. Note that this is different from the
+  // IsClosing() function, which indicates a graceful shutdown that
+  // allows the session and streams to finish naturally. When
+  // IsInClosingPeriod is true, ngtcp2 is actively in the process
+  // of shutting down the connection and a CONNECTION_CLOSE has
+  // already been sent. The only thing we can do at this point is
+  // either ignore the packet or send another CONNECTION_CLOSE.
+  //
+  // TODO(@jasnell): Currently, we send a CONNECTION_CLOSE on every
+  // packet received. To be a bit nicer, however, we could
+  // use an exponential backoff.
+  if (IsInClosingPeriod())
     return SendConnectionClose(0);
-  }
 
-  // Draining period starts once we've detected and idle timeout on
+  // Draining period starts once we've detected an idle timeout on
   // this session and we're in the process of shutting down. We
   // don't want to accept any new packets during this time, so we
   // simply ignore them.
-  if (IsDraining()) {
-    Debug(this, "Draining...");
+  if (IsDraining())
     return 0;
-  }
 
+  // With QUIC, it is possible for the remote address to change
+  // from one packet to the next.
   remote_address_.Copy(addr);
   QuicPath path(Socket()->GetLocalAddress(), &remote_address_);
 
-  if (IsHandshakeCompleted()) {
-    err = ngtcp2_conn_read_pkt(
-        connection_,
-        *path,
-        data, nread,
-        uv_hrtime());
-    if (err != 0) {
-      Debug(this, "Error reading packet. Error %d\n", err);
-      if (err == NGTCP2_ERR_DRAINING) {
-        StartDrainingPeriod();
-        return -1;  // Closing
-      }
-      return HandleError(err);
-    }
-    Debug(this, "Successfully read packet");
+  if (!IsHandshakeCompleted()) {
+    err = DoHandshake(*path, data, nread);
+    if (err != 0)
+      SendConnectionClose(err);
     return 0;
   }
 
-  Debug(this, "TLS Handshake %s", initial_ ? "starting" : "continuing");
-  err = DoHandshake(*path, data, nread);
-  if (err != 0)
-    return HandleError(err);
+  err = ngtcp2_conn_read_pkt(
+      connection_,
+      *path,
+      data, nread,
+      uv_hrtime());
+  if (err != 0) {
+    if (err == NGTCP2_ERR_DRAINING) {
+      StartDrainingPeriod();
+      return -1;  // Closing
+    }
+    SendConnectionClose(err);
+  }
 
   return 0;
 }
@@ -2208,47 +2230,37 @@ int QuicServerSession::SendPendingData(bool retransmit) {
     return 0;
   Debug(this, "Sending pending data for server session");
+  int err;
 
-  if (IsInClosingPeriod())
+  // If we're in the process of closing or draining the connection, do nothing.
+  if (IsInClosingPeriod() || IsInDrainingPeriod())
     return 0;
 
+  // If there's anything currently in the sendbuf_, send it.
   RETURN_RET_IF_FAIL(SendPacket(), 0);
 
+  // If the handshake is not yet complete, perform the handshake
   if (!IsHandshakeCompleted()) {
-    int err = DoHandshake(nullptr, nullptr, 0);
+    err = DoHandshake(nullptr, nullptr, 0);
     if (err == 0)
       ScheduleMonitor();
     return err;
   }
 
-  for (auto stream : streams_)
-    RETURN_RET_IF_FAIL(stream.second->SendPendingData(retransmit), 0);
-
-  QuicPathStorage path;
-
-  // We call ngtcp2_conn_write_pkt repeatedly until it has
-  // no more data to write.
-  for ( ;; ) {
-    MallocedBuffer<uint8_t> data(max_pktlen_);
-    ssize_t n =
-        ngtcp2_conn_write_pkt(
-            connection_,
-            &path.path,
-            data.data,
-            max_pktlen_,
-            uv_hrtime());
-    if (n < 0) {
-      Debug(this, "There was an error writing the packet. Error %d", n);
-      return HandleError(n);
-    }
-    if (n == 0)
-      break;
-
-    remote_address_.Update(&path.path.remote);
-    sendbuf_.Push(std::move(data));
-    RETURN_RET_IF_FAIL(SendPacket(), 0);
+  // For every stream, transmit the stream data, returning
+  // early if we're unable to send stream data for some
+  // reason.
+  for (auto stream : streams_) {
+    RETURN_RET_IF_FAIL(
+        SendStreamData(
+            stream.second,
+            retransmit ?
+                QuicBuffer::DRAIN_FROM_ROOT : QuicBuffer::DRAIN_FROM_HEAD), 0);
   }
-  Debug(this, "Done sending pending server session data");
+
+  err = WritePackets();
+  if (err < 0)
+    return HandleError(err);
 
   ScheduleMonitor();
   return 0;
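The `retransmit` flag now reduces to a choice of drain origin. The exact enum semantics are not spelled out in this patch; assuming the behavior implied by the code above, the distinction is roughly:

```cc
// Assumed semantics of QuicBuffer::drain_from, inferred from this patch.
// DrainOriginFor is a hypothetical helper for illustration only.
QuicBuffer::drain_from DrainOriginFor(bool retransmit) {
  // DRAIN_FROM_ROOT: start at the oldest unacknowledged chunk, so data
  //   that was already sent once is re-encoded for loss recovery.
  // DRAIN_FROM_HEAD: start at the read head, i.e. only data that has
  //   not yet been handed to ngtcp2.
  return retransmit ? QuicBuffer::DRAIN_FROM_ROOT
                    : QuicBuffer::DRAIN_FROM_HEAD;
}
```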
Error %d", nwrite); - return HandleError(nwrite); - } - if (nwrite == 0) - break; - data.Realloc(nwrite); - sendbuf_.Push(std::move(data)); - - err = SendPacket(); - if (err != 0) - return err; - } + err = WritePackets(); + if (err < 0) + return HandleError(err); if (!retransmit) { - for (auto stream : streams_) { - err = stream.second->SendPendingData(); - if (err != 0) - return err; - } + // For each stream, send any pending data + for (auto stream : streams_) + RETURN_RET_IF_FAIL(SendStreamData(stream.second), 0); } - Debug(this, "Done sending pending client session data"); - ScheduleMonitor(); return 0; } @@ -3249,7 +3228,7 @@ void NewQuicClientSession(const FunctionCallbackInfo& args) { select_preferred_address_policy, alpn); - socket->SendPendingData(); + session->SendPendingData(); args.GetReturnValue().Set(session->object()); } diff --git a/src/node_quic_session.h b/src/node_quic_session.h index a3c0bcf220..9fa6db5d88 100644 --- a/src/node_quic_session.h +++ b/src/node_quic_session.h @@ -35,7 +35,7 @@ constexpr int ERR_INVALID_TLS_SESSION_TICKET = -2; V(MAX_STREAM_DATA_BIDI_LOCAL, max_stream_data_bidi_local, 256 * 1024) \ V(MAX_STREAM_DATA_BIDI_REMOTE, max_stream_data_bidi_remote, 256 * 1024) \ V(MAX_STREAM_DATA_UNI, max_stream_data_uni, 256 * 1024) \ - V(MAX_DATA, max_data, 1 * (1024 ^ 2)) \ + V(MAX_DATA, max_data, 1 * 1024 * 1024) \ V(MAX_STREAMS_BIDI, max_streams_bidi, 100) \ V(MAX_STREAMS_UNI, max_streams_uni, 3) \ V(IDLE_TIMEOUT, idle_timeout, 10 * 1000) \ @@ -114,20 +114,15 @@ class QuicSession : public AsyncWrap, int ReceiveStreamData( int64_t stream_id, int fin, - uint64_t offset, const uint8_t* data, size_t datalen); void RemoveStream( int64_t stream_id); int Send0RTTStreamData( QuicStream* stream, - int fin, - QuicBuffer* data, QuicBuffer::drain_from from = QuicBuffer::DRAIN_FROM_HEAD); int SendStreamData( QuicStream* stream, - int fin, - QuicBuffer* data, QuicBuffer::drain_from from = QuicBuffer::DRAIN_FROM_HEAD); int SetRemoteTransportParams( ngtcp2_transport_params* params); @@ -225,6 +220,7 @@ class QuicSession : public AsyncWrap, int64_t id); void HandshakeCompleted(); inline bool IsInClosingPeriod(); + inline bool IsInDrainingPeriod(); int PathValidation( const ngtcp2_path* path, ngtcp2_path_validation_result res); @@ -425,6 +421,12 @@ class QuicSession : public AsyncWrap, ngtcp2_conn* conn, uint64_t max_streams, void* user_data); + static int OnExtendMaxStreamData( + ngtcp2_conn* conn, + int64_t stream_id, + uint64_t max_data, + void* user_data, + void* stream_user_data); static void OnKeylog(const SSL* ssl, const char* line); @@ -511,6 +513,10 @@ class QuicSession : public AsyncWrap, ngtcp2_cid* cid, uint8_t* token, size_t cidlen); + void ExtendMaxStreamData( + int64_t stream_id, + uint64_t max_data); + int WritePackets(); inline QuicStream* CreateStream( int64_t stream_id); @@ -588,6 +594,22 @@ class QuicSession : public AsyncWrap, mem::Allocator allocator_; + // SendScope will cause the session to flush it's + // current pending data queue to the underlying + // socket. 
@@ -114,20 +114,15 @@ class QuicSession : public AsyncWrap,
   int ReceiveStreamData(
       int64_t stream_id,
       int fin,
-      uint64_t offset,
       const uint8_t* data,
       size_t datalen);
   void RemoveStream(
       int64_t stream_id);
   int Send0RTTStreamData(
       QuicStream* stream,
-      int fin,
-      QuicBuffer* data,
       QuicBuffer::drain_from from = QuicBuffer::DRAIN_FROM_HEAD);
   int SendStreamData(
       QuicStream* stream,
-      int fin,
-      QuicBuffer* data,
       QuicBuffer::drain_from from = QuicBuffer::DRAIN_FROM_HEAD);
   int SetRemoteTransportParams(
       ngtcp2_transport_params* params);
@@ -225,6 +220,7 @@ class QuicSession : public AsyncWrap,
       int64_t id);
   void HandshakeCompleted();
   inline bool IsInClosingPeriod();
+  inline bool IsInDrainingPeriod();
   int PathValidation(
       const ngtcp2_path* path,
       ngtcp2_path_validation_result res);
@@ -425,6 +421,12 @@ class QuicSession : public AsyncWrap,
       ngtcp2_conn* conn,
       uint64_t max_streams,
       void* user_data);
+  static int OnExtendMaxStreamData(
+      ngtcp2_conn* conn,
+      int64_t stream_id,
+      uint64_t max_data,
+      void* user_data,
+      void* stream_user_data);
 
   static void OnKeylog(const SSL* ssl, const char* line);
 
@@ -511,6 +513,10 @@ class QuicSession : public AsyncWrap,
       ngtcp2_cid* cid,
       uint8_t* token,
       size_t cidlen);
+  void ExtendMaxStreamData(
+      int64_t stream_id,
+      uint64_t max_data);
+  int WritePackets();
 
   inline QuicStream* CreateStream(
       int64_t stream_id);
@@ -588,6 +594,22 @@ class QuicSession : public AsyncWrap,
 
   mem::Allocator<ngtcp2_mem> allocator_;
 
+  // SendScope will cause the session to flush its
+  // current pending data queue to the underlying
+  // socket.
+  class SendScope {
+   public:
+    explicit SendScope(QuicSession* session) : session_(session) {}
+    ~SendScope() {
+      if (session_->IsDestroyed())
+        return;
+      session_->SendPendingData();
+      session_->StartIdleTimer(-1);
+    }
+   private:
+    QuicSession* session_;
+  };
+
   friend class QuicServerSession;
   friend class QuicClientSession;
 };
@@ -713,7 +735,7 @@ class QuicServerSession : public QuicSession {
     OnStreamReset,
     OnExtendMaxStreamsBidi,
     OnExtendMaxStreamsUni,
-    nullptr  // extend_max_stream_data
+    OnExtendMaxStreamData
   };
 
   friend class QuicSession;
@@ -866,7 +888,7 @@ class QuicClientSession : public QuicSession {
     nullptr,  // stream_reset
     nullptr,  // extend_max_remote_streams_bidi
     nullptr,  // extend_max_remote_streams_uni
-    nullptr  // extend_max_stream_data
+    OnExtendMaxStreamData
   };
 
   friend class QuicSession;
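`SendScope` is the RAII mechanism that replaces the socket-level `SendPendingData()` removed below: a packet handler declares one on entry, and every exit path flushes pending data and restarts the idle timer. A sketch of the intended usage; `HandleIncoming` is hypothetical, and `SendScope` is actually private to `QuicSession` (access relaxed here purely for illustration):

```cc
// Sketch: flush-on-exit via SendScope. Whichever return is taken,
// ~SendScope() runs SendPendingData() and StartIdleTimer(-1) unless
// the session was destroyed in the meantime.
int HandleIncoming(QuicSession* session, const uint8_t* data, ssize_t nread) {
  QuicSession::SendScope scope(session);
  if (nread == 0)
    return 0;      // flushed here
  // ... feed the packet to ngtcp2 ...
  return 0;        // and here
}
```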
"yes" : "no"); - for (auto session : sessions_) { - int err = session.second->SendPendingData(retransmit); - if (err != 0) { - // TODO(@jasnell): handle error - } - } -} - int QuicSocket::SendVersionNegotiation( const ngtcp2_pkt_hd* chd, const sockaddr* addr) { diff --git a/src/node_quic_socket.h b/src/node_quic_socket.h index 5b7be8c6c8..7716c5e270 100644 --- a/src/node_quic_socket.h +++ b/src/node_quic_socket.h @@ -74,8 +74,6 @@ class QuicSocket : public HandleWrap { const sockaddr* addr); void ReportSendError( int error); - void SendPendingData( - bool retransmit = false); int SetBroadcast( bool on); int SetMulticastInterface( diff --git a/src/node_quic_stream.cc b/src/node_quic_stream.cc index f45ea27247..6ba9f39b64 100644 --- a/src/node_quic_stream.cc +++ b/src/node_quic_stream.cc @@ -11,6 +11,8 @@ #include "node_quic_util.h" #include "v8.h" +#include + namespace node { using v8::Context; @@ -57,15 +59,13 @@ QuicStream::QuicStream( AsyncWrap(session->env(), wrap, AsyncWrap::PROVIDER_QUICSTREAM), StreamBase(session->env()), session_(session), - flags_(0), stream_id_(stream_id), - reset_(false), - should_send_fin_(false), + flags_(QUIC_STREAM_FLAG_NONE), available_outbound_length_(0) { CHECK_NOT_NULL(session); + session->AddStream(this); StreamBase::AttachToObject(GetObject()); PushStreamListener(&stream_listener_); - session->AddStream(this); } QuicStream::~QuicStream() { @@ -74,31 +74,34 @@ QuicStream::~QuicStream() { CHECK_EQ(0, streambuf_.Length()); } -void QuicStream::Close( - uint16_t app_error_code) { +// QuicStream::Close() is called by the QuicSession when ngtcp2 detects that +// a stream has been closed. This, in turn, calls out to the JavaScript to +// start the process of tearing down and destroying the QuicStream instance. +void QuicStream::Close(uint16_t app_error_code) { Debug(this, "Stream %llu closed with code %d", GetID(), app_error_code); HandleScope scope(env()->isolate()); + Context::Scope context_context(env()->context()); flags_ |= QUIC_STREAM_FLAG_CLOSED; Local arg = Number::New(env()->isolate(), app_error_code); MakeCallback(env()->quic_on_stream_close_function(), 1, &arg); } +// Receiving a reset means that any data we've accumulated to send +// can be discarded and we don't want to keep writing data, so +// we want to clear our outbound buffers here and notify +// the JavaScript side that we've been reset so that we stop +// pumping data out. void QuicStream::Reset(uint64_t final_size, uint16_t app_error_code) { - // Receiving a reset means that any data we've accumulated to send - // can be discarded and we don't want to keep writing data, so - // we likely want to clear our outbound buffers here and notify - // the JavaScript side that we've been reset so that we stop - // pumping data out. 
Debug(this, "Resetting stream %llu with app error code %d, and final size %llu", GetID(), app_error_code, final_size); - reset_ = true; HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); streambuf_.Cancel(); Local argv[] = { - Number::New(env()->isolate(), final_size), + Number::New(env()->isolate(), static_cast(final_size)), Integer::New(env()->isolate(), app_error_code) }; MakeCallback(env()->quic_on_stream_reset_function(), arraysize(argv), argv); @@ -113,10 +116,8 @@ void QuicStream::Destroy() { int QuicStream::DoShutdown(ShutdownWrap* req_wrap) { if (IsDestroyed()) return UV_EPIPE; - Debug(this, "Writable side shutdown"); flags_ |= QUIC_STREAM_FLAG_SHUT; - should_send_fin_ = true; - + session_->SendStreamData(this); return 1; } @@ -125,40 +126,59 @@ int QuicStream::DoWrite( uv_buf_t* bufs, size_t nbufs, uv_stream_t* send_handle) { - CHECK_NULL(send_handle); - // If the stream has been reset, then the writable side of - // the duplex should have been closed and we shouldn't be - // receiving any more data.. but just in case... - if (reset_ || IsDestroyed()) { + if (IsDestroyed()) { req_wrap->Done(UV_EOF); return 0; } - // Buffers written must be held on to until acked. The callback - // passed in here will be called when the ack is received. - // TODO(@jasnell): For now, the data will be held onto for - // pretty much eternity, and the implementation will retry an - // unlimited number of times. We need to constrain that to - // fail reasonably after a given number of attempts. - // Specifically, we need to ensure that all of the data is - // cleaned up when the stream is destroyed, even if it hasn't - // been acknowledged. + // There's a difficult balance required here: + // + // Unlike typical UDP, which is fire-and-forget, QUIC packets + // have to be acknowledged. If a packet is not acknowledged + // soon enough, it is retransmitted. The exact arrangement + // of packets being retransmitted varies over the course of + // the connection on many factors, so we can't simply encode + // the packets and resend them. Instead, we have to retain the + // original data and re-encode packets on each transmission + // attempt. This means we have to persist the data written + // until either an acknowledgement is received or the stream + // is reset and canceled. // - // When more than one buffer is passed in, the callback will - // only be invoked when the final buffer in the set is consumed. - uint64_t len = - streambuf_.Push( - bufs, - nbufs, - [&](int status, void* user_data) { - DecrementAvailableOutboundLength(len); - WriteWrap* wrap = static_cast(user_data); - CHECK_NOT_NULL(wrap); - wrap->Done(status); - }, req_wrap, req_wrap->object()); - - IncrementAvailableOutboundLength(len); + // That said, on the JS Streams API side, we can only write + // one batch of buffers at a time. That is, DoWrite won't be + // called again until the previous DoWrite is completed by + // calling WriteWrap::Done(). The challenge, however, is that + // calling Done() essentially signals that we're done with + // the buffers being written, allowing those to be freed. + // + // In other words, if we just store the given buffers and + // wait to call Done() when we receive an acknowledgement, + // we severely limit our throughput and kill performance + // because the JavaScript side won't be able to send additional + // buffers until we receive the acknowledgement from the peer. 
+  // However, if we call Done() here to allow the next chunk to
+  // be written, we have to copy the data because the buffers
+  // may end up being freed once the callback is invoked. The
+  // memcpy obviously incurs a cost but it'll at least be less
+  // than waiting for the acknowledgement, allowing data to be
+  // written faster but at the cost of a data copy.
+  //
+  // Because of the need to copy, performing many small writes
+  // will incur a performance penalty over a smaller number of
+  // larger writes, but only up to a point. Frequently copying
+  // large chunks of data will end up slowing things down also.
+  //
+  // Because we are copying to allow the JS side to write
+  // faster independently of the underlying send, we will have
+  // to be careful not to allow the internal buffer to grow
+  // too large, or we'll run into several other problems.
+
+  uint64_t len = streambuf_.Copy(bufs, nbufs);
+  req_wrap->Done(0);
+
+  // IncrementAvailableOutboundLength(len);
+  session_->SendStreamData(this);
   return 0;
 }
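The comment ends with a caution the patch does not yet enforce: the copied backlog must not grow without bound. One possible shape for such a guard, entirely hypothetical and not part of this patch (`kMaxBufferedBytes` and the helper are invented; a real limit would presumably come from flow-control state):

```cc
// Hypothetical backpressure guard (NOT in this patch): complete writes
// immediately only while the copied backlog stays below a threshold,
// so the JS side backs off until acknowledgements drain the buffer.
static constexpr uint64_t kMaxBufferedBytes = 1 * 1024 * 1024;

bool ShouldCompleteWriteImmediately(QuicBuffer* streambuf) {
  return streambuf->Length() < kMaxBufferedBytes;
}
```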
@@ -170,23 +190,18 @@ QuicSession* QuicStream::Session() {
   return session_;
 }
 
-int QuicStream::AckedDataOffset(
-    uint64_t offset,
-    size_t datalen) {
+void QuicStream::AckedDataOffset(uint64_t offset, size_t datalen) {
   streambuf_.Consume(datalen);
-  return 0;
 }
 
-int QuicStream::Send0RTTData() {
-  return session_->Send0RTTStreamData(this, should_send_fin_, &streambuf_);
+size_t QuicStream::DrainInto(
+    std::vector<ngtcp2_vec>* vec,
+    QuicBuffer::drain_from from) {
+  return streambuf_.DrainInto(vec, from);
 }
 
-int QuicStream::SendPendingData(bool retransmit) {
-  return session_->SendStreamData(
-      this,
-      should_send_fin_,
-      &streambuf_,
-      retransmit ? QuicBuffer::DRAIN_FROM_ROOT : QuicBuffer::DRAIN_FROM_HEAD);
+void QuicStream::Commit(size_t count) {
+  streambuf_.SeekHead(count);
 }
 
 inline void QuicStream::IncrementAvailableOutboundLength(size_t amount) {
@@ -214,9 +229,6 @@ int QuicStream::ReadStart() {
   Debug(this, "Reading started.");
   flags_ |= QUIC_STREAM_FLAG_READ_START;
   flags_ &= ~QUIC_STREAM_FLAG_READ_PAUSED;
-
-  // Flush data to JS here?
-
   return 0;
 }
 
@@ -229,21 +241,15 @@ int QuicStream::ReadStop() {
   return 0;
 }
 
-int QuicStream::ReceiveData(
-    int fin,
-    const uint8_t* data,
-    size_t datalen) {
-  Debug(this, "Receiving %d bytes of data", datalen);
-  if (reset_) {
-    Debug(this, "Stream has been reset, discarding received data.");
-    return 0;
-  }
-  HandleScope scope(env()->isolate());
-  do {
+// Passes chunks of data on to the JavaScript side as soon as they are
+// received. The caller of this must have a HandleScope.
+void QuicStream::ReceiveData(int fin, const uint8_t* data, size_t datalen) {
+  Debug(this, "Receiving %d bytes of data. Final? %s",
+        datalen, fin ? "yes" : "no");
+
+  while (datalen > 0) {
     uv_buf_t buf = EmitAlloc(datalen);
-    ssize_t avail = datalen;
-    if (static_cast<ssize_t>(buf.len) < avail)
-      avail = buf.len;
+    size_t avail = std::min(static_cast<size_t>(buf.len), datalen);
+    // TODO(@jasnell): For now, we're allocating and copying. Once
+    // we determine if we can safely switch to a non-allocated mode
+    // we can avoid the copy.
     memcpy(buf.base, data, avail);
     data += avail;
     datalen -= avail;
-    Debug(this, "Emitting %d bytes of data", avail);
     EmitRead(avail, buf);
-  } while (datalen != 0);
+  }
 
-  if (fin) {
-    Debug(this, "Emitting EOF");
+  if (fin)
     EmitRead(UV_EOF);
-    Session()->ShutdownStreamRead(stream_id_);
-  }
-
-  return 0;
 }
 
 // JavaScript API
diff --git a/src/node_quic_stream.h b/src/node_quic_stream.h
index 4b195d3b72..4cd4814ec3 100644
--- a/src/node_quic_stream.h
+++ b/src/node_quic_stream.h
@@ -19,15 +19,10 @@ class QuicServerSession;
 
 enum quic_stream_flags {
   QUIC_STREAM_FLAG_NONE = 0x0,
-  // Writable side has ended
   QUIC_STREAM_FLAG_SHUT = 0x1,
-  // Reading has started
   QUIC_STREAM_FLAG_READ_START = 0x2,
-  // Reading is paused
   QUIC_STREAM_FLAG_READ_PAUSED = 0x4,
-  // Stream is closed
   QUIC_STREAM_FLAG_CLOSED = 0x8,
-  // Stream has received all the data it can
   QUIC_STREAM_FLAG_EOS = 0x20
 };
 
@@ -45,30 +40,19 @@ class QuicStream : public AsyncWrap,
       v8::Local<v8::Object> target,
       v8::Local<v8::Context> context);
 
-  static QuicStream* New(
-      QuicSession* session,
-      uint64_t stream_id);
-
-  QuicStream(
-      QuicSession* session,
-      v8::Local<v8::Object> target,
-      uint64_t stream_id);
+  static QuicStream* New(QuicSession* session, uint64_t stream_id);
 
   ~QuicStream() override;
 
   uint64_t GetID() const;
+
   QuicSession* Session();
 
-  virtual int AckedDataOffset(
-      uint64_t offset,
-      size_t datalen);
+  virtual void AckedDataOffset(uint64_t offset, size_t datalen);
 
-  virtual void Close(
-      uint16_t app_error_code);
+  virtual void Close(uint16_t app_error_code = 0);
 
-  virtual void Reset(
-      uint64_t final_size,
-      uint16_t app_error_code);
+  virtual void Reset(uint64_t final_size, uint16_t app_error_code = 0);
 
   virtual void Destroy();
 
@@ -78,11 +62,6 @@ class QuicStream : public AsyncWrap,
       size_t nbufs,
       uv_stream_t* send_handle) override;
 
-  inline void IncrementAvailableOutboundLength(
-      size_t amount);
-  inline void DecrementAvailableOutboundLength(
-      size_t amount);
-
   bool IsAlive() override {
     return !IsDestroyed() && !IsShutdown() && !IsClosing();
   }
@@ -90,16 +69,17 @@ class QuicStream : public AsyncWrap,
     return flags_ & QUIC_STREAM_FLAG_SHUT ||
            flags_ & QUIC_STREAM_FLAG_EOS;
   }
 
-  bool IsDestroyed() { return session_ == nullptr; }
-  bool IsEnded() { return flags_ & QUIC_STREAM_FLAG_EOS; }
-  bool IsPaused() { return flags_ & QUIC_STREAM_FLAG_READ_PAUSED; }
-  bool IsReading() { return flags_ & QUIC_STREAM_FLAG_READ_START; }
-  bool IsShutdown() { return flags_ & QUIC_STREAM_FLAG_SHUT; }
-  virtual int ReceiveData(
-      int fin,
-      const uint8_t* data,
-      size_t datalen);
+  inline bool IsDestroyed() { return session_ == nullptr; }
+  inline bool IsEnded() { return flags_ & QUIC_STREAM_FLAG_EOS; }
+  inline bool IsPaused() { return flags_ & QUIC_STREAM_FLAG_READ_PAUSED; }
+  inline bool IsReading() { return flags_ & QUIC_STREAM_FLAG_READ_START; }
+  inline bool IsShutdown() { return flags_ & QUIC_STREAM_FLAG_SHUT; }
+
+  inline void IncrementAvailableOutboundLength(size_t amount);
+  inline void DecrementAvailableOutboundLength(size_t amount);
+
+  virtual void ReceiveData(int fin, const uint8_t* data, size_t datalen);
 
   // Required for StreamBase
   int ReadStart() override;
@@ -108,12 +88,13 @@ class QuicStream : public AsyncWrap,
   int ReadStop() override;
 
   // Required for StreamBase
-  int DoShutdown(
-      ShutdownWrap* req_wrap) override;
+  int DoShutdown(ShutdownWrap* req_wrap) override;
+
+  size_t DrainInto(
+      std::vector<ngtcp2_vec>* vec,
+      QuicBuffer::drain_from from);
-  int Send0RTTData();
-  int SendPendingData(
-      bool retransmit = false);
+  void Commit(size_t count);
 
   AsyncWrap* GetAsyncWrap() override { return this; }
 
@@ -129,14 +110,17 @@ class QuicStream : public AsyncWrap,
   SET_SELF_SIZE(QuicStream)
 
  private:
+  QuicStream(
+      QuicSession* session,
+      v8::Local<v8::Object> target,
+      uint64_t stream_id);
+
   QuicStreamListener stream_listener_;
   QuicSession* session_;
-  uint32_t flags_;
   uint64_t stream_id_;
-  bool reset_;
+  uint32_t flags_;
   QuicBuffer streambuf_;
-  bool should_send_fin_;
   size_t available_outbound_length_;
 };
 
diff --git a/test/parallel/test-quic-client-server.js b/test/parallel/test-quic-client-server.js
index 3196f69e34..50835f047c 100644
--- a/test/parallel/test-quic-client-server.js
+++ b/test/parallel/test-quic-client-server.js
@@ -16,6 +16,7 @@ const cert = fixtures.readKey('agent8-cert.pem', 'binary');
 
 const { debuglog } = require('util');
 const debug = debuglog('test');
+const filedata = fs.readFileSync(__filename, { encoding: 'utf8' });
 
 const { createSocket } = require('quic');
 
@@ -46,7 +47,7 @@ server.on('session', common.mustCall((session) => {
 
   session.on('keylog', common.mustCall((line) => {
     assert(kKeylogs.shift().test(line));
-  }), kKeylogs.length);
+  }, kKeylogs.length));
 
   session.on('secure', common.mustCall((servername, alpn) => {
     debug('QuicServerSession TLS Handshake Complete');
@@ -65,10 +66,15 @@ server.on('session', common.mustCall((session) => {
   session.on('stream', common.mustCall((stream) => {
     debug('Bidirectional, Client-initiated stream %d received', stream.id);
     const file = fs.createReadStream(__filename);
+    let data = '';
     file.pipe(stream);
     stream.setEncoding('utf8');
-    stream.resume();
-    stream.on('end', common.mustCall());
+    stream.on('data', (chunk) => data += chunk);
+    stream.on('end', common.mustCall(() => {
+      assert.strictEqual(data, filedata);
+      debug('Server received expected data for stream %d', stream.id);
+    }));
+    stream.on('close', common.mustCall());
   }));
 }));
 
@@ -110,7 +116,14 @@ server.on('ready', common.mustCall(() => {
     const file = fs.createReadStream(__filename);
     const stream = req.openStream();
     file.pipe(stream);
+    let data = '';
     stream.resume();
+    stream.setEncoding('utf8');
+    stream.on('data', (chunk) => data += chunk);
+    stream.on('end', common.mustCall(() => {
+      assert.strictEqual(data, filedata);
+      debug('Client received expected data for stream %d', stream.id);
+    }));
     stream.on('close', common.mustCall(() => {
       debug('Bidirectional, Client-initiated stream %d closed', stream.id);
       countdown.dec();
@@ -125,6 +138,7 @@ server.on('ready', common.mustCall(() => {
     stream.on('data', (chunk) => data += chunk);
     stream.on('end', common.mustCall(() => {
       assert.strictEqual(data, unidata.join(''));
+      debug('Client received expected data for stream %d', stream.id);
     }));
     stream.on('close', common.mustCall(() => {
       debug('Unidirectional, Server-initiated stream %d closed', stream.id);