Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: add back pressure guards
Browse files Browse the repository at this point in the history
Only extend the stream offset if we are in flowing mode on the
JS side so that the peer doesn't push too much when we're not
reading.

PR-URL: #31
  • Loading branch information
jasnell committed Aug 19, 2019
1 parent aa99a6a commit 5b76872
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 3 deletions.
19 changes: 18 additions & 1 deletion lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,16 @@ function afterShutdown() {
stream[kMaybeDestroy]();
}

function streamOnResume() {
if (!this.destroyed)
this[kHandle].readStart();
}

function streamOnPause() {
if (!this.destroyed /* && !this.pending */)
this[kHandle].readStop();
}

class QuicStream extends Duplex {
#didRead = false;
#id = undefined;
Expand All @@ -1349,6 +1359,7 @@ class QuicStream extends Duplex {
this.#id = id;
this.#session = session;
this._readableState.readingMore = true;
this.on('pause', streamOnPause);

// See src/node_quic_stream.h for an explanation
// of the initial states for unidirectional streams.
Expand Down Expand Up @@ -1475,7 +1486,13 @@ class QuicStream extends Duplex {
this._readableState.readingMore = false;
this.#didRead = true;
}
this[kHandle].readStart();

streamOnResume.call(this);
// if (!this.pending) {
// streamOnResume.call(this);
// } else {
// this.once('ready', streamOnResume);
// }
}

get bufferSize() {
Expand Down
5 changes: 4 additions & 1 deletion src/node_quic_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1347,12 +1347,15 @@ int QuicSession::ReceiveStreamData(
CHECK_NOT_NULL(stream);
stream->ReceiveData(fin, data, datalen);

ngtcp2_conn_extend_max_stream_offset(connection_, stream_id, datalen);
ngtcp2_conn_extend_max_offset(connection_, datalen);

return 0;
}

void QuicSession::ExtendStreamOffset(QuicStream* stream, size_t amount) {
ngtcp2_conn_extend_max_stream_offset(connection_, stream->GetID(), amount);
}

// Removes the given connection id from the QuicSession.
void QuicSession::RemoveConnectionID(
const ngtcp2_cid* cid) {
Expand Down
1 change: 1 addition & 0 deletions src/node_quic_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class QuicSession : public AsyncWrap,
void Close();
void Closing();
void Destroy();
void ExtendStreamOffset(QuicStream* stream, size_t amount);
const std::string& GetALPN();
void GetLocalTransportParams(
ngtcp2_transport_params* params);
Expand Down
8 changes: 7 additions & 1 deletion src/node_quic_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ QuicStream::QuicStream(
session_(session),
stream_id_(stream_id),
flags_(QUICSTREAM_FLAG_INITIAL),
available_outbound_length_(0) {
available_outbound_length_(0),
inbound_consumed_data_while_paused_(0) {
CHECK_NOT_NULL(session);
SetInitialFlags();
session->AddStream(this);
Expand Down Expand Up @@ -251,6 +252,7 @@ int QuicStream::ReadStart() {
CHECK(IsReadable());
SetReadStart();
SetReadResume();
session_->ExtendStreamOffset(this, inbound_consumed_data_while_paused_);
return 0;
}

Expand Down Expand Up @@ -291,6 +293,10 @@ void QuicStream::ReceiveData(int fin, const uint8_t* data, size_t datalen) {
data += avail;
datalen -= avail;
EmitRead(avail, buf);
if (IsReadPaused())
inbound_consumed_data_while_paused_ += avail;
else
session_->ExtendStreamOffset(this, avail);
};

// When fin != 0, we've received that last chunk of data for this
Expand Down
1 change: 1 addition & 0 deletions src/node_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class QuicStream : public AsyncWrap,

QuicBuffer streambuf_;
size_t available_outbound_length_;
size_t inbound_consumed_data_while_paused_;
};

} // namespace quic
Expand Down

0 comments on commit 5b76872

Please sign in to comment.