diff --git a/audit/Cargo.toml b/audit/Cargo.toml index 5ec94658..6551f99c 100644 --- a/audit/Cargo.toml +++ b/audit/Cargo.toml @@ -12,11 +12,11 @@ repository = "https://github.com/little-dude/netlink" description = "linux audit via netlink" [dependencies] -futures = "0.3.1" +futures = "0.3.11" thiserror = "1" netlink-packet-audit = { path = "../netlink-packet-audit", version = "0.2" } netlink-proto = { path = "../netlink-proto", features = ["workaround-audit-bug"], version = "0.5" } [dev-dependencies] -tokio = { version = "0.2.6", default-features = false, features = ["macros", "rt-core"] } +tokio = { version = "1.0.1", default-features = false, features = ["macros", "rt-multi-thread"] } env_logger = "0.7.1" diff --git a/netlink-proto/Cargo.toml b/netlink-proto/Cargo.toml index d6c7ee40..a858cd8a 100644 --- a/netlink-proto/Cargo.toml +++ b/netlink-proto/Cargo.toml @@ -14,8 +14,8 @@ description = "async netlink protocol" [dependencies] bytes = "0.5.3" log = "0.4.8" -futures = "0.3.1" -tokio = { version = "0.2.6", default-features = false, features = ["io-util"] } +futures = "0.3.11" +tokio = { version = "1.0.1", default-features = false, features = ["io-util"] } tokio-util = { version = "0.2.0", default-features = false, features = ["codec"] } netlink-packet-core = { path = "../netlink-packet-core", version = "0.2" } netlink-sys = { path = "../netlink-sys", default-features = false, features = ["tokio_socket"], version = "0.5" } @@ -26,7 +26,7 @@ workaround-audit-bug = [] [dev-dependencies] env_logger = "0.7.1" -tokio = { version = "0.2.6", default-features = false, features = ["macros", "rt-core"] } +tokio = { version = "1.0.1", default-features = false, features = ["macros", "rt-multi-thread"] } netlink-packet-route = { path = "../netlink-packet-route" } netlink-packet-audit = { path = "../netlink-packet-audit" } diff --git a/netlink-proto/src/lib.rs b/netlink-proto/src/lib.rs index a33db8ca..bafa9594 100644 --- a/netlink-proto/src/lib.rs +++ b/netlink-proto/src/lib.rs @@ -8,7 +8,7 @@ //! dependencies: //! //! - `futures = "^0.3"` -//! - `tokio = "^0.2"` +//! - `tokio = "^1.0"` //! - `netlink-packet-audit = "^0.1"` //! //! ```rust,no_run diff --git a/netlink-sys/Cargo.toml b/netlink-sys/Cargo.toml index eedfbfba..9729b3f6 100644 --- a/netlink-sys/Cargo.toml +++ b/netlink-sys/Cargo.toml @@ -9,7 +9,7 @@ keywords = ["netlink", "ip", "linux"] license = "MIT" readme = "../README.md" repository = "https://github.com/little-dude/netlink" -description = "netlink sockets, with optional integration with mio and tokio" +description = "netlink sockets, with optional integration with tokio" [dependencies] libc = "0.2.66" @@ -19,16 +19,12 @@ log = "0.4.8" optional = true version = "0.3.1" -[dependencies.mio] -optional = true -version = "0.6.21" - [dependencies.tokio] optional = true -version = "0.2.6" +version = "1.0.1" default-features = false # We only depend on tokio for PollEvented -features = ["io-driver"] +features = ["net"] [dependencies.async-io] optional = true @@ -36,7 +32,7 @@ version = "1.1" [features] default = [] -tokio_socket = ["mio", "tokio", "futures"] +tokio_socket = ["tokio", "futures"] smol_socket = ["async-io"] @@ -44,10 +40,10 @@ smol_socket = ["async-io"] netlink-packet-audit = { path = "../netlink-packet-audit" } [dev-dependencies.tokio] -version = "0.2.6" +version = "1.0.1" default-features = false # We only depend on tokio for PollEvented -features = ["io-driver", "macros"] +features = ["net", "macros", "rt-multi-thread"] [[example]] name = "audit_events" diff --git a/netlink-sys/src/tokio.rs b/netlink-sys/src/tokio.rs index 9419bbcf..a83fe93d 100644 --- a/netlink-sys/src/tokio.rs +++ b/netlink-sys/src/tokio.rs @@ -6,39 +6,12 @@ use std::{ use futures::{future::poll_fn, ready}; use log::trace; -use mio::{event::Evented, unix::EventedFd}; -use tokio::io::PollEvented; +use tokio::io::unix::AsyncFd; use crate::{Socket, SocketAddr}; -impl Evented for Socket { - fn register( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, - ) -> io::Result<()> { - EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts) - } - - fn reregister( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, - ) -> io::Result<()> { - EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts) - } - - fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { - EventedFd(&self.as_raw_fd()).deregister(poll) - } -} - /// An I/O object representing a Netlink socket. -pub struct TokioSocket(PollEvented); +pub struct TokioSocket(AsyncFd); impl TokioSocket { /// This function will create a new Netlink socket and attempt to bind it to @@ -54,7 +27,7 @@ impl TokioSocket { pub fn new(protocol: isize) -> io::Result { let socket = Socket::new(protocol)?; socket.set_non_blocking(true)?; - Ok(TokioSocket(PollEvented::new(socket)?)) + Ok(TokioSocket(AsyncFd::new(socket)?)) } pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> { @@ -62,20 +35,17 @@ impl TokioSocket { } pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| { + poll_fn(|cx| loop { // Check if the socket it writable. If - // PollEvented::poll_write_ready returns NotReady, it will + // AsyncFd::poll_write_ready returns NotReady, it will // already have arranged for the current task to be // notified when the socket becomes writable, so we can // just return Pending - ready!(self.0.poll_write_ready(cx))?; + let mut guard = ready!(self.0.poll_write_ready(cx))?; - match self.0.get_ref().send(buf, 0) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.0.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), + match guard.try_io(|inner| inner.get_ref().send(buf, 0)) { + Ok(x) => return Poll::Ready(x), + Err(_would_block) => continue, } }) .await @@ -91,33 +61,27 @@ impl TokioSocket { buf: &[u8], addr: &SocketAddr, ) -> Poll> { - ready!(self.0.poll_write_ready(cx))?; - match self.0.get_ref().send_to(buf, addr, 0) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.0.clear_write_ready(cx)?; - Poll::Pending + loop { + let mut guard = ready!(self.0.poll_write_ready(cx))?; + + match guard.try_io(|inner| inner.get_ref().send_to(buf, addr, 0)) { + Ok(x) => return Poll::Ready(x), + Err(_would_block) => continue, } - x => Poll::Ready(x), } } pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| { + poll_fn(|cx| loop { // Check if the socket is readable. If not, - // PollEvented::poll_read_ready would have arranged for the + // AsyncFd::poll_read_ready would have arranged for the // current task to be polled again when the socket becomes // readable, so we can just return Pending - ready!(self.0.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.0.get_ref().recv(buf, 0) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - // If the socket is not readable, make sure the - // current task get notified when the socket becomes - // readable again. - self.0.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), + let mut guard = ready!(self.0.poll_read_ready(cx))?; + + match guard.try_io(|inner| inner.get_ref().recv(buf, 0)) { + Ok(x) => return Poll::Ready(x), + Err(_would_block) => continue, } }) .await @@ -136,19 +100,20 @@ impl TokioSocket { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - trace!("poll_recv_from called"); - ready!(self.0.poll_read_ready(cx, mio::Ready::readable()))?; - - trace!("poll_recv_from socket is ready for reading"); - match self.0.get_ref().recv_from(buf, 0) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - trace!("poll_recv_from socket would block"); - self.0.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => { - trace!("poll_recv_from {:?} bytes read", x); - Poll::Ready(x) + loop { + trace!("poll_recv_from called"); + let mut guard = ready!(self.0.poll_read_ready(cx))?; + trace!("poll_recv_from socket is ready for reading"); + + match guard.try_io(|inner| inner.get_ref().recv_from(buf, 0)) { + Ok(x) => { + trace!("poll_recv_from {:?} bytes read", x); + return Poll::Ready(x); + } + Err(_would_block) => { + trace!("poll_recv_from socket would block"); + continue; + } } } } @@ -157,19 +122,20 @@ impl TokioSocket { &mut self, cx: &mut Context, ) -> Poll, SocketAddr)>> { - trace!("poll_recv_from_full called"); - ready!(self.0.poll_read_ready(cx, mio::Ready::readable()))?; - - trace!("poll_recv_from_full socket is ready for reading"); - match self.0.get_ref().recv_from_full() { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - trace!("poll_recv_from_full socket would block"); - self.0.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => { - trace!("poll_recv_from_full {:?} bytes read", x); - Poll::Ready(x) + loop { + trace!("poll_recv_from_full called"); + let mut guard = ready!(self.0.poll_read_ready(cx))?; + trace!("poll_recv_from_full socket is ready for reading"); + + match guard.try_io(|inner| inner.get_ref().recv_from_full()) { + Ok(x) => { + trace!("poll_recv_from_full {:?} bytes read", x); + return Poll::Ready(x); + } + Err(_would_block) => { + trace!("poll_recv_from_full socket would block"); + continue; + } } } } @@ -243,7 +209,7 @@ impl FromRawFd for TokioSocket { unsafe fn from_raw_fd(fd: RawFd) -> Self { let socket = Socket::from_raw_fd(fd); socket.set_non_blocking(true).unwrap(); - TokioSocket(PollEvented::new(socket).unwrap()) + TokioSocket(AsyncFd::new(socket).unwrap()) } } diff --git a/rtnetlink/Cargo.toml b/rtnetlink/Cargo.toml index 48466b51..f1c3f774 100644 --- a/rtnetlink/Cargo.toml +++ b/rtnetlink/Cargo.toml @@ -16,16 +16,16 @@ test_as_root = [] [dependencies] -futures = "0.3.1" +futures = "0.3.11" log = "0.4.8" thiserror = "1" netlink-packet-route = { path = "../netlink-packet-route", version = "0.6" } netlink-proto = { path = "../netlink-proto", version = "0.5" } byteordered = "0.5.0" nix = "0.19.0" -tokio = { version = "0.2.6", features = ["rt-core", "blocking"] } +tokio = { version = "1.0.1", features = ["rt"] } [dev-dependencies] env_logger = "0.7.1" ipnetwork = "0.15.1" -tokio = { version = "0.2.6", features = ["macros", "rt-core"] } +tokio = { version = "1.0.1", features = ["macros", "rt", "rt-multi-thread"] }