Skip to content

Commit

Permalink
Fix infinite loop on blocked stream
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Jun 6, 2024
1 parent f5a56b6 commit a2c6fbe
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 13 deletions.
61 changes: 48 additions & 13 deletions src/quic/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2209,8 +2209,11 @@ void Session::StreamDataBlocked(stream_id id) {
IncrementStat(&SessionStats::block_count);

BaseObjectPtr<Stream> stream = FindStream(id);
if (stream)
if (stream) {
stream->OnBlocked();
} else {
Debug(this, "Stream %" PRId64 " not found to block", id);
}
}

void Session::IncrementConnectionCloseAttempts() {
Expand Down Expand Up @@ -3268,26 +3271,34 @@ bool Session::Application::SendPendingData() {
uint8_t* pos = nullptr;
size_t packets_sent = 0;
int err;
std::vector<stream_id> blocking;
bool ret = false;
BaseObjectPtr<Stream> stream;

Debug(session(), "Start sending pending data");
for (;;) {
ssize_t ndatalen;
StreamData stream_data;
ssize_t ndatalen = 0;
err = GetStreamData(&stream_data);
if (err < 0) {
session()->set_last_error(kQuicInternalError);
return false;
goto end;
}

// If the packet was sent previously, then packet will have been reset.
if (!pos) {
packet = CreateStreamDataPacket();
if (!packet) {
Debug(session(), "Failed to create packet for stream data");
session()->set_last_error(kQuicInternalError);
goto end;
}
pos = packet->data();
}

// If stream_data.id is -1, then we're not serializing any data for any
// specific stream. We still need to process QUIC session packets tho.
if (stream_data.id > -1) {
if (stream_data.id >= 0) {
Debug(session(), "Serializing packets for stream id %" PRId64,
stream_data.id);
packet->AddRetained(stream_data.stream->GetOutboundSource());
Expand Down Expand Up @@ -3322,10 +3333,11 @@ bool Session::Application::SendPendingData() {
// CONNECTION_CLOSE since even those require a
// packet number.
session()->Close(Session::SessionCloseFlags::SILENT);
return false;
goto end;
case NGTCP2_ERR_STREAM_DATA_BLOCKED:
Debug(session(), "Stream %lld blocked", stream_data.id);
Debug(session(), "Stream %lld blocked session data left %lld", stream_data.id, session()->max_data_left());
session()->StreamDataBlocked(stream_data.id);
blocking.push_back(stream_data.id);
if (session()->max_data_left() == 0) {
if (stream_data.id >= 0) {
Debug(session(), "Resuming %llu after block", stream_data.id);
Expand All @@ -3344,6 +3356,7 @@ bool Session::Application::SendPendingData() {
CHECK_LE(ndatalen, 0);
continue;
case NGTCP2_ERR_STREAM_NOT_FOUND:
Debug(session(), "Stream %lld no found", stream_data.id);
continue;
case NGTCP2_ERR_WRITE_MORE:
CHECK_GT(ndatalen, 0);
Expand All @@ -3354,7 +3367,7 @@ bool Session::Application::SendPendingData() {
if(nwrite != 0){ // -ve response i.e error
packet.reset();
session()->set_last_error(kQuicInternalError);
return false;
goto end;
}

// 0 bytes in this sending operation
Expand All @@ -3368,9 +3381,10 @@ bool Session::Application::SendPendingData() {
Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending",
packet->length());
if (!session()->SendPacket(std::move(packet), path))
return false;
goto end;
}
return true;
ret = true;
goto end;
}

pos += nwrite;
Expand All @@ -3384,16 +3398,27 @@ bool Session::Application::SendPendingData() {
Debug(session(), "Sending %" PRIu64 " bytes in serialized packet", nwrite);
if (!session()->SendPacket(std::move(packet), path)) {
Debug(session(), "-- Failed to send packet");
return false;
goto end;
}
pos = nullptr;
if (++packets_sent == kMaxPackets) {
Debug(session(), "-- Max packets sent");
break;
}
Debug(session(), "-- Looping");
} // end for


ret = true;
end:
for(stream_id id : blocking) {
stream = session()->FindStream(id);
if (stream) {
stream->Unblock();
}
}
return true;

return ret;
}

void Session::Application::StreamClose(
Expand Down Expand Up @@ -3576,13 +3601,24 @@ bool DefaultApplication::ReceiveStreamData(
}

int DefaultApplication::GetStreamData(StreamData* stream_data) {
if (stream_queue_.IsEmpty()) {
stream_data->id = -1;
return 0;
}

Stream* stream = stream_queue_.PopFront();
stream_data->stream.reset(stream);
if (stream == nullptr) {
stream_data->id = -1;
return 0;
}
CHECK(!stream->is_destroyed());
stream_data->stream.reset(stream);

if(stream->IsBlocked()){
stream_data->id = -1;
return 0;
}

stream_data->id = stream->id();
auto next = [&](
int status,
Expand All @@ -3600,7 +3636,6 @@ int DefaultApplication::GetStreamData(StreamData* stream_data) {

stream_data->count = count;
if (count > 0) {

stream->Schedule(&stream_queue_);
stream_data->remaining = get_length(data, count);
} else {
Expand Down
10 changes: 10 additions & 0 deletions src/quic/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,23 @@ void Stream::OnBlocked() {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());

blocked = true;

BaseObjectPtr<Stream> ptr(this);
USE(state->stream_blocked_callback()->Call(
env()->context(),
object(),
0, nullptr));
}

bool Stream::IsBlocked(){
return blocked;
}

void Stream::Unblock() {
blocked = false;
}

void Stream::OnReset(error_code app_error_code) {
BindingState* state = BindingState::Get(env());
HandleScope scope(env()->isolate());
Expand Down
4 changes: 4 additions & 0 deletions src/quic/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ class Stream final : public AsyncWrap,
void BeginHeaders(HeadersKind kind);

void OnBlocked();
bool IsBlocked();
void Unblock();

void OnReset(error_code app_error_code);

void Commit(size_t ammount);
Expand Down Expand Up @@ -295,6 +298,7 @@ class Stream final : public AsyncWrap,
BaseObjectPtr<Session> session_;
AliasedStruct<State> state_;
stream_id id_;
bool blocked = false;
bool destroyed_ = false;
bool destroying_ = false;

Expand Down

0 comments on commit a2c6fbe

Please sign in to comment.