diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs index 19c5bac90..27a776841 100644 --- a/glommio/src/io/dma_file.rs +++ b/glommio/src/io/dma_file.rs @@ -17,10 +17,13 @@ use futures_lite::{Stream, StreamExt}; use nix::sys::statfs::*; use std::{ cell::Ref, + future::Future, io, os::unix::io::{AsRawFd, RawFd}, path::Path, + pin::Pin, rc::Rc, + task::{Context, Poll}, }; pub(super) type Result = crate::Result; @@ -242,7 +245,12 @@ impl DmaFile { /// /// The position must be aligned to for Direct I/O. In most platforms /// that means 512 bytes. - pub async fn read_at_aligned(&self, pos: u64, size: usize) -> Result { + /// + /// Equals to + /// ```ignore + /// pub async fn read_at_aligned(&self, pos: u64, size: usize) -> Result; + /// ``` + pub fn read_at_aligned(&self, pos: u64, size: usize) -> PollDmaReadAtAligned<'_> { let source = self.file.reactor.upgrade().unwrap().read_dma( self.as_raw_fd(), pos, @@ -250,8 +258,10 @@ impl DmaFile { self.pollable, self.file.scheduler.borrow().as_ref(), ); - let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?; - Ok(ReadResult::from_sliced_buffer(source, 0, read_size)) + PollDmaReadAtAligned { + source: Some(source), + file: &self.file, + } } /// Reads into buffer in buf from a specific position in the file. @@ -262,7 +272,12 @@ impl DmaFile { /// /// If you can guarantee proper alignment, prefer [`Self::read_at_aligned`] /// instead - pub async fn read_at(&self, pos: u64, size: usize) -> Result { + /// + /// Equals to + /// ```ignore + /// pub async fn read_at(&self, pos: u64, size: usize) -> Result; + /// ``` + pub fn read_at(&self, pos: u64, size: usize) -> PollDmaReadAt<'_> { let eff_pos = self.align_down(pos); let b = (pos - eff_pos) as usize; @@ -274,13 +289,12 @@ impl DmaFile { self.pollable, self.file.scheduler.borrow().as_ref(), ); - - let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?; - Ok(ReadResult::from_sliced_buffer( - source, - b, - std::cmp::min(read_size, size), - )) + PollDmaReadAt { + source: Some(source), + file: &self.file, + begin: b, + size, + } } /// Submit many reads and process the results in a stream-like fashion via a @@ -424,6 +438,61 @@ impl DmaFile { } } +/// Future of [`DmaFile::read_at_aligned`]. +#[derive(Debug)] +#[must_use = "future has no effect unless you .await or poll it"] +pub struct PollDmaReadAtAligned<'a> { + source: Option, + file: &'a GlommioFile, +} + +impl Future for PollDmaReadAtAligned<'_> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let read_size = self + .source + .as_ref() + .expect("Polling a finished task") + .poll_collect_rw(cx) + .map(|read_size| enhanced_try!(read_size, "Reading", self.file))?; + + read_size.map(|size| { + let source = self.get_mut().source.take().unwrap(); + Ok(ReadResult::from_sliced_buffer(source, 0, size)) + }) + } +} + +/// Future of [`DmaFile::read_at`]. +#[derive(Debug)] +#[must_use = "future has no effect unless you .await or poll it"] +pub struct PollDmaReadAt<'a> { + source: Option, + file: &'a GlommioFile, + begin: usize, + size: usize, +} +impl Future for PollDmaReadAt<'_> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let read_size = self + .source + .as_ref() + .expect("Polling a finished task") + .poll_collect_rw(cx) + .map(|read_size| enhanced_try!(read_size, "Reading", self.file))?; + + read_size.map(|read_size| { + let offset = self.begin; + let len = self.size.min(read_size); + let source = self.get_mut().source.take().unwrap(); + Ok(ReadResult::from_sliced_buffer(source, offset, len)) + }) + } +} + #[cfg(test)] pub(crate) mod test { use super::*; diff --git a/glommio/src/io/immutable_file.rs b/glommio/src/io/immutable_file.rs index cf1663b36..5371b27e8 100644 --- a/glommio/src/io/immutable_file.rs +++ b/glommio/src/io/immutable_file.rs @@ -10,8 +10,8 @@ use crate::io::{ DmaStreamWriter, DmaStreamWriterBuilder, IoVec, + PollDmaReadAt, ReadManyResult, - ReadResult, ScheduledSource, }; use futures_lite::{future::poll_fn, io::AsyncWrite, Stream}; @@ -372,8 +372,13 @@ impl ImmutableFile { /// It is not necessary to respect the `O_DIRECT` alignment of the file, and /// this API will internally convert the positions and sizes to match, /// at a cost. - pub async fn read_at(&self, pos: u64, size: usize) -> Result { - self.stream_builder.file.read_at(pos, size).await + /// + /// Equals to + /// ```ignore + /// pub async fn read_at(&self, pos: u64, size: usize) -> Result; + /// ``` + pub fn read_at(&self, pos: u64, size: usize) -> PollDmaReadAt<'_> { + self.stream_builder.file.read_at(pos, size) } /// Submit many reads and process the results in a stream-like fashion via a diff --git a/glommio/src/io/mod.rs b/glommio/src/io/mod.rs index 647b8d3a2..5562d8c8a 100644 --- a/glommio/src/io/mod.rs +++ b/glommio/src/io/mod.rs @@ -163,7 +163,7 @@ pub use self::{ }, bulk_io::{IoVec, ReadManyResult}, directory::Directory, - dma_file::{CloseResult, DmaFile}, + dma_file::{CloseResult, DmaFile, PollDmaReadAt, PollDmaReadAtAligned}, dma_file_stream::{ DmaStreamReader, DmaStreamReaderBuilder, diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index aa6984f06..3457cc5ce 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -32,7 +32,7 @@ use std::{ os::unix::io::RawFd, path::PathBuf, rc::Rc, - task::{Poll, Waker}, + task::{Context, Poll, Waker}, time::Duration, }; @@ -264,6 +264,15 @@ impl Source { }) .await } + + pub(crate) fn poll_collect_rw(&self, cx: &mut Context<'_>) -> Poll> { + if let Some(result) = self.result() { + return Poll::Ready(result); + } + + self.add_waiter_many(cx.waker().clone()); + Poll::Pending + } } impl Drop for Source {