Skip to content

Commit

Permalink
quic: more of the implementation, fixing bits
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Dec 29, 2023
1 parent 1ae7f4b commit b0f1863
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 216 deletions.
8 changes: 4 additions & 4 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ StreamPriority Session::Application::GetStreamPriority(const Stream& stream) {
return StreamPriority::DEFAULT;
}

BaseObjectPtr<Packet> Session::Application::CreateStreamDataPacket() {
Packet* Session::Application::CreateStreamDataPacket() {
return Packet::Create(env(),
session_->endpoint_.get(),
session_->remote_address_,
Expand Down Expand Up @@ -224,7 +224,7 @@ void Session::Application::SendPendingData() {
Debug(session_, "Application sending pending data");
PathStorage path;

BaseObjectPtr<Packet> packet;
Packet* packet = nullptr;
uint8_t* pos = nullptr;
int err = 0;

Expand Down Expand Up @@ -261,9 +261,9 @@ void Session::Application::SendPendingData() {
return session_->Close(Session::CloseMethod::SILENT);
}

if (!packet) {
if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (!packet) {
if (packet == nullptr) {
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}
Expand Down
2 changes: 1 addition & 1 deletion src/quic/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class Session::Application : public MemoryRetainer {
inline Session& session() { return *session_; }
inline const Session& session() const { return *session_; }

BaseObjectPtr<Packet> CreateStreamDataPacket();
Packet* CreateStreamDataPacket();

struct StreamData;

Expand Down
2 changes: 1 addition & 1 deletion src/quic/bindingdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class BindingData final
// bridge out to the JS API.
static void SetCallbacks(const v8::FunctionCallbackInfo<v8::Value>& args);

std::vector<BaseObjectPtr<BaseObject>> packet_freelist;
std::vector<Packet*> packet_freelist;

std::unordered_map<Endpoint*, BaseObjectPtr<BaseObject>> listening_endpoints;

Expand Down
21 changes: 21 additions & 0 deletions src/quic/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include <memory_tracker-inl.h>
#include <ngtcp2/ngtcp2.h>
#include <node_sockaddr-inl.h>
#include <string_bytes.h>
#include <v8.h>
#include "defs.h"
#include "util.h"

namespace node {
Expand All @@ -26,6 +28,25 @@ Path::Path(const SocketAddress& local, const SocketAddress& remote) {
ngtcp2_addr_init(&this->remote, remote.data(), remote.length());
}

std::string Path::ToString() const {
DebugIndentScope indent;
auto prefix = indent.Prefix();

const sockaddr* local_in = reinterpret_cast<const sockaddr*>(local.addr);
auto local_addr = SocketAddress::GetAddress(local_in);
auto local_port = SocketAddress::GetPort(local_in);

const sockaddr* remote_in = reinterpret_cast<const sockaddr*>(remote.addr);
auto remote_addr = SocketAddress::GetAddress(remote_in);
auto remote_port = SocketAddress::GetPort(remote_in);

std::string res("{");
res += prefix + "local: " + local_addr + ":" + std::to_string(local_port);
res += prefix + "remote: " + remote_addr + ":" + std::to_string(remote_port);
res += indent.Close();
return res;
}

PathStorage::PathStorage() {
ngtcp2_path_storage_zero(this);
}
Expand Down
2 changes: 2 additions & 0 deletions src/quic/data.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <string>
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC

Expand All @@ -17,6 +18,7 @@ namespace quic {
struct Path final : public ngtcp2_path {
Path(const SocketAddress& local, const SocketAddress& remote);
inline operator ngtcp2_path*() { return this; }
std::string ToString() const;
};

struct PathStorage final : public ngtcp2_path_storage {
Expand Down
81 changes: 48 additions & 33 deletions src/quic/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,20 +554,31 @@ SocketAddress Endpoint::UDP::local_address() const {
return SocketAddress::FromSockName(impl_->handle_);
}

int Endpoint::UDP::Send(BaseObjectPtr<Packet> packet) {
int Endpoint::UDP::Send(Packet* packet) {
if (is_closed_or_closing()) return UV_EBADF;
DCHECK(packet && !packet->is_sending());
DCHECK_NOT_NULL(packet);
uv_buf_t buf = *packet;
return packet->Dispatch(
uv_udp_send,
&impl_->handle_,
&buf,
1,
packet->destination().data(),
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
auto ptr = static_cast<Packet*>(ReqWrap<uv_udp_send_t>::from_req(req));
ptr->Done(status);
}});

// We don't use the default implementation of Dispatch because the packet
// itself is going to be reset and added to a freelist to be reused. The
// default implementation of Dispatch will cause the packet to be deleted,
// which we don't want. We call ClearWeak here just to be doubly sure.
packet->ClearWeak();
packet->Dispatched();
int err = uv_udp_send(packet->req(), &impl_->handle_, &buf, 1, packet->destination().data(),
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
auto ptr =
static_cast<Packet*>(ReqWrap<uv_udp_send_t>::from_req(req));
ptr->env()->DecreaseWaitingRequestCounter();
ptr->Done(status);
}});
if (err < 0) {
// The packet failed.
packet->Done(err);
} else {
packet->env()->IncreaseWaitingRequestCounter();
}
return err;
}

void Endpoint::UDP::MemoryInfo(MemoryTracker* tracker) const {
Expand Down Expand Up @@ -788,7 +799,8 @@ void Endpoint::DisassociateStatelessResetToken(
}
}

void Endpoint::Send(BaseObjectPtr<Packet> packet) {
void Endpoint::Send(Packet* packet) {
CHECK_NOT_NULL(packet);
#ifdef DEBUG
// When diagnostic packet loss is enabled, the packet will be randomly
// dropped. This can happen to any type of packet. We use this only in
Expand Down Expand Up @@ -1077,7 +1089,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
// as a server, then we cannot accept the initial packet.
if (is_closed() || is_closing() || !is_listening()) return;

Debug(this, "Trying to create new session for initial packet");
Debug(this, "Trying to create new session for %s", config.dcid);
auto session = Session::Create(this, config);
if (session) {
receive(session.get(),
Expand All @@ -1096,7 +1108,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
const SocketAddress& local_address,
const SocketAddress& remote_address) {
// Conditionally accept an initial packet to create a new session.
Debug(this, "Trying to accept initial packet");
Debug(this, "Trying to accept initial packet for %s from %s", dcid, remote_address);

// If we're not listening as a server, do not accept an initial packet.
if (state_->listening == 0) return;
Expand All @@ -1111,7 +1123,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
// successful, or an error code if it was not. Currently there's only one
// documented error code (NGTCP2_ERR_INVALID_ARGUMENT) but we'll handle
// any error here the same -- by ignoring the packet entirely.
Debug(this, "Failed to accept initial packet");
Debug(this, "Failed to accept initial packet from %s", remote_address);
return;
}

Expand All @@ -1120,7 +1132,8 @@ void Endpoint::Receive(const uv_buf_t& buf,
// version negotiation packet in response.
if (ngtcp2_is_supported_version(hd.version) == 0) {
Debug(this,
"Packet was not accepted because the version is not supported");
"Packet was not accepted because the version (%d) is not supported",
hd.version);
SendVersionNegotiation(
PathDescriptor{version, dcid, scid, local_address, remote_address});
STAT_INCREMENT(Stats, packets_received);
Expand All @@ -1142,8 +1155,8 @@ void Endpoint::Receive(const uv_buf_t& buf,
if (state_->busy || limits_exceeded) {
Debug(this,
"Packet was not accepted because the endpoint is busy or the "
"remote peer has exceeded their maximum number of concurrent "
"connections");
"remote address %s has exceeded their maximum number of concurrent "
"connections", remote_address);
// Endpoint is busy or the connection count is exceeded. The connection is
// refused. For the purpose of stats collection, we'll count both of these
// the same.
Expand Down Expand Up @@ -1190,7 +1203,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
// a new token. If it does exist, and if it is valid, we grab the original
// cid and continue.
if (!is_remote_address_validated) {
Debug(this, "Remote address is not validated");
Debug(this, "Remote address %s is not validated", remote_address);
switch (hd.type) {
case NGTCP2_PKT_INITIAL:
// First, let's see if we need to do anything here.
Expand All @@ -1199,8 +1212,8 @@ void Endpoint::Receive(const uv_buf_t& buf,
// If there is no token, generate and send one.
if (hd.tokenlen == 0) {
Debug(this,
"Initial packet has no token. Sending retry to start "
"validation");
"Initial packet has no token. Sending retry to %s to start "
"validation", remote_address);
SendRetry(PathDescriptor{
version,
dcid,
Expand All @@ -1219,15 +1232,15 @@ void Endpoint::Receive(const uv_buf_t& buf,
switch (hd.token[0]) {
case RetryToken::kTokenMagic: {
RetryToken token(hd.token, hd.tokenlen);
Debug(this, "Initial packet has retry token %s", token);
Debug(this, "Initial packet from %s has retry token %s", remote_address, token);
auto ocid = token.Validate(
version,
remote_address,
dcid,
options_.token_secret,
options_.retry_token_expiration * NGTCP2_SECONDS);
if (!ocid.has_value()) {
Debug(this, "Retry token is invalid.");
Debug(this, "Retry token from %s is invalid.", remote_address);
// Invalid retry token was detected. Close the connection.
SendImmediateConnectionClose(
PathDescriptor{
Expand All @@ -1243,22 +1256,23 @@ void Endpoint::Receive(const uv_buf_t& buf,
// original retry packet sent to the client. We use it for
// validation.
Debug(this,
"Retry token is valid. Original dcid %s",
ocid.value());
"Retry token from %s is valid. Original dcid %s",
remote_address, ocid.value());
config.ocid = ocid.value();
config.retry_scid = dcid;
config.set_token(token);
break;
}
case RegularToken::kTokenMagic: {
RegularToken token(hd.token, hd.tokenlen);
Debug(this, "Initial packet has regular token %s", token);
Debug(this, "Initial packet from %s has regular token %s",
remote_address, token);
if (!token.Validate(
version,
remote_address,
options_.token_secret,
options_.token_expiration * NGTCP2_SECONDS)) {
Debug(this, "Regular token is invalid.");
Debug(this, "Regular token from %s is invalid.", remote_address);
// If the regular token is invalid, let's send a retry to be
// lenient. There's a small risk that a malicious peer is
// trying to make us do some work but the risk is fairly low
Expand All @@ -1275,12 +1289,12 @@ void Endpoint::Receive(const uv_buf_t& buf,
STAT_INCREMENT(Stats, packets_received);
return;
}
Debug(this, "Regular token is valid.");
Debug(this, "Regular token from %s is valid.", remote_address);
config.set_token(token);
break;
}
default: {
Debug(this, "Initial packet has unknown token type");
Debug(this, "Initial packet from %s has unknown token type", remote_address);
// If our prefix bit does not match anything we know about,
// let's send a retry to be lenient. There's a small risk that a
// malicious peer is trying to make us do some work but the risk
Expand All @@ -1301,10 +1315,11 @@ void Endpoint::Receive(const uv_buf_t& buf,
// path to the remote address is valid (for now). Let's record that
// so we don't have to do this dance again for this endpoint
// instance.
Debug(this, "Remote address is validated");
Debug(this, "Remote address %s is validated", remote_address);
addrLRU_.Upsert(remote_address)->validated = true;
} else if (hd.tokenlen > 0) {
Debug(this, "Ignoring initial packet with unexpected token");
Debug(this, "Ignoring initial packet from %s with unexpected token",
remote_address);
// If validation is turned off and there is a token, that's weird.
// The peer should only have a token if we sent it to them and we
// wouldn't have sent it unless validation was turned on. Let's
Expand All @@ -1314,7 +1329,7 @@ void Endpoint::Receive(const uv_buf_t& buf,
}
break;
case NGTCP2_PKT_0RTT:
Debug(this, "Sending retry to initial 0RTT packet");
Debug(this, "Sending retry to %s due to initial 0RTT packet", remote_address);
// If it's a 0RTT packet, we're always going to perform path
// validation no matter what. This is a bit unfortunate since
// ORTT is supposed to be, you know, 0RTT, but sending a retry
Expand Down
4 changes: 2 additions & 2 deletions src/quic/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class Endpoint final : public AsyncWrap, public Packet::Listener {
Session* session);
void DisassociateStatelessResetToken(const StatelessResetToken& token);

void Send(BaseObjectPtr<Packet> packet);
void Send(Packet* packet);

// Generates and sends a retry packet. This is terminal for the connection.
// Retry packets are used to force explicit path validation by issuing a token
Expand Down Expand Up @@ -294,7 +294,7 @@ class Endpoint final : public AsyncWrap, public Packet::Listener {
int Start();
void Stop();
void Close();
int Send(BaseObjectPtr<Packet> packet);
int Send(Packet* packet);

// Returns the local UDP socket address to which we are bound,
// or fail with an assert if we are not bound.
Expand Down
15 changes: 11 additions & 4 deletions src/quic/http3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ class Http3Application final : public Session::Application {
Http3Application(Session* session,
const Session::Application_Options& options)
: Application(session, options),
allocator_(BindingData::Get(env())),
options_(options),
conn_(InitializeConnection()) {
session->set_priority_supported();
}

bool Start() override {
CHECK(!started_);
started_ = true;
Debug(&session(), "Starting HTTP/3 application.");
auto params = ngtcp2_conn_get_remote_transport_params(session());
if (params == nullptr) {
Expand Down Expand Up @@ -385,7 +388,10 @@ class Http3Application final : public Session::Application {
SET_SELF_SIZE(Http3Application);

private:
inline operator nghttp3_conn*() const { return conn_.get(); }
inline operator nghttp3_conn*() const {
DCHECK_NOT_NULL(conn_.get());
return conn_.get();
}

bool CreateAndBindControlStreams() {
Debug(&session(), "Creating and binding HTTP/3 control streams");
Expand Down Expand Up @@ -418,15 +424,14 @@ class Http3Application final : public Session::Application {

Http3ConnectionPointer InitializeConnection() {
nghttp3_conn* conn = nullptr;
nghttp3_mem allocator = BindingData::Get(env());
nghttp3_settings settings = options_;
if (session().is_server()) {
CHECK_EQ(nghttp3_conn_server_new(
&conn, &kCallbacks, &settings, &allocator, this),
&conn, &kCallbacks, &settings, &allocator_, this),
0);
} else {
CHECK_EQ(nghttp3_conn_client_new(
&conn, &kCallbacks, &settings, &allocator, this),
&conn, &kCallbacks, &settings, &allocator_, this),
0);
}
return Http3ConnectionPointer(conn);
Expand Down Expand Up @@ -578,6 +583,8 @@ class Http3Application final : public Session::Application {
&session(), "HTTP/3 application received updated settings ", options_);
}

bool started_ = false;
nghttp3_mem allocator_;
Session::Application_Options options_;
Http3ConnectionPointer conn_;
int64_t control_stream_id_ = -1;
Expand Down
Loading

0 comments on commit b0f1863

Please sign in to comment.