-
Notifications
You must be signed in to change notification settings - Fork 121
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(s2n-quic-platform): wire up tokio sockets to ring (#1790)
- Loading branch information
Showing
12 changed files
with
400 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
// depending on the platform, some of these implementations aren't used | ||
#![allow(dead_code)] | ||
|
||
mod simple; | ||
#[cfg(unix)] | ||
mod unix; | ||
|
||
cfg_if::cfg_if! { | ||
if #[cfg(s2n_quic_platform_socket_mmsg)] { | ||
pub use mmsg::{rx, tx}; | ||
} else if #[cfg(s2n_quic_platform_socket_msg)] { | ||
pub use msg::{rx, tx}; | ||
} else { | ||
pub use simple::{rx, tx}; | ||
} | ||
} | ||
|
||
macro_rules! libc_msg { | ||
($message:ident, $cfg:ident) => { | ||
#[cfg($cfg)] | ||
mod $message { | ||
use super::unix; | ||
use crate::{features::Gso, message::$message::Message, socket::ring}; | ||
|
||
pub async fn rx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
producer: ring::Producer<Message>, | ||
) -> std::io::Result<()> { | ||
unix::rx(socket, producer).await | ||
} | ||
|
||
pub async fn tx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
consumer: ring::Consumer<Message>, | ||
gso: Gso, | ||
) -> std::io::Result<()> { | ||
unix::tx(socket, consumer, gso).await | ||
} | ||
} | ||
}; | ||
} | ||
|
||
libc_msg!(msg, s2n_quic_platform_socket_msg); | ||
libc_msg!(mmsg, s2n_quic_platform_socket_mmsg); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::{ | ||
features::Gso, | ||
message::{simple::Message, Message as _}, | ||
socket::{ | ||
ring, task, | ||
task::{rx, tx}, | ||
}, | ||
syscall::SocketEvents, | ||
}; | ||
use core::task::{Context, Poll}; | ||
use tokio::{io, net::UdpSocket}; | ||
|
||
pub async fn rx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
producer: ring::Producer<Message>, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = UdpSocket::from_std(socket).unwrap(); | ||
let result = task::Receiver::new(producer, socket).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
pub async fn tx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
consumer: ring::Consumer<Message>, | ||
gso: Gso, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = UdpSocket::from_std(socket).unwrap(); | ||
let result = task::Sender::new(consumer, socket, gso).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl tx::Socket<Message> for UdpSocket { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn send( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [Message], | ||
events: &mut tx::Events, | ||
) -> io::Result<()> { | ||
for entry in entries { | ||
let target = (*entry.remote_address()).into(); | ||
let payload = entry.payload_mut(); | ||
match self.poll_send_to(cx, payload, target) { | ||
Poll::Ready(Ok(_)) => { | ||
if events.on_complete(1).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Ready(Err(err)) => { | ||
if events.on_error(err).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Pending => { | ||
events.blocked(); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl rx::Socket<Message> for UdpSocket { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn recv( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [Message], | ||
events: &mut rx::Events, | ||
) -> io::Result<()> { | ||
for entry in entries { | ||
let payload = entry.payload_mut(); | ||
let mut buf = io::ReadBuf::new(payload); | ||
match self.poll_recv_from(cx, &mut buf) { | ||
Poll::Ready(Ok(addr)) => { | ||
unsafe { | ||
let len = buf.filled().len(); | ||
entry.set_payload_len(len); | ||
} | ||
entry.set_remote_address(&(addr.into())); | ||
|
||
if events.on_complete(1).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Ready(Err(err)) => { | ||
if events.on_error(err).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Pending => { | ||
events.blocked(); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::{ | ||
features::Gso, | ||
socket::{ | ||
ring, | ||
task::{rx, tx}, | ||
}, | ||
syscall::{SocketType, UnixMessage}, | ||
}; | ||
use core::task::{Context, Poll}; | ||
use std::{io, os::unix::io::AsRawFd}; | ||
use tokio::io::unix::AsyncFd; | ||
|
||
pub async fn rx<S: Into<std::net::UdpSocket>, M: UnixMessage + Unpin>( | ||
socket: S, | ||
producer: ring::Producer<M>, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = AsyncFd::new(socket).unwrap(); | ||
let result = rx::Receiver::new(producer, socket).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
pub async fn tx<S: Into<std::net::UdpSocket>, M: UnixMessage + Unpin>( | ||
socket: S, | ||
consumer: ring::Consumer<M>, | ||
gso: Gso, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = AsyncFd::new(socket).unwrap(); | ||
let result = tx::Sender::new(consumer, socket, gso).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl<S: AsRawFd, M: UnixMessage> tx::Socket<M> for AsyncFd<S> { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn send( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [M], | ||
events: &mut tx::Events, | ||
) -> io::Result<()> { | ||
// Call the syscall for the socket | ||
// | ||
// NOTE: we usually wrap this in a `AsyncFdReadyGuard::try_io`. However, here we just | ||
// assume the socket is ready in the general case and then fall back to querying | ||
// socket readiness if it's not. This can avoid some things like having to construct | ||
// a `std::io::Error` with `WouldBlock` and dereferencing the registration. | ||
M::send(self.get_ref().as_raw_fd(), entries, events); | ||
|
||
// yield back if we weren't blocked | ||
if !events.is_blocked() { | ||
return Ok(()); | ||
} | ||
|
||
// * First iteration we need to clear socket readiness since the `send` call returned a | ||
// `WouldBlock`. | ||
// * Second iteration we need to register the waker, assuming the socket readiness was | ||
// cleared. | ||
// * If we got a `Ready` anyway, then clear the blocked status and have the caller try | ||
// again. | ||
for i in 0..2 { | ||
match self.poll_write_ready(cx) { | ||
Poll::Ready(guard) => { | ||
let mut guard = guard?; | ||
if i == 0 { | ||
guard.clear_ready(); | ||
} else { | ||
events.take_blocked(); | ||
} | ||
} | ||
Poll::Pending => { | ||
return Ok(()); | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl<S: AsRawFd, M: UnixMessage> rx::Socket<M> for AsyncFd<S> { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn recv( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [M], | ||
events: &mut rx::Events, | ||
) -> io::Result<()> { | ||
// Call the syscall for the socket | ||
// | ||
// NOTE: we usually wrap this in a `AsyncFdReadyGuard::try_io`. However, here we just | ||
// assume the socket is ready in the general case and then fall back to querying | ||
// socket readiness if it's not. This can avoid some things like having to construct | ||
// a `std::io::Error` with `WouldBlock` and dereferencing the registration. | ||
M::recv( | ||
self.get_ref().as_raw_fd(), | ||
SocketType::NonBlocking, | ||
entries, | ||
events, | ||
); | ||
|
||
// yield back if we weren't blocked | ||
if !events.is_blocked() { | ||
return Ok(()); | ||
} | ||
|
||
// * First iteration we need to clear socket readiness since the `recv` call returned a | ||
// `WouldBlock`. | ||
// * Second iteration we need to register the waker, assuming the socket readiness was | ||
// cleared. | ||
// * If we got a `Ready` anyway, then clear the blocked status and have the caller try | ||
// again. | ||
for i in 0..2 { | ||
match self.poll_read_ready(cx) { | ||
Poll::Ready(guard) => { | ||
let mut guard = guard?; | ||
if i == 0 { | ||
guard.clear_ready(); | ||
} else { | ||
events.take_blocked(); | ||
} | ||
} | ||
Poll::Pending => { | ||
return Ok(()); | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.