Skip to content

Commit

Permalink
Start switching to recvmsg
Browse files Browse the repository at this point in the history
cf. #24
  • Loading branch information
fasterthanlime committed Jun 14, 2023
1 parent c6d79c1 commit 78abe04
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pin-project-lite = "0.2.9"
tokio = { version = "1.28.2", features = ["net", "macros", "io-util"] }
futures = "0.3.28"
ktls-sys = "1.0.0"
nix = { version = "0.26.2", features = ["socket", "uio", "net"], default-features = false }

[dev-dependencies]
const-random = "0.1.15"
Expand Down
48 changes: 44 additions & 4 deletions src/ktls_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
use std::{io, os::unix::prelude::AsRawFd, pin::Pin, task};

use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use std::{
io::{self, IoSliceMut},
mem::ManuallyDrop,
os::{
fd::{FromRawFd, RawFd},
unix::prelude::AsRawFd,
},
pin::Pin,
task,
};

use nix::{
cmsg_space,
sys::socket::{ControlMessageOwned, MsgFlags, SockaddrIn},
};
use tokio::io::{AsyncRead, AsyncWrite, Interest, ReadBuf};

// A wrapper around `IO` that sends a `close_notify` when shut down or dropped.
pin_project_lite::pin_project! {
Expand Down Expand Up @@ -83,7 +96,34 @@ where
}

tracing::trace!("KtlsStream::poll_read, forwarding to inner IO");
this.inner.poll_read(cx, buf)
{
let fd = this.inner.as_raw_fd();
let std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
let tcp_stream = tokio::net::TcpStream::from_std(std_tcp_stream)
.map(ManuallyDrop::new)
.unwrap();

let res = futures::ready!(tcp_stream.poll_read_ready(cx));
if let Err(e) = res {
tracing::trace!(?e, "KtlsStream::poll_read, poll_read_ready");
return Err(e).into();
}

let mut cmsgspace = cmsg_space!(nix::sys::time::TimeVal);
let mut iov = [IoSliceMut::new(buf.initialize_unfilled())];
let flags = MsgFlags::empty();

let r =
nix::sys::socket::recvmsg::<SockaddrIn>(fd, &mut iov, Some(&mut cmsgspace), flags);
tracing::trace!("recvmsg result = {:#?}", r);
}

let res = this.inner.poll_read(cx, buf);

if let task::Poll::Ready(res) = &res {
tracing::trace!(?res, "KtlsStream::poll_read, inner IO result");
}
res
}
}

Expand Down
1 change: 0 additions & 1 deletion tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ async fn client_test(
.unwrap();

server_config.key_log = Arc::new(rustls::KeyLogFile::new());
server_config.send_tls13_tickets = 0;

let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
let ln = TcpListener::bind("[::]:0").await.unwrap();
Expand Down

0 comments on commit 78abe04

Please sign in to comment.