Skip to content

Commit

Permalink
impl for MutateInnerOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmicexplorer committed Oct 17, 2023
1 parent eda9f45 commit f755697
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
//!
//!

#![feature(can_vector)]

#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]

Expand Down
181 changes: 170 additions & 11 deletions src/tokio/os/copy_file_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio_pipe::{PipeRead, PipeWrite};

use std::{
future::Future,
io::IoSlice,
io::{IoSlice, IoSliceMut},
mem::{self, MaybeUninit},
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},
pin::Pin,
Expand Down Expand Up @@ -106,6 +106,79 @@ impl CopyFileRangeHandle for MutateInnerOffset {
}
}

impl std::io::Read for MutateInnerOffset {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let fd = self.owned_fd.as_raw_fd();
/* FIXME: make this truly async instead of sync, perf permitting! */
let num_read =
unsafe { cvt!(libc::read(fd, mem::transmute(buf.as_mut_ptr()), buf.len())) }?;
assert!(num_read >= 0);
Ok(num_read as usize)
}

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
let fd = self.owned_fd.as_raw_fd();
/* FIXME: make this truly async instead of sync, perf permitting! */
let num_read = unsafe {
cvt!(libc::readv(
fd,
mem::transmute(bufs.as_ptr()),
bufs.len() as libc::c_int
))
}?;
assert!(num_read >= 0);
Ok(num_read as usize)
}

#[inline]
fn is_read_vectored(&self) -> bool {
true
}
}

impl std::io::Seek for MutateInnerOffset {
fn seek(&mut self, arg: io::SeekFrom) -> io::Result<u64> {
let (offset, whence): (libc::off64_t, libc::c_int) = match arg {
io::SeekFrom::Start(pos) => (pos as libc::off64_t, libc::SEEK_SET),
io::SeekFrom::Current(diff) => (diff, libc::SEEK_CUR),
io::SeekFrom::End(diff) => (diff, libc::SEEK_END),
};
let fd = self.owned_fd.as_raw_fd();
let new_offset = cvt!(unsafe { libc::lseek(fd, offset, whence) })?;
self.offset = new_offset as u64;
Ok(self.offset)
}
}

impl std::io::Write for MutateInnerOffset {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let fd = self.owned_fd.as_raw_fd();
let num_written =
cvt!(unsafe { libc::write(fd, mem::transmute(buf.as_ptr()), buf.len()) })?;
assert!(num_written > 0);
Ok(num_written as usize)
}

fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
let fd = self.owned_fd.as_raw_fd();
let num_written = cvt!(unsafe {
libc::writev(fd, mem::transmute(bufs.as_ptr()), bufs.len() as libc::c_int)
})?;
assert!(num_written > 0);
Ok(num_written as usize)
}

#[inline]
fn is_write_vectored(&self) -> bool {
true
}

fn flush(&mut self) -> io::Result<()> {
let _ = cvt!(unsafe { libc::fdatasync(self.owned_fd.as_raw_fd()) })?;
Ok(())
}
}

impl io::AsyncRead for MutateInnerOffset {
fn poll_read(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -235,6 +308,95 @@ impl FromGivenOffset {
}
}

impl std::io::Read for FromGivenOffset {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let num_read = cvt!(unsafe {
libc::pread(
self.fd,
mem::transmute(buf.as_mut_ptr()),
buf.len(),
self.offset,
)
})?;
assert!(num_read >= 0);
self.offset += num_read as i64;
Ok(num_read as usize)
}

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
let num_read = cvt!(unsafe {
libc::preadv(
self.fd,
mem::transmute(bufs.as_mut_ptr()),
bufs.len() as libc::c_int,
self.offset,
)
})?;
assert!(num_read >= 0);
self.offset += num_read as i64;
Ok(num_read as usize)
}

#[inline]
fn is_read_vectored(&self) -> bool {
true
}
}

impl std::io::Seek for FromGivenOffset {
fn seek(&mut self, arg: io::SeekFrom) -> io::Result<u64> {
self.offset = match arg {
io::SeekFrom::Start(from_start) => from_start as i64,
io::SeekFrom::Current(diff) => self.offset + diff,
io::SeekFrom::End(from_end) => {
assert!(from_end <= 0);
let full_len = self.len_sync()?;
full_len + from_end
}
};
Ok(self.offset as u64)
}
}

impl std::io::Write for FromGivenOffset {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let num_written = cvt!(unsafe {
libc::pwrite(
self.fd,
mem::transmute(buf.as_ptr()),
buf.len(),
self.offset,
)
})?;
assert!(num_written > 0);
self.offset += num_written as i64;
Ok(num_written as usize)
}

fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
let num_written = cvt!(unsafe {
libc::pwritev(
self.fd,
mem::transmute(bufs.as_ptr()),
bufs.len() as libc::c_int,
self.offset,
)
})?;
assert!(num_written > 0);
self.offset += num_written as i64;
Ok(num_written as usize)
}

fn is_write_vectored(&self) -> bool {
true
}

fn flush(&mut self) -> io::Result<()> {
let _ = cvt!(unsafe { libc::fdatasync(self.fd) })?;
Ok(())
}
}

impl io::AsyncRead for FromGivenOffset {
fn poll_read(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -754,28 +916,25 @@ mod test {

#[tokio::test]
async fn test_io_copy_for_owned_wrapper() {
use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use std::io::prelude::*;

let mut f = MutateInnerOffset::new(tempfile::tempfile().unwrap(), Role::Writable)
.await
.unwrap();
f.write_all(b"hello").await.unwrap();
f.seek(io::SeekFrom::Start(0)).await.unwrap();
f.write_all(b"hello").unwrap();
f.seek(io::SeekFrom::Start(0)).unwrap();
let mut f_in = MutateInnerOffset::new(f, Role::Readable).await.unwrap();

let mut f_out = MutateInnerOffset::new(tempfile::tempfile().unwrap(), Role::Writable)
.await
.unwrap();

io::copy(&mut Pin::new(&mut f_in), &mut Pin::new(&mut f_out))
.await
.unwrap();
std::io::copy(&mut f_in, &mut f_out).unwrap();

let f: std::fs::File = f_out.into();
let mut f = tokio::fs::File::from_std(f);
f.seek(io::SeekFrom::Start(0)).await.unwrap();
let mut f: std::fs::File = f_out.into();
f.seek(io::SeekFrom::Start(0)).unwrap();
let mut s = String::new();
f.read_to_string(&mut s).await.unwrap();
f.read_to_string(&mut s).unwrap();
assert_eq!(&s, "hello");
}

Expand Down

0 comments on commit f755697

Please sign in to comment.