Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Upgrade tokio to 1.0
Browse files Browse the repository at this point in the history
Fixes: #105

Signed-off-by: Tim Zhang <tim@hyper.sh>
  • Loading branch information
Tim-Zhang committed Jan 15, 2021
1 parent 2866b65 commit 580411b
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 112 deletions.
4 changes: 2 additions & 2 deletions audit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ repository = "https://github.com/little-dude/netlink"
description = "linux audit via netlink"

[dependencies]
futures = "0.3.1"
futures = "0.3.11"
thiserror = "1"
netlink-packet-audit = { path = "../netlink-packet-audit", version = "0.2" }
netlink-proto = { path = "../netlink-proto", features = ["workaround-audit-bug"], version = "0.5" }

[dev-dependencies]
tokio = { version = "0.2.6", default-features = false, features = ["macros", "rt-core"] }
tokio = { version = "1.0.1", default-features = false, features = ["macros", "rt-multi-thread"] }
env_logger = "0.7.1"
6 changes: 3 additions & 3 deletions netlink-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ description = "async netlink protocol"
[dependencies]
bytes = "0.5.3"
log = "0.4.8"
futures = "0.3.1"
tokio = { version = "0.2.6", default-features = false, features = ["io-util"] }
futures = "0.3.11"
tokio = { version = "1.0.1", default-features = false, features = ["io-util"] }
tokio-util = { version = "0.2.0", default-features = false, features = ["codec"] }
netlink-packet-core = { path = "../netlink-packet-core", version = "0.2" }
netlink-sys = { path = "../netlink-sys", default-features = false, features = ["tokio_socket"], version = "0.5" }
Expand All @@ -26,7 +26,7 @@ workaround-audit-bug = []

[dev-dependencies]
env_logger = "0.7.1"
tokio = { version = "0.2.6", default-features = false, features = ["macros", "rt-core"] }
tokio = { version = "1.0.1", default-features = false, features = ["macros", "rt-multi-thread"] }
netlink-packet-route = { path = "../netlink-packet-route" }
netlink-packet-audit = { path = "../netlink-packet-audit" }

Expand Down
2 changes: 1 addition & 1 deletion netlink-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! dependencies:
//!
//! - `futures = "^0.3"`
//! - `tokio = "^0.2"`
//! - `tokio = "^1.0"`
//! - `netlink-packet-audit = "^0.1"`
//!
//! ```rust,no_run
Expand Down
16 changes: 6 additions & 10 deletions netlink-sys/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ keywords = ["netlink", "ip", "linux"]
license = "MIT"
readme = "../README.md"
repository = "https://github.com/little-dude/netlink"
description = "netlink sockets, with optional integration with mio and tokio"
description = "netlink sockets, with optional integration with tokio"

[dependencies]
libc = "0.2.66"
Expand All @@ -19,35 +19,31 @@ log = "0.4.8"
optional = true
version = "0.3.1"

[dependencies.mio]
optional = true
version = "0.6.21"

[dependencies.tokio]
optional = true
version = "0.2.6"
version = "1.0.1"
default-features = false
# We only depend on tokio for PollEvented
features = ["io-driver"]
features = ["net"]

[dependencies.async-io]
optional = true
version = "1.1"

[features]
default = []
tokio_socket = ["mio", "tokio", "futures"]
tokio_socket = ["tokio", "futures"]
smol_socket = ["async-io"]


[dev-dependencies]
netlink-packet-audit = { path = "../netlink-packet-audit" }

[dev-dependencies.tokio]
version = "0.2.6"
version = "1.0.1"
default-features = false
# We only depend on tokio for PollEvented
features = ["io-driver", "macros"]
features = ["net", "macros", "rt-multi-thread"]

[[example]]
name = "audit_events"
Expand Down
143 changes: 50 additions & 93 deletions netlink-sys/src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,12 @@ use std::{

use futures::{future::poll_fn, ready};
use log::trace;
use mio::{event::Evented, unix::EventedFd};
use tokio::io::PollEvented;
use tokio::io::unix::AsyncFd;

use crate::{Socket, SocketAddr};

impl Evented for Socket {
fn register(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts)
}

fn reregister(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts)
}

fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
EventedFd(&self.as_raw_fd()).deregister(poll)
}
}

/// An I/O object representing a Netlink socket.
pub struct TokioSocket(PollEvented<Socket>);
pub struct TokioSocket(AsyncFd<Socket>);

impl TokioSocket {
/// This function will create a new Netlink socket and attempt to bind it to
Expand All @@ -54,28 +27,20 @@ impl TokioSocket {
pub fn new(protocol: isize) -> io::Result<Self> {
let socket = Socket::new(protocol)?;
socket.set_non_blocking(true)?;
Ok(TokioSocket(PollEvented::new(socket)?))
Ok(TokioSocket(AsyncFd::new(socket)?))
}

pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> {
self.0.get_ref().connect(addr)
}

pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| {
// Check if the socket it writable. If
// PollEvented::poll_write_ready returns NotReady, it will
// already have arranged for the current task to be
// notified when the socket becomes writable, so we can
// just return Pending
ready!(self.0.poll_write_ready(cx))?;

match self.0.get_ref().send(buf, 0) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.0.clear_write_ready(cx)?;
Poll::Pending
}
x => Poll::Ready(x),
poll_fn(|cx| loop {
let mut guard = ready!(self.0.poll_write_ready(cx))?;

match guard.try_io(|inner| inner.get_ref().send(buf, 0)) {
Ok(x) => return Poll::Ready(x),
Err(_would_block) => continue,
}
})
.await
Expand All @@ -91,33 +56,23 @@ impl TokioSocket {
buf: &[u8],
addr: &SocketAddr,
) -> Poll<io::Result<usize>> {
ready!(self.0.poll_write_ready(cx))?;
match self.0.get_ref().send_to(buf, addr, 0) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.0.clear_write_ready(cx)?;
Poll::Pending
loop {
let mut guard = ready!(self.0.poll_write_ready(cx))?;

match guard.try_io(|inner| inner.get_ref().send_to(buf, addr, 0)) {
Ok(x) => return Poll::Ready(x),
Err(_would_block) => continue,
}
x => Poll::Ready(x),
}
}

pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| {
// Check if the socket is readable. If not,
// PollEvented::poll_read_ready would have arranged for the
// current task to be polled again when the socket becomes
// readable, so we can just return Pending
ready!(self.0.poll_read_ready(cx, mio::Ready::readable()))?;

match self.0.get_ref().recv(buf, 0) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// If the socket is not readable, make sure the
// current task get notified when the socket becomes
// readable again.
self.0.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
x => Poll::Ready(x),
poll_fn(|cx| loop {
let mut guard = ready!(self.0.poll_read_ready(cx))?;

match guard.try_io(|inner| inner.get_ref().recv(buf, 0)) {
Ok(x) => return Poll::Ready(x),
Err(_would_block) => continue,
}
})
.await
Expand All @@ -136,19 +91,20 @@ impl TokioSocket {
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<(usize, SocketAddr)>> {
trace!("poll_recv_from called");
ready!(self.0.poll_read_ready(cx, mio::Ready::readable()))?;

trace!("poll_recv_from socket is ready for reading");
match self.0.get_ref().recv_from(buf, 0) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
trace!("poll_recv_from socket would block");
self.0.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
x => {
trace!("poll_recv_from {:?} bytes read", x);
Poll::Ready(x)
loop {
trace!("poll_recv_from called");
let mut guard = ready!(self.0.poll_read_ready(cx))?;
trace!("poll_recv_from socket is ready for reading");

match guard.try_io(|inner| inner.get_ref().recv_from(buf, 0)) {
Ok(x) => {
trace!("poll_recv_from {:?} bytes read", x);
return Poll::Ready(x);
}
Err(_would_block) => {
trace!("poll_recv_from socket would block");
continue;
}
}
}
}
Expand All @@ -157,19 +113,20 @@ impl TokioSocket {
&mut self,
cx: &mut Context,
) -> Poll<io::Result<(Vec<u8>, SocketAddr)>> {
trace!("poll_recv_from_full called");
ready!(self.0.poll_read_ready(cx, mio::Ready::readable()))?;

trace!("poll_recv_from_full socket is ready for reading");
match self.0.get_ref().recv_from_full() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
trace!("poll_recv_from_full socket would block");
self.0.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
x => {
trace!("poll_recv_from_full {:?} bytes read", x);
Poll::Ready(x)
loop {
trace!("poll_recv_from_full called");
let mut guard = ready!(self.0.poll_read_ready(cx))?;
trace!("poll_recv_from_full socket is ready for reading");

match guard.try_io(|inner| inner.get_ref().recv_from_full()) {
Ok(x) => {
trace!("poll_recv_from_full {:?} bytes read", x);
return Poll::Ready(x);
}
Err(_would_block) => {
trace!("poll_recv_from_full socket would block");
continue;
}
}
}
}
Expand Down Expand Up @@ -243,7 +200,7 @@ impl FromRawFd for TokioSocket {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
let socket = Socket::from_raw_fd(fd);
socket.set_non_blocking(true).unwrap();
TokioSocket(PollEvented::new(socket).unwrap())
TokioSocket(AsyncFd::new(socket).unwrap())
}
}

Expand Down
6 changes: 3 additions & 3 deletions rtnetlink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ test_as_root = []


[dependencies]
futures = "0.3.1"
futures = "0.3.11"
log = "0.4.8"
thiserror = "1"
netlink-packet-route = { path = "../netlink-packet-route", version = "0.6" }
netlink-proto = { path = "../netlink-proto", version = "0.5" }
byteordered = "0.5.0"
nix = "0.19.0"
tokio = { version = "0.2.6", features = ["rt-core", "blocking"] }
tokio = { version = "1.0.1", features = ["rt"] }

[dev-dependencies]
env_logger = "0.7.1"
ipnetwork = "0.15.1"
tokio = { version = "0.2.6", features = ["macros", "rt-core"] }
tokio = { version = "1.0.1", features = ["macros", "rt", "rt-multi-thread"] }

0 comments on commit 580411b

Please sign in to comment.