diff --git a/.cargo/config.toml b/.cargo/config.toml index 4a087b02..72b2f4a9 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -2,6 +2,7 @@ protocol = "sparse" [build] +# target = ["x86_64-unknown-linux-musl"] # target = ["x86_64-unknown-linux-gnu"] # target = ["aarch64-linux-android"] # target = ["aarch64-apple-ios"] diff --git a/src/async/mod.rs b/src/async/mod.rs index e3b542a7..6e13ec2b 100644 --- a/src/async/mod.rs +++ b/src/async/mod.rs @@ -25,12 +25,12 @@ pub use codec::TunPacketCodec; #[cfg(unix)] mod unix_device; #[cfg(unix)] -pub use unix_device::AsyncDevice; +pub use unix_device::{AsyncDevice, DeviceReader, DeviceWriter}; #[cfg(target_os = "windows")] mod win_device; #[cfg(target_os = "windows")] -pub use win_device::AsyncDevice; +pub use win_device::{AsyncDevice, DeviceReader, DeviceWriter}; /// Create a TUN device with the given name. pub fn create_as_async(configuration: &Configuration) -> Result { diff --git a/src/async/unix_device.rs b/src/async/unix_device.rs index a308acea..30787be0 100644 --- a/src/async/unix_device.rs +++ b/src/async/unix_device.rs @@ -23,6 +23,7 @@ use tokio_util::codec::Framed; use super::TunPacketCodec; use crate::device::AbstractDevice; +use crate::platform::posix::{Reader, Writer}; use crate::platform::Device; /// An async TUN device wrapper around a TUN device. @@ -62,6 +63,11 @@ impl AsyncDevice { // associate mtu with the capacity of ReadBuf Framed::with_capacity(self, codec, mtu as usize) } + pub fn split(self) -> std::io::Result<(DeviceWriter, DeviceReader)> { + let device = self.inner.into_inner(); + let (reader, writer) = device.split(); + Ok((DeviceWriter::new(writer)?, DeviceReader::new(reader)?)) + } /// Recv a packet from tun device pub async fn recv(&self, buf: &mut [u8]) -> std::io::Result { @@ -146,3 +152,88 @@ impl AsyncWrite for AsyncDevice { true } } +pub struct DeviceReader { + inner: AsyncFd, +} +impl DeviceReader { + fn new(reader: Reader) -> std::io::Result { + Ok(Self { + inner: AsyncFd::new(reader)?, + }) + } +} +impl AsyncRead for DeviceReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf, + ) -> Poll> { + loop { + let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?; + let rbuf = buf.initialize_unfilled(); + match guard.try_io(|inner| inner.get_mut().read(rbuf)) { + Ok(res) => return Poll::Ready(res.map(|n| buf.advance(n))), + Err(_wb) => continue, + } + } + } +} + +pub struct DeviceWriter { + inner: AsyncFd, +} +impl DeviceWriter { + fn new(writer: Writer) -> std::io::Result { + Ok(Self { + inner: AsyncFd::new(writer)?, + }) + } +} + +impl AsyncWrite for DeviceWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + loop { + let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?; + match guard.try_io(|inner| inner.get_mut().write(buf)) { + Ok(res) => return Poll::Ready(res), + Err(_wb) => continue, + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?; + match guard.try_io(|inner| inner.get_mut().flush()) { + Ok(res) => return Poll::Ready(res), + Err(_wb) => continue, + } + } + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + loop { + let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?; + match guard.try_io(|inner| inner.get_mut().write_vectored(bufs)) { + Ok(res) => return Poll::Ready(res), + Err(_wb) => continue, + } + } + } + + fn is_write_vectored(&self) -> bool { + true + } +} diff --git a/src/async/win_device.rs b/src/async/win_device.rs index 44df4822..5cade334 100644 --- a/src/async/win_device.rs +++ b/src/async/win_device.rs @@ -17,17 +17,19 @@ use core::task::{Context, Poll}; use std::io; use std::io::Error; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_util::codec::Framed; - use super::TunPacketCodec; use crate::device::AbstractDevice; use crate::platform::Device; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::sync::mpsc::error::TrySendError; +use tokio_util::codec::Framed; +use wintun::Packet; /// An async TUN device wrapper around a TUN device. pub struct AsyncDevice { inner: Device, - session: WinSession, + session_reader: DeviceReader, + session_writer: DeviceWriter, } /// Returns a shared reference to the underlying Device object. @@ -49,10 +51,12 @@ impl core::ops::DerefMut for AsyncDevice { impl AsyncDevice { /// Create a new `AsyncDevice` wrapping around a `Device`. pub fn new(device: Device) -> io::Result { - let session = WinSession::new(device.tun.get_session())?; + let session_reader = DeviceReader::new(device.tun.get_session())?; + let session_writer = DeviceWriter::new(device.tun.get_session())?; Ok(AsyncDevice { inner: device, - session, + session_reader, + session_writer, }) } @@ -63,6 +67,9 @@ impl AsyncDevice { // guarantee to avoid the mtu of wintun may far away larger than the default provided capacity of ReadBuf of Framed Framed::with_capacity(self, codec, mtu as usize) } + pub fn split(self) -> io::Result<(DeviceWriter, DeviceReader)> { + Ok((self.session_writer, self.session_reader)) + } /// Recv a packet from tun device - Not implemented for windows pub async fn recv(&self, _buf: &mut [u8]) -> std::io::Result { @@ -81,7 +88,7 @@ impl AsyncRead for AsyncDevice { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - Pin::new(&mut self.session).poll_read(cx, buf) + Pin::new(&mut self.session_reader).poll_read(cx, buf) } } @@ -91,34 +98,38 @@ impl AsyncWrite for AsyncDevice { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.session).poll_write(cx, buf) + Pin::new(&mut self.session_writer).poll_write(cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.session).poll_flush(cx) + Pin::new(&mut self.session_writer).poll_flush(cx) } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.session).poll_shutdown(cx) + Pin::new(&mut self.session_writer).poll_shutdown(cx) } } - -struct WinSession { - session: std::sync::Arc, - receiver: tokio::sync::mpsc::UnboundedReceiver>, +pub struct DeviceReader { + receiver: tokio::sync::mpsc::Receiver, _task: std::thread::JoinHandle<()>, } - -impl WinSession { - fn new(session: std::sync::Arc) -> Result { - let session_reader = session.clone(); - let (receiver_tx, receiver_rx) = tokio::sync::mpsc::unbounded_channel::>(); +impl DeviceReader { + fn new(session: std::sync::Arc) -> Result { + let (receiver_tx, receiver_rx) = tokio::sync::mpsc::channel(1024); let task = std::thread::spawn(move || loop { - match session_reader.receive_blocking() { + match session.receive_blocking() { Ok(packet) => { - if let Err(err) = receiver_tx.send(packet.bytes().to_vec()) { - log::error!("{}", err); - break; + if let Err(err) = receiver_tx.try_send(packet) { + match err { + TrySendError::Full(_) => { + log::error!("receiver_tx Full"); + continue; + } + TrySendError::Closed(_) => { + log::error!("receiver_tx Closed"); + break; + } + } } } Err(err) => { @@ -127,16 +138,22 @@ impl WinSession { } } }); - - Ok(WinSession { - session, + Ok(DeviceReader { receiver: receiver_rx, _task: task, }) } } +pub struct DeviceWriter { + session: std::sync::Arc, +} +impl DeviceWriter { + fn new(session: std::sync::Arc) -> Result { + Ok(Self { session }) + } +} -impl AsyncRead for WinSession { +impl AsyncRead for DeviceReader { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -144,7 +161,7 @@ impl AsyncRead for WinSession { ) -> Poll> { match std::task::ready!(self.receiver.poll_recv(cx)) { Some(bytes) => { - buf.put_slice(&bytes); + buf.put_slice(bytes.bytes()); std::task::Poll::Ready(Ok(())) } None => std::task::Poll::Ready(Ok(())), @@ -152,7 +169,7 @@ impl AsyncRead for WinSession { } } -impl AsyncWrite for WinSession { +impl AsyncWrite for DeviceWriter { fn poll_write( self: Pin<&mut Self>, _cx: &mut Context<'_>, diff --git a/src/platform/windows/device.rs b/src/platform/windows/device.rs index fdb89995..af920dc4 100644 --- a/src/platform/windows/device.rs +++ b/src/platform/windows/device.rs @@ -287,13 +287,14 @@ impl Write for Tun { } } -impl Drop for Tun { - fn drop(&mut self) { - if let Err(err) = self.session.shutdown() { - log::error!("failed to shutdown session: {:?}", err); - } - } -} +// impl Drop for Tun { +// fn drop(&mut self) { +// // The session has implemented drop +// if let Err(err) = self.session.shutdown() { +// log::error!("failed to shutdown session: {:?}", err); +// } +// } +// } #[repr(transparent)] pub struct Reader(Arc);