Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce dependency on futures crate #140

Merged
12 commits merged into from Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ docs = []
async-task = "1.0.0"
cfg-if = "0.1.9"
crossbeam-channel = "0.3.9"
futures-channel-preview = "0.3.0-alpha.18"
futures-core-preview = "0.3.0-alpha.18"
futures-io-preview = "0.3.0-alpha.18"
futures-timer = "0.3.0"
lazy_static = "1.3.0"
log = { version = "0.4.8", features = ["kv_unstable"] }
Expand All @@ -36,11 +39,11 @@ num_cpus = "1.10.0"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"

[dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["async-await", "nightly"]

[dev-dependencies]
femme = "1.1.0"
tempdir = "0.3.7"
surf = "1.0.1"
tempdir = "0.3.7"

[dev-dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["std", "nightly", "async-await"]
2 changes: 1 addition & 1 deletion docs/src/tutorial/connecting_readers_and_writers.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined
# task,
# };
# use futures::channel::mpsc;
# use futures::SinkExt;
# use futures::sink::SinkExt;
# use std::sync::Arc;
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
2 changes: 1 addition & 1 deletion docs/src/tutorial/sending_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the
# prelude::Stream,
# };
use futures::channel::mpsc; // 1
use futures::SinkExt;
use futures::sink::SinkExt;
use std::sync::Arc;

# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
105 changes: 12 additions & 93 deletions src/fs/dir_entry.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use std::ffi::OsString;
use std::fs;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Mutex;
use std::sync::Arc;

use cfg_if::cfg_if;
use futures::future::{self, FutureExt, TryFutureExt};

use crate::future::Future;
use crate::io;
use crate::task::{blocking, Poll};
use crate::task::blocking;

/// An entry inside a directory.
///
Expand All @@ -21,44 +18,25 @@ use crate::task::{blocking, Poll};
/// [`std::fs::DirEntry`]: https://doc.rust-lang.org/std/fs/struct.DirEntry.html
#[derive(Debug)]
pub struct DirEntry {
/// The state of the entry.
state: Mutex<State>,

/// The full path to the entry.
path: PathBuf,
/// The inner synchronous `DirEntry`.
inner: Arc<fs::DirEntry>,

#[cfg(unix)]
ino: u64,

/// The bare name of the entry without the leading path.
file_name: OsString,
}

/// The state of an asynchronous `DirEntry`.
///
/// The `DirEntry` can be either idle or busy performing an asynchronous operation.
#[derive(Debug)]
enum State {
Idle(Option<fs::DirEntry>),
Busy(blocking::JoinHandle<State>),
}

impl DirEntry {
/// Creates an asynchronous `DirEntry` from a synchronous handle.
pub(crate) fn new(inner: fs::DirEntry) -> DirEntry {
#[cfg(unix)]
let dir_entry = DirEntry {
path: inner.path(),
file_name: inner.file_name(),
ino: inner.ino(),
state: Mutex::new(State::Idle(Some(inner))),
inner: Arc::new(inner),
};

#[cfg(windows)]
let dir_entry = DirEntry {
path: inner.path(),
file_name: inner.file_name(),
state: Mutex::new(State::Idle(Some(inner))),
inner: Arc::new(inner),
};

dir_entry
Expand Down Expand Up @@ -89,7 +67,7 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub fn path(&self) -> PathBuf {
self.path.clone()
self.inner.path()
}

/// Returns the metadata for this entry.
Expand All @@ -114,35 +92,8 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub async fn metadata(&self) -> io::Result<fs::Metadata> {
future::poll_fn(|cx| {
let state = &mut *self.state.lock().unwrap();

loop {
match state {
State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None),
Some(inner) => {
let (s, r) = futures::channel::oneshot::channel();

// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = inner.metadata();
let _ = s.send(res);
State::Idle(Some(inner))
}));

return Poll::Ready(Some(r));
}
},
// Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
}
}
})
.map(|opt| opt.ok_or_else(|| io_error("invalid state")))
.await?
.map_err(|_| io_error("blocking task failed"))
.await?
let inner = self.inner.clone();
blocking::spawn(async move { inner.metadata() }).await
}

/// Returns the file type for this entry.
Expand All @@ -167,35 +118,8 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub async fn file_type(&self) -> io::Result<fs::FileType> {
future::poll_fn(|cx| {
let state = &mut *self.state.lock().unwrap();

loop {
match state {
State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None),
Some(inner) => {
let (s, r) = futures::channel::oneshot::channel();

// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = inner.file_type();
let _ = s.send(res);
State::Idle(Some(inner))
}));

return Poll::Ready(Some(r));
}
},
// Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
}
}
})
.map(|opt| opt.ok_or_else(|| io_error("invalid state")))
.await?
.map_err(|_| io_error("blocking task failed"))
.await?
let inner = self.inner.clone();
blocking::spawn(async move { inner.file_type() }).await
}

/// Returns the bare name of this entry without the leading path.
Expand All @@ -218,15 +142,10 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub fn file_name(&self) -> OsString {
self.file_name.clone()
self.inner.file_name()
}
}

/// Creates a custom `io::Error` with an arbitrary error type.
fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}

cfg_if! {
if #[cfg(feature = "docs")] {
use crate::os::unix::fs::DirEntryExt;
Expand Down
Loading