Skip to content

Commit

Permalink
refactor(s2n-quic-dc): reduce the number of peer_addr calls (#2401)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Dec 4, 2024
1 parent 4e85ccb commit 513212c
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 79 deletions.
16 changes: 12 additions & 4 deletions dc/s2n-quic-dc/src/stream/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,18 @@ where
Sub: event::Subscriber,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Stream")
.field("peer_addr", &self.peer_addr().unwrap())
.field("local_addr", &self.local_addr().unwrap())
.finish()
let mut s = f.debug_struct("Stream");

for (name, addr) in [
("peer_addr", self.peer_addr()),
("local_addr", self.local_addr()),
] {
if let Ok(addr) = addr {
s.field(name, &addr);
}
}

s.finish()
}
}

Expand Down
4 changes: 2 additions & 2 deletions dc/s2n-quic-dc/src/stream/environment/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ where
}

/// A socket that should be reregistered with the application runtime
pub struct TcpReregistered(pub TcpStream);
pub struct TcpReregistered(pub TcpStream, pub SocketAddress);

impl<Sub> super::Peer<Environment<Sub>> for TcpReregistered
where
Expand All @@ -308,7 +308,7 @@ where

#[inline]
fn setup(self, _env: &Environment<Sub>) -> super::Result<super::SocketSet<Self::WorkerSocket>> {
let remote_addr = self.0.peer_addr()?.into();
let remote_addr = self.1;
let source_control_port = self.0.local_addr()?.port();
let application = Box::new(self.0.into_std()?);
Ok(super::SocketSet {
Expand Down
16 changes: 12 additions & 4 deletions dc/s2n-quic-dc/src/stream/recv/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,18 @@ where
Sub: event::Subscriber,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Reader")
.field("peer_addr", &self.peer_addr().unwrap())
.field("local_addr", &self.local_addr().unwrap())
.finish()
let mut s = f.debug_struct("Reader");

for (name, addr) in [
("peer_addr", self.peer_addr()),
("local_addr", self.local_addr()),
] {
if let Ok(addr) = addr {
s.field(name, &addr);
}
}

s.finish()
}
}

Expand Down
16 changes: 12 additions & 4 deletions dc/s2n-quic-dc/src/stream/send/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,18 @@ where
Sub: event::Subscriber,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Writer")
.field("peer_addr", &self.peer_addr().unwrap())
.field("local_addr", &self.local_addr().unwrap())
.finish()
let mut s = f.debug_struct("Writer");

for (name, addr) in [
("peer_addr", self.peer_addr()),
("local_addr", self.local_addr()),
] {
if let Ok(addr) = addr {
s.field(name, &addr);
}
}

s.finish()
}
}

Expand Down
111 changes: 46 additions & 65 deletions dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ where

fresh.fill(cx, &self.socket, &publisher);

for socket in fresh.drain() {
workers.push(socket, now, &self.subscriber, &publisher);
for (socket, remote_address) in fresh.drain() {
workers.push(socket, remote_address, now, &self.subscriber, &publisher);
}

let res = workers.poll(cx, &mut context, now, &publisher);
Expand Down Expand Up @@ -154,7 +154,7 @@ fn publisher<'a, Sub: Subscriber, C: Clock>(
///
/// This should produce overall better latencies in the case of overloaded queues.
struct FreshQueue {
queue: VecDeque<TcpStream>,
queue: VecDeque<(TcpStream, SocketAddress)>,
}

impl FreshQueue {
Expand Down Expand Up @@ -186,32 +186,31 @@ impl FreshQueue {

while let Poll::Ready(res) = listener.poll_accept(cx) {
match res {
Ok((socket, remote_addr)) => {
Ok((socket, remote_address)) => {
if self.queue.len() == self.queue.capacity() {
if let Some(remote_addr) = self
if let Some(remote_address) = self
.queue
.pop_back()
.and_then(|socket| socket.peer_addr().ok())
.map(|(_socket, remote_address)| remote_address)
{
let remote_address: SocketAddress = remote_addr.into();
let remote_address = &remote_address;
publisher.on_acceptor_tcp_stream_dropped(
event::builder::AcceptorTcpStreamDropped { remote_address, reason: event::builder::AcceptorTcpStreamDropReason::FreshQueueAtCapacity },
event::builder::AcceptorTcpStreamDropped { remote_address: &remote_address, reason: event::builder::AcceptorTcpStreamDropReason::FreshQueueAtCapacity },
);
dropped += 1;
}
}

let remote_address: SocketAddress = remote_addr.into();
let remote_address = &remote_address;
let remote_address: SocketAddress = remote_address.into();
publisher.on_acceptor_tcp_fresh_enqueued(
event::builder::AcceptorTcpFreshEnqueued { remote_address },
event::builder::AcceptorTcpFreshEnqueued {
remote_address: &remote_address,
},
);
enqueued += 1;

// most recent streams go to the front of the line, since they're the most
// likely to be successfully processed
self.queue.push_front(socket);
self.queue.push_front((socket, remote_address));
}
Err(error) => {
// TODO submit to a separate error channel that the application can subscribe
Expand Down Expand Up @@ -239,7 +238,7 @@ impl FreshQueue {
)
}

fn drain(&mut self) -> impl Iterator<Item = TcpStream> + '_ {
fn drain(&mut self) -> impl Iterator<Item = (TcpStream, SocketAddress)> + '_ {
self.queue.drain(..)
}
}
Expand Down Expand Up @@ -292,6 +291,7 @@ where
pub fn push<Pub>(
&mut self,
stream: TcpStream,
remote_address: SocketAddress,
now: Timestamp,
subscriber: &Sub,
publisher: &Pub,
Expand All @@ -304,20 +304,14 @@ where
//
// TODO: we need to investigate how this interacts with SYN cookies/retries and fast
// failure modes in kernel space.
if let Ok(remote_addr) = stream.peer_addr() {
let remote_address: SocketAddress = remote_addr.into();
let remote_address = &remote_address;
publisher.on_acceptor_tcp_stream_dropped(
event::builder::AcceptorTcpStreamDropped {
remote_address,
reason: event::builder::AcceptorTcpStreamDropReason::SlotsAtCapacity,
},
);
}
publisher.on_acceptor_tcp_stream_dropped(event::builder::AcceptorTcpStreamDropped {
remote_address: &remote_address,
reason: event::builder::AcceptorTcpStreamDropReason::SlotsAtCapacity,
});
drop(stream);
return;
};
self.workers[idx].push(stream, now, subscriber, publisher);
self.workers[idx].push(stream, remote_address, now, subscriber, publisher);
self.working.push_back(idx);
}

Expand Down Expand Up @@ -473,7 +467,7 @@ where
Sub: event::Subscriber + Clone,
{
queue_time: Timestamp,
stream: Option<TcpStream>,
stream: Option<(TcpStream, SocketAddress)>,
subscriber_ctx: Option<Sub::ConnectionContext>,
state: WorkerState,
}
Expand All @@ -495,6 +489,7 @@ where
pub fn push<Pub>(
&mut self,
stream: TcpStream,
remote_address: SocketAddress,
now: Timestamp,
subscriber: &Sub,
publisher: &Pub,
Expand All @@ -514,20 +509,18 @@ where

let prev_queue_time = core::mem::replace(&mut self.queue_time, now);
let prev_state = core::mem::replace(&mut self.state, WorkerState::Init);
let prev_stream = core::mem::replace(&mut self.stream, Some(stream));
let prev_stream = core::mem::replace(&mut self.stream, Some((stream, remote_address)));
let prev_ctx = core::mem::replace(&mut self.subscriber_ctx, Some(subscriber_ctx));

if let Some(remote_addr) = prev_stream.and_then(|socket| socket.peer_addr().ok()) {
let remote_address: SocketAddress = remote_addr.into();
let remote_address = &remote_address;
if let Some(remote_address) = prev_stream.map(|(_socket, remote_address)| remote_address) {
let sojourn_time = now.saturating_duration_since(prev_queue_time);
let buffer_len = match prev_state {
WorkerState::Init => 0,
WorkerState::Buffering { buffer, .. } => buffer.payload_len(),
WorkerState::Erroring { .. } => 0,
};
publisher.on_acceptor_tcp_stream_replaced(event::builder::AcceptorTcpStreamReplaced {
remote_address,
remote_address: &remote_address,
sojourn_time,
buffer_len,
});
Expand Down Expand Up @@ -615,7 +608,7 @@ impl WorkerState {
&mut self,
cx: &mut Context,
context: &mut WorkerContext<Sub>,
stream: &mut Option<TcpStream>,
stream: &mut Option<(TcpStream, SocketAddress)>,
subscriber_ctx: &mut Option<Sub::ConnectionContext>,
queue_time: Timestamp,
now: Timestamp,
Expand All @@ -639,8 +632,8 @@ impl WorkerState {
} => (buffer, *blocked_count),
// we encountered an error so try and send it back
WorkerState::Erroring { offset, buffer, .. } => {
let stream = Pin::new(stream.as_mut().unwrap());
let len = ready!(stream.poll_write(cx, &buffer[*offset..]))?;
let (stream, _remote_address) = stream.as_mut().unwrap();
let len = ready!(Pin::new(stream).poll_write(cx, &buffer[*offset..]))?;

*offset += len;

Expand All @@ -660,13 +653,17 @@ impl WorkerState {
};

// try to read an initial packet from the socket
let res = Self::poll_initial_packet(
cx,
stream.as_mut().unwrap(),
recv_buffer,
sojourn_time,
publisher,
);
let res = {
let (stream, remote_address) = stream.as_mut().unwrap();
Self::poll_initial_packet(
cx,
stream,
remote_address,
recv_buffer,
sojourn_time,
publisher,
)
};

let Poll::Ready(res) = res else {
// if we got `Pending` but we don't own the recv buffer then we need to copy it
Expand All @@ -689,11 +686,12 @@ impl WorkerState {
let initial_packet = res?;

let subscriber_ctx = subscriber_ctx.take().unwrap();
let (socket, remote_address) = stream.take().unwrap();

let stream_builder = match endpoint::accept_stream(
now,
&context.env,
env::TcpReregistered(stream.take().unwrap()),
env::TcpReregistered(socket, remote_address),
&initial_packet,
None,
Some(recv_buffer),
Expand All @@ -704,11 +702,11 @@ impl WorkerState {
) {
Ok(stream) => stream,
Err(error) => {
if let Some(env::TcpReregistered(socket)) = error.peer {
if let Some(env::TcpReregistered(socket, remote_address)) = error.peer {
if !error.secret_control.is_empty() {
// if we need to send an error then update the state and loop back
// around
*stream = Some(socket);
*stream = Some((socket, remote_address));
*self = WorkerState::Erroring {
offset: 0,
buffer: error.secret_control,
Expand Down Expand Up @@ -768,6 +766,7 @@ impl WorkerState {
fn poll_initial_packet<Pub>(
cx: &mut Context,
stream: &mut TcpStream,
remote_address: &SocketAddress,
recv_buffer: &mut msg::recv::Message,
sojourn_time: Duration,
publisher: &Pub,
Expand All @@ -777,15 +776,9 @@ impl WorkerState {
{
loop {
if recv_buffer.payload_len() > 10_000 {
let remote_address = stream
.peer_addr()
.ok()
.map(SocketAddress::from)
.unwrap_or_default();

publisher.on_acceptor_tcp_packet_dropped(
event::builder::AcceptorTcpPacketDropped {
remote_address: &remote_address,
remote_address,
reason: DecoderError::UnexpectedBytes(recv_buffer.payload_len())
.into_event(),
sojourn_time,
Expand All @@ -798,15 +791,9 @@ impl WorkerState {

match server::InitialPacket::peek(recv_buffer, 16) {
Ok(packet) => {
let remote_address = stream
.peer_addr()
.ok()
.map(SocketAddress::from)
.unwrap_or_default();

publisher.on_acceptor_tcp_packet_received(
event::builder::AcceptorTcpPacketReceived {
remote_address: &remote_address,
remote_address,
credential_id: &*packet.credentials.id,
stream_id: packet.stream_id.into_varint().as_u64(),
payload_len: packet.payload_len,
Expand All @@ -823,15 +810,9 @@ impl WorkerState {
continue;
}

let remote_address = stream
.peer_addr()
.ok()
.map(SocketAddress::from)
.unwrap_or_default();

publisher.on_acceptor_tcp_packet_dropped(
event::builder::AcceptorTcpPacketDropped {
remote_address: &remote_address,
remote_address,
reason: err.into_event(),
sojourn_time,
},
Expand Down

0 comments on commit 513212c

Please sign in to comment.