Skip to content

Commit

Permalink
Reduce dependency on futures crate (#140)
Browse files Browse the repository at this point in the history
* Add future::poll_fn

* Replace all uses of poll_fn with the new one

* Remove some uses of futures

* Simplify ReadDir and DirEntry

* Remove some use of futures from File

* Use futures subcrates

* Fix imports in docs

* Remove futures-util dependency

* Remove futures-executor-preview

* Refactor

* Require more features in the futures-preview crate
  • Loading branch information
Stjepan Glavina authored Sep 4, 2019
1 parent 75a4ba8 commit bac74c2
Show file tree
Hide file tree
Showing 35 changed files with 454 additions and 363 deletions.
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ unstable = []
async-task = "1.0.0"
cfg-if = "0.1.9"
crossbeam-channel = "0.3.9"
futures-channel-preview = "0.3.0-alpha.18"
futures-core-preview = "0.3.0-alpha.18"
futures-io-preview = "0.3.0-alpha.18"
futures-timer = "0.3.0"
lazy_static = "1.3.0"
log = { version = "0.4.8", features = ["kv_unstable"] }
Expand All @@ -37,11 +40,11 @@ num_cpus = "1.10.0"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"

[dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["async-await", "nightly"]

[dev-dependencies]
femme = "1.1.0"
tempdir = "0.3.7"
surf = "1.0.1"
tempdir = "0.3.7"

[dev-dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["std", "nightly", "async-await"]
2 changes: 1 addition & 1 deletion docs/src/tutorial/connecting_readers_and_writers.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined
# task,
# };
# use futures::channel::mpsc;
# use futures::SinkExt;
# use futures::sink::SinkExt;
# use std::sync::Arc;
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
2 changes: 1 addition & 1 deletion docs/src/tutorial/sending_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the
# prelude::Stream,
# };
use futures::channel::mpsc; // 1
use futures::SinkExt;
use futures::sink::SinkExt;
use std::sync::Arc;
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
105 changes: 12 additions & 93 deletions src/fs/dir_entry.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use std::ffi::OsString;
use std::fs;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Mutex;
use std::sync::Arc;

use cfg_if::cfg_if;
use futures::future::{self, FutureExt, TryFutureExt};

use crate::future::Future;
use crate::io;
use crate::task::{blocking, Poll};
use crate::task::blocking;

/// An entry inside a directory.
///
Expand All @@ -21,44 +18,25 @@ use crate::task::{blocking, Poll};
/// [`std::fs::DirEntry`]: https://doc.rust-lang.org/std/fs/struct.DirEntry.html
#[derive(Debug)]
pub struct DirEntry {
/// The state of the entry.
state: Mutex<State>,

/// The full path to the entry.
path: PathBuf,
/// The inner synchronous `DirEntry`.
inner: Arc<fs::DirEntry>,

#[cfg(unix)]
ino: u64,

/// The bare name of the entry without the leading path.
file_name: OsString,
}

/// The state of an asynchronous `DirEntry`.
///
/// The `DirEntry` can be either idle or busy performing an asynchronous operation.
#[derive(Debug)]
enum State {
Idle(Option<fs::DirEntry>),
Busy(blocking::JoinHandle<State>),
}

impl DirEntry {
/// Creates an asynchronous `DirEntry` from a synchronous handle.
pub(crate) fn new(inner: fs::DirEntry) -> DirEntry {
#[cfg(unix)]
let dir_entry = DirEntry {
path: inner.path(),
file_name: inner.file_name(),
ino: inner.ino(),
state: Mutex::new(State::Idle(Some(inner))),
inner: Arc::new(inner),
};

#[cfg(windows)]
let dir_entry = DirEntry {
path: inner.path(),
file_name: inner.file_name(),
state: Mutex::new(State::Idle(Some(inner))),
inner: Arc::new(inner),
};

dir_entry
Expand Down Expand Up @@ -89,7 +67,7 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub fn path(&self) -> PathBuf {
self.path.clone()
self.inner.path()
}

/// Returns the metadata for this entry.
Expand All @@ -114,35 +92,8 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub async fn metadata(&self) -> io::Result<fs::Metadata> {
future::poll_fn(|cx| {
let state = &mut *self.state.lock().unwrap();

loop {
match state {
State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None),
Some(inner) => {
let (s, r) = futures::channel::oneshot::channel();

// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = inner.metadata();
let _ = s.send(res);
State::Idle(Some(inner))
}));

return Poll::Ready(Some(r));
}
},
// Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
}
}
})
.map(|opt| opt.ok_or_else(|| io_error("invalid state")))
.await?
.map_err(|_| io_error("blocking task failed"))
.await?
let inner = self.inner.clone();
blocking::spawn(async move { inner.metadata() }).await
}

/// Returns the file type for this entry.
Expand All @@ -167,35 +118,8 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub async fn file_type(&self) -> io::Result<fs::FileType> {
future::poll_fn(|cx| {
let state = &mut *self.state.lock().unwrap();

loop {
match state {
State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None),
Some(inner) => {
let (s, r) = futures::channel::oneshot::channel();

// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = inner.file_type();
let _ = s.send(res);
State::Idle(Some(inner))
}));

return Poll::Ready(Some(r));
}
},
// Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
}
}
})
.map(|opt| opt.ok_or_else(|| io_error("invalid state")))
.await?
.map_err(|_| io_error("blocking task failed"))
.await?
let inner = self.inner.clone();
blocking::spawn(async move { inner.file_type() }).await
}

/// Returns the bare name of this entry without the leading path.
Expand All @@ -218,15 +142,10 @@ impl DirEntry {
/// # Ok(()) }) }
/// ```
pub fn file_name(&self) -> OsString {
self.file_name.clone()
self.inner.file_name()
}
}

/// Creates a custom `io::Error` with an arbitrary error type.
fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}

cfg_if! {
if #[cfg(feature = "docs")] {
use crate::os::unix::fs::DirEntryExt;
Expand Down
Loading

0 comments on commit bac74c2

Please sign in to comment.