Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Tokio 1.0 (and mio 0.7) #32

Merged
merged 7 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ Interprocess communication library for tokio.
[dependencies]
futures = "0.3"
log = "0.4"
mio-named-pipes = "0.1"
miow = "0.3.3"
rand = "0.7"
tokio = { version = "0.2", features = ["io-driver", "io-util", "uds", "stream", "rt-core", "macros", "time"] }
tokio = { version = "1.7.0", features = ["net", "time"] }
libc = "0.2.65"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase", "winnt", "accctrl", "aclapi", "securitybaseapi", "minwinbase", "winbase"] }

[dev-dependencies]
tokio = { version = "1.7.0", features = ["io-util", "rt", "time", "macros"] }
6 changes: 3 additions & 3 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use tokio::{self, prelude::*};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use parity_tokio_ipc::Endpoint;

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let path = std::env::args().nth(1).expect("Run it with server path to connect as argument");

Expand All @@ -19,6 +19,6 @@ async fn main() {
break;
}

tokio::time::delay_for(std::time::Duration::from_secs(2)).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
11 changes: 4 additions & 7 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use futures::StreamExt as _;
use tokio::{
prelude::*,
self,
io::split,
};
use tokio::io::{split, AsyncReadExt, AsyncWriteExt};

use parity_tokio_ipc::{Endpoint, SecurityAttributes};

async fn run_server(path: String) {
let mut endpoint = Endpoint::new(path);
endpoint.set_security_attributes(SecurityAttributes::allow_everyone_create().unwrap());

let mut incoming = endpoint.incoming().expect("failed to open new socket");
let incoming = endpoint.incoming().expect("failed to open new socket");
futures::pin_mut!(incoming);

while let Some(result) = incoming.next().await
{
Expand Down Expand Up @@ -40,7 +37,7 @@ async fn run_server(path: String) {
};
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
Xanewok marked this conversation as resolved.
Show resolved Hide resolved
async fn main() {
let path = std::env::args().nth(1).expect("Run it with server path as argument");
run_server(path).await
Expand Down
15 changes: 6 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,9 @@ pub fn dummy_endpoint() -> String {

#[cfg(test)]
mod tests {
use tokio::prelude::*;
use futures::{channel::oneshot, StreamExt as _, FutureExt as _};
use std::time::Duration;
use tokio::{
self,
io::split,
};
use tokio::io::{split, AsyncReadExt, AsyncWriteExt};

use super::{dummy_endpoint, Endpoint, SecurityAttributes};
use std::path::Path;
Expand All @@ -68,7 +64,8 @@ mod tests {
.set_mode(0o777)
.unwrap()
);
let mut incoming = endpoint.incoming().expect("failed to open up a new socket");
let incoming = endpoint.incoming().expect("failed to open up a new socket");
futures::pin_mut!(incoming);

while let Some(result) = incoming.next().await {
match result {
Expand Down Expand Up @@ -100,12 +97,12 @@ mod tests {
});
tokio::spawn(server);

tokio::time::delay_for(Duration::from_secs(2)).await;
tokio::time::sleep(Duration::from_secs(2)).await;

println!("Connecting to client 0...");
let mut client_0 = Endpoint::connect(&path).await
.expect("failed to open client_0");
tokio::time::delay_for(Duration::from_secs(2)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
println!("Connecting to client 1...");
let mut client_1 = Endpoint::connect(&path).await
.expect("failed to open client_1");
Expand All @@ -125,7 +122,7 @@ mod tests {
// shutdown server
if let Ok(()) = shutdown_tx.send(()) {
// wait one second for the file to be deleted.
tokio::time::delay_for(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let path = Path::new(&path);
// assert that it has
assert!(!path.exists());
Expand Down
18 changes: 8 additions & 10 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use libc::chmod;
use std::ffi::CString;
use std::io::{self, Error};
use futures::Stream;
use tokio::prelude::*;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::{UnixListener, UnixStream};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::mem::MaybeUninit;

/// Socket permissions and ownership on UNIX
pub struct SecurityAttributes {
Expand Down Expand Up @@ -68,7 +67,7 @@ pub struct Endpoint {

impl Endpoint {
/// Stream of incoming connections
pub fn incoming(self) -> io::Result<impl Stream<Item = tokio::io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
pub fn incoming(self) -> io::Result<impl Stream<Item = std::io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
let listener = self.inner()?;
// the call to bind in `inner()` creates the file
// `apply_permission()` will set the file permissions.
Expand Down Expand Up @@ -124,7 +123,10 @@ impl Stream for Incoming {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
Pin::new(&mut this.listener).poll_next(cx)
match Pin::new(&mut this.listener).poll_accept(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => Poll::Ready(Some(result.map(|(stream, _addr)| stream))),
}
}
}

Expand All @@ -149,15 +151,11 @@ impl Connection {
}

impl AsyncRead for Connection {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}

fn poll_read(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = Pin::into_inner(self);
Pin::new(&mut this.inner).poll_read(ctx, buf)
}
Expand Down
Loading