Skip to content

Commit

Permalink
Merge pull request #282 from AsakuraMizu/quic
Browse files Browse the repository at this point in the history
  • Loading branch information
George-Miao authored Sep 2, 2024
2 parents f616fdc + ee92ffd commit 111c8d0
Show file tree
Hide file tree
Showing 29 changed files with 5,333 additions and 82 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"compio-tls",
"compio-log",
"compio-process",
"compio-quic",
]
resolver = "2"

Expand All @@ -36,7 +37,9 @@ compio-dispatcher = { path = "./compio-dispatcher", version = "0.3.0" }
compio-log = { path = "./compio-log", version = "0.1.0" }
compio-tls = { path = "./compio-tls", version = "0.2.0", default-features = false }
compio-process = { path = "./compio-process", version = "0.1.0" }
compio-quic = { path = "./compio-quic", version = "0.1.0" }

bytes = "1.7.1"
flume = "0.11.0"
cfg-if = "1.0.0"
criterion = "0.5.1"
Expand All @@ -49,10 +52,13 @@ nix = "0.29.0"
once_cell = "1.18.0"
os_pipe = "1.1.4"
paste = "1.0.14"
rand = "0.8.5"
rustls = { version = "0.23.1", default-features = false }
slab = "0.4.9"
socket2 = "0.5.6"
tempfile = "3.8.1"
tokio = "1.33.0"
tracing-subscriber = "0.3.18"
widestring = "1.0.2"
windows-sys = "0.52.0"

Expand Down
2 changes: 1 addition & 1 deletion compio-buf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
bumpalo = { version = "3.14.0", optional = true }
arrayvec = { version = "0.7.4", optional = true }
bytes = { version = "1.5.0", optional = true }
bytes = { workspace = true, optional = true }

[target.'cfg(unix)'.dependencies]
libc = { workspace = true }
Expand Down
29 changes: 12 additions & 17 deletions compio-driver/src/iocp/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,11 @@ static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();

/// Receive data and source address with ancillary data into vectored buffer.
pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
msg: WSAMSG,
addr: SOCKADDR_STORAGE,
addr_len: socklen_t,
fd: SharedFd<S>,
buffer: T,
control: C,
control_len: u32,
_p: PhantomPinned,
}

Expand All @@ -802,12 +801,11 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
"misaligned control message buffer"
);
Self {
msg: unsafe { std::mem::zeroed() },
addr: unsafe { std::mem::zeroed() },
addr_len: std::mem::size_of::<SOCKADDR_STORAGE>() as _,
fd,
buffer,
control,
control_len: 0,
_p: PhantomPinned,
}
}
Expand All @@ -820,8 +818,8 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
(
(self.buffer, self.control),
self.addr,
self.addr_len,
self.control_len as _,
self.msg.namelen,
self.msg.Control.len as _,
)
}
}
Expand All @@ -835,26 +833,23 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
})?;

let this = self.get_unchecked_mut();

let mut slices = this.buffer.io_slices_mut();
let mut msg = WSAMSG {
name: &mut this.addr as *mut _ as _,
namelen: this.addr_len,
lpBuffers: slices.as_mut_ptr() as _,
dwBufferCount: slices.len() as _,
Control: std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut()),
dwFlags: 0,
};
this.control_len = 0;
this.msg.name = &mut this.addr as *mut _ as _;
this.msg.namelen = std::mem::size_of::<SOCKADDR_STORAGE>() as _;
this.msg.lpBuffers = slices.as_mut_ptr() as _;
this.msg.dwBufferCount = slices.len() as _;
this.msg.Control =
std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut());

let mut received = 0;
let res = recvmsg_fn(
this.fd.as_raw_fd() as _,
&mut msg,
&mut this.msg,
&mut received,
optr,
None,
);
this.control_len = msg.Control.len;
winsock_result(res, received)
}

Expand Down
2 changes: 1 addition & 1 deletion compio-log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repository = { workspace = true }
tracing = { version = "0.1", default-features = false }

[dev-dependencies]
tracing-subscriber = "0.3"
tracing-subscriber = { workspace = true }

[features]
enable_log = []
59 changes: 56 additions & 3 deletions compio-net/src/socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{future::Future, io, mem::ManuallyDrop};
use std::{
future::Future,
io,
mem::{ManuallyDrop, MaybeUninit},
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
#[cfg(unix)]
Expand Down Expand Up @@ -325,7 +329,51 @@ impl Socket {
}

#[cfg(unix)]
pub fn set_socket_option<T>(&self, level: i32, name: i32, value: &T) -> io::Result<()> {
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
let mut len = size_of::<T>() as libc::socklen_t;
syscall!(libc::getsockopt(
self.socket.as_raw_fd(),
level,
name,
value.as_mut_ptr() as _,
&mut len
))
.map(|_| {
debug_assert_eq!(len as usize, size_of::<T>());
// SAFETY: The value is initialized by `getsockopt`.
value.assume_init()
})
}

#[cfg(windows)]
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
let mut len = size_of::<T>() as i32;
syscall!(
SOCKET,
windows_sys::Win32::Networking::WinSock::getsockopt(
self.socket.as_raw_fd() as _,
level,
name,
value.as_mut_ptr() as _,
&mut len
)
)
.map(|_| {
debug_assert_eq!(len as usize, size_of::<T>());
// SAFETY: The value is initialized by `getsockopt`.
value.assume_init()
})
}

#[cfg(unix)]
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
syscall!(libc::setsockopt(
self.socket.as_raw_fd(),
level,
Expand All @@ -337,7 +385,12 @@ impl Socket {
}

#[cfg(windows)]
pub fn set_socket_option<T>(&self, level: i32, name: i32, value: &T) -> io::Result<()> {
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
syscall!(
SOCKET,
windows_sys::Win32::Networking::WinSock::setsockopt(
Expand Down
20 changes: 19 additions & 1 deletion compio-net/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,26 @@ impl UdpSocket {
.await
}

/// Gets a socket option.
///
/// # Safety
///
/// The caller must ensure `T` is the correct type for `level` and `name`.
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
self.inner.get_socket_option(level, name)
}

/// Sets a socket option.
pub fn set_socket_option<T>(&self, level: i32, name: i32, value: &T) -> io::Result<()> {
///
/// # Safety
///
/// The caller must ensure `T` is the correct type for `level` and `name`.
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
self.inner.set_socket_option(level, name, value)
}
}
Expand Down
56 changes: 1 addition & 55 deletions compio-net/tests/udp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use compio_net::{CMsgBuilder, CMsgIter, UdpSocket};
use compio_net::UdpSocket;

#[compio_macros::test]
async fn connect() {
Expand Down Expand Up @@ -64,57 +64,3 @@ async fn send_to() {
active_addr
);
}

#[compio_macros::test]
async fn send_msg_with_ipv6_ecn() {
#[cfg(unix)]
use libc::{IPPROTO_IPV6, IPV6_RECVTCLASS, IPV6_TCLASS};
#[cfg(windows)]
use windows_sys::Win32::Networking::WinSock::{
IPPROTO_IPV6, IPV6_ECN, IPV6_RECVTCLASS, IPV6_TCLASS,
};

const MSG: &str = "foo bar baz";

let passive = UdpSocket::bind("[::1]:0").await.unwrap();
let passive_addr = passive.local_addr().unwrap();

passive
.set_socket_option(IPPROTO_IPV6, IPV6_RECVTCLASS, &1)
.unwrap();

let active = UdpSocket::bind("[::1]:0").await.unwrap();
let active_addr = active.local_addr().unwrap();

let mut control = vec![0u8; 32];
let mut builder = CMsgBuilder::new(&mut control);

const ECN_BITS: i32 = 0b10;

#[cfg(unix)]
builder
.try_push(IPPROTO_IPV6, IPV6_TCLASS, ECN_BITS)
.unwrap();
#[cfg(windows)]
builder.try_push(IPPROTO_IPV6, IPV6_ECN, ECN_BITS).unwrap();

let len = builder.finish();
control.truncate(len);

active.send_msg(MSG, control, passive_addr).await.unwrap();

let ((_, _, addr), (buffer, control)) = passive
.recv_msg(Vec::with_capacity(20), Vec::with_capacity(32))
.await
.unwrap();
assert_eq!(addr, active_addr);
assert_eq!(buffer, MSG.as_bytes());
unsafe {
let mut iter = CMsgIter::new(&control);
let cmsg = iter.next().unwrap();
assert_eq!(cmsg.level(), IPPROTO_IPV6);
assert_eq!(cmsg.ty(), IPV6_TCLASS);
assert_eq!(cmsg.data::<i32>(), &ECN_BITS);
assert!(iter.next().is_none());
}
}
82 changes: 82 additions & 0 deletions compio-quic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
[package]
name = "compio-quic"
version = "0.1.0"
description = "QUIC for compio"
categories = ["asynchronous", "network-programming"]
keywords = ["async", "net", "quic"]
edition = { workspace = true }
authors = { workspace = true }
readme = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
# Workspace dependencies
compio-io = { workspace = true }
compio-buf = { workspace = true }
compio-log = { workspace = true }
compio-net = { workspace = true }
compio-runtime = { workspace = true, features = ["time"] }

quinn-proto = "0.11.3"
rustls = { workspace = true }
rustls-platform-verifier = { version = "0.3.3", optional = true }
rustls-native-certs = { version = "0.7.1", optional = true }
webpki-roots = { version = "0.26.3", optional = true }
h3 = { version = "0.0.6", optional = true }

# Utils
bytes = { workspace = true }
flume = { workspace = true }
futures-util = { workspace = true }
rustc-hash = "2.0.0"
thiserror = "1.0.63"

# Windows specific dependencies
[target.'cfg(windows)'.dependencies]
windows-sys = { workspace = true, features = ["Win32_Networking_WinSock"] }

[target.'cfg(unix)'.dependencies]
libc = { workspace = true }

[dev-dependencies]
compio-buf = { workspace = true, features = ["bytes"] }
compio-dispatcher = { workspace = true }
compio-driver = { workspace = true }
compio-fs = { workspace = true }
compio-macros = { workspace = true }
compio-runtime = { workspace = true, features = ["criterion"] }

criterion = { workspace = true, features = ["async_tokio"] }
http = "1.1.0"
quinn = "0.11.3"
rand = { workspace = true }
rcgen = "0.13.1"
socket2 = { workspace = true, features = ["all"] }
tokio = { workspace = true, features = ["rt", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[features]
default = []
io-compat = ["futures-util/io"]
platform-verifier = ["dep:rustls-platform-verifier"]
native-certs = ["dep:rustls-native-certs"]
webpki-roots = ["dep:webpki-roots"]
h3 = ["dep:h3"]
# FIXME: see https://github.com/quinn-rs/quinn/pull/1962

[[example]]
name = "http3-client"
required-features = ["h3"]

[[example]]
name = "http3-server"
required-features = ["h3"]

[[bench]]
name = "quic"
harness = false
Loading

0 comments on commit 111c8d0

Please sign in to comment.