Skip to content

Commit

Permalink
refactor(server): remove ServerConnectionIdGenerator
Browse files Browse the repository at this point in the history
A `neqo_transport::Server` manages multiple `neqo_transport::Connection`s. A
`Server` keeps a `HashMap` of all `Connection`s, indexed by `ConnectionId`.

Indexing by `ConnectionId` is difficult as:

- The `ConnectionId` is not known before constructing the connection.
- The `ConnectionId` might change throughout the lifetime of a connection.

Previously the index was kept up-to-date through a wrapper around the
`ConnectionIdGenerator` provided to a `Connection` by a `Server`. This wrapper
would be provided a shared reference to the `Server`s `HashMap` of
`Connection`s. Whenever the `Server` would `process` a `Connection` and that `Connection` would generate a new `ConnectionId` via
`ConnectionIdGenerator::generate_cid`, the wrapped `ConnectionIdGenerator` would
also insert the `Connection` keyed by the new `ConnectionId` into the `Server`'s
`HashMap` of `Connection`s.

This has two problems:

- Complexity through indirection where a `Connection` owned by a `Server` can
indirectly mutate the `Server` through the provided wrapped
`ConnectionIdGenerator` having access to the `Server`'s set of `Connection`s.

- Double `RefCell` borrow e.g. when a `Server` would iterate its `Connection`s,
process each, where one of them might generate a new `ConnectionId` via the
provided `ConnectionIdGenerator`, which in turn would insert the `Connection`
into the currently iterated set of `Connection`s of the `Server`.

This commit replaces the `HashMap<ConnectionId, Connection>` of the `Server`
with a simple `Vec<Connection>`. Given the removal of the index, the
`ConnectionIdGenerator` wrapper (i.e. `ServerConnectionIdGenerator`) is no
longer needed and removed and thus the shared reference to the `Server`s
`Connection` `HashMap` is no longer needed and thus the above mentioned double
borrow is made impossible.

Downside to the removal of the index by `ConnectionId` is a potential
performance hit. When the `Server` manages a large set of `Connection`s, finding
the `Connection` corresponding to a `ConnectionId` (e.g. from an incoming
`Datagram`) is `O(n)` (`n` equal the number of connections).
  • Loading branch information
mxinden committed Jul 12, 2024
1 parent 9f0a86d commit 3d7e8fd
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 234 deletions.
10 changes: 5 additions & 5 deletions neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram};
use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay};
use neqo_http3::Error;
use neqo_transport::{
server::{ActiveConnectionRef, Server, ValidateAddress},
server::{ConnectionRef, Server, ValidateAddress},
ConnectionEvent, ConnectionIdGenerator, Output, State, StreamId,
};
use regex::Regex;
Expand Down Expand Up @@ -75,7 +75,7 @@ impl HttpServer {
})
}

fn save_partial(&mut self, stream_id: StreamId, partial: Vec<u8>, conn: &ActiveConnectionRef) {
fn save_partial(&mut self, stream_id: StreamId, partial: Vec<u8>, conn: &ConnectionRef) {
let url_dbg = String::from_utf8(partial.clone())
.unwrap_or_else(|_| format!("<invalid UTF-8: {}>", hex(&partial)));
if partial.len() < 4096 {
Expand All @@ -87,7 +87,7 @@ impl HttpServer {
}
}

fn write(&mut self, stream_id: StreamId, data: Option<Vec<u8>>, conn: &ActiveConnectionRef) {
fn write(&mut self, stream_id: StreamId, data: Option<Vec<u8>>, conn: &ConnectionRef) {
let resp = data.unwrap_or_else(|| Vec::from(&b"404 That request was nonsense\r\n"[..]));
if let Some(stream_state) = self.write_state.get_mut(&stream_id) {
match stream_state.data_to_send {
Expand All @@ -110,7 +110,7 @@ impl HttpServer {
}
}

fn stream_readable(&mut self, stream_id: StreamId, conn: &ActiveConnectionRef) {
fn stream_readable(&mut self, stream_id: StreamId, conn: &ConnectionRef) {
if !stream_id.is_client_initiated() || !stream_id.is_bidi() {
qdebug!("Stream {} not client-initiated bidi, ignoring", stream_id);
return;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl HttpServer {
self.write(stream_id, resp, conn);
}

fn stream_writable(&mut self, stream_id: StreamId, conn: &ActiveConnectionRef) {
fn stream_writable(&mut self, stream_id: StreamId, conn: &ConnectionRef) {
match self.write_state.get_mut(&stream_id) {
None => {
qwarn!("Unknown stream {stream_id}, ignoring event");
Expand Down
8 changes: 4 additions & 4 deletions neqo-http3/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
use neqo_common::{qtrace, Datagram};
use neqo_crypto::{AntiReplay, Cipher, PrivateKey, PublicKey, ZeroRttChecker};
use neqo_transport::{
server::{ActiveConnectionRef, Server, ValidateAddress},
server::{ConnectionRef, Server, ValidateAddress},
ConnectionIdGenerator, Output,
};

Expand All @@ -39,7 +39,7 @@ const MAX_EVENT_DATA_SIZE: usize = 1024;
pub struct Http3Server {
server: Server,
http3_parameters: Http3Parameters,
http3_handlers: HashMap<ActiveConnectionRef, HandlerRef>,
http3_handlers: HashMap<ConnectionRef, HandlerRef>,
events: Http3ServerEvents,
}

Expand Down Expand Up @@ -147,7 +147,7 @@ impl Http3Server {
}
}

fn process_events(&mut self, conn: &ActiveConnectionRef, now: Instant) {
fn process_events(&mut self, conn: &ConnectionRef, now: Instant) {
let mut remove = false;
let http3_parameters = &self.http3_parameters;
{
Expand Down Expand Up @@ -270,7 +270,7 @@ impl Http3Server {
fn prepare_data(
stream_info: Http3StreamInfo,
handler_borrowed: &mut RefMut<Http3ServerHandler>,
conn: &ActiveConnectionRef,
conn: &ConnectionRef,
handler: &HandlerRef,
now: Instant,
events: &Http3ServerEvents,
Expand Down
20 changes: 10 additions & 10 deletions neqo-http3/src/server_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{

use neqo_common::{qdebug, Encoder, Header};
use neqo_transport::{
server::ActiveConnectionRef, AppError, Connection, DatagramTracking, StreamId, StreamType,
server::ConnectionRef, AppError, Connection, DatagramTracking, StreamId, StreamType,
};

use crate::{
Expand All @@ -27,7 +27,7 @@ use crate::{

#[derive(Debug, Clone)]
pub struct StreamHandler {
pub conn: ActiveConnectionRef,
pub conn: ConnectionRef,
pub handler: Rc<RefCell<Http3ServerHandler>>,
pub stream_info: Http3StreamInfo,
}
Expand Down Expand Up @@ -174,7 +174,7 @@ impl ::std::fmt::Display for Http3OrWebTransportStream {

impl Http3OrWebTransportStream {
pub(crate) const fn new(
conn: ActiveConnectionRef,
conn: ConnectionRef,
handler: Rc<RefCell<Http3ServerHandler>>,
stream_info: Http3StreamInfo,
) -> Self {
Expand Down Expand Up @@ -259,7 +259,7 @@ impl ::std::fmt::Display for WebTransportRequest {

impl WebTransportRequest {
pub(crate) const fn new(
conn: ActiveConnectionRef,
conn: ConnectionRef,
handler: Rc<RefCell<Http3ServerHandler>>,
stream_id: StreamId,
) -> Self {
Expand Down Expand Up @@ -460,7 +460,7 @@ pub enum Http3ServerEvent {
},
/// When individual connection change state. It is only used for tests.
StateChange {
conn: ActiveConnectionRef,
conn: ConnectionRef,
state: Http3State,
},
PriorityUpdate {
Expand Down Expand Up @@ -510,14 +510,14 @@ impl Http3ServerEvents {
}

/// Insert a `StateChange` event.
pub(crate) fn connection_state_change(&self, conn: ActiveConnectionRef, state: Http3State) {
pub(crate) fn connection_state_change(&self, conn: ConnectionRef, state: Http3State) {
self.insert(Http3ServerEvent::StateChange { conn, state });
}

/// Insert a `Data` event.
pub(crate) fn data(
&self,
conn: ActiveConnectionRef,
conn: ConnectionRef,
handler: Rc<RefCell<Http3ServerHandler>>,
stream_info: Http3StreamInfo,
data: Vec<u8>,
Expand All @@ -532,7 +532,7 @@ impl Http3ServerEvents {

pub(crate) fn data_writable(
&self,
conn: ActiveConnectionRef,
conn: ConnectionRef,
handler: Rc<RefCell<Http3ServerHandler>>,
stream_info: Http3StreamInfo,
) {
Expand All @@ -543,7 +543,7 @@ impl Http3ServerEvents {

pub(crate) fn stream_reset(
&self,
conn: ActiveConnectionRef,
conn: ConnectionRef,
handler: Rc<RefCell<Http3ServerHandler>>,
stream_info: Http3StreamInfo,
error: AppError,
Expand All @@ -556,7 +556,7 @@ impl Http3ServerEvents {

pub(crate) fn stream_stop_sending(
&self,
conn: ActiveConnectionRef,
conn: ConnectionRef,
handler: Rc<RefCell<Http3ServerHandler>>,
stream_info: Http3StreamInfo,
error: AppError,
Expand Down
6 changes: 6 additions & 0 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,12 @@ impl Connection {
}
}

/// Whether the given [`ConnectionIdRef`] is a valid local [`ConnectionId`].
#[must_use]
pub fn is_valid_local_cid(&self, cid: ConnectionIdRef) -> bool {
self.cid_manager.is_valid(cid)
}

/// Process new input datagrams on the connection.
pub fn process_input(&mut self, d: &Datagram, now: Instant) {
self.process_multiple_input(iter::once(d), now);
Expand Down
Loading

0 comments on commit 3d7e8fd

Please sign in to comment.