Skip to content

Commit

Permalink
fusedev: support splice to handle FUSE requests.
Browse files Browse the repository at this point in the history
1. Support enable splice_read/splice_write on FuseChannel.
2. Add splice interface for ZeroCopyReader and ZeroCopyWriter.
3. Add unit-test cases for splice interface.

Signed-off-by: Henry Huang <henry.hj@antgroup.com>
  • Loading branch information
henry.hj committed Jan 22, 2024
1 parent 5ae92b3 commit 3507c57
Show file tree
Hide file tree
Showing 8 changed files with 1,495 additions and 241 deletions.
95 changes: 95 additions & 0 deletions src/api/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::abi::fuse_abi::{ino64_t, stat64};
mod async_io;
#[cfg(feature = "async-io")]
pub use async_io::{AsyncFileSystem, AsyncZeroCopyReader, AsyncZeroCopyWriter};
#[cfg(all(target_os = "linux", feature = "fusedev"))]
use std::os::unix::io::AsRawFd;

mod sync_io;
pub use sync_io::FileSystem;
Expand Down Expand Up @@ -208,6 +210,18 @@ pub trait ZeroCopyReader: io::Read {
off: u64,
) -> io::Result<usize>;

/// Moves up to `count` bytes from `self` straight into the file descriptor behind `f`,
/// with less data copying than a regular read-then-write.
///
/// `f` may be a local file descriptor or a TCP socket. `off` is the optional offset in `f`
/// at which the data should land.
/// NOTE(review): `None` presumably means "use the descriptor's current position", as with
/// splice(2) — confirm against the implementors.
///
/// Returns the number of bytes transferred.
///
/// # Errors
/// The default implementation transfers nothing and always fails with `ENOSYS`; readers that
/// support splicing are expected to override it.
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn splice_to(
    &mut self,
    _f: &dyn AsRawFd,
    _count: usize,
    _off: Option<u64>,
) -> io::Result<usize> {
    // There is no generic fallback here: report "not implemented" to the caller.
    let unsupported = io::Error::from_raw_os_error(libc::ENOSYS);
    Err(unsupported)
}

/// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count`
/// must be less than `u64::MAX`.
///
Expand Down Expand Up @@ -251,6 +265,50 @@ pub trait ZeroCopyReader: io::Read {
Ok(())
}

/// Transfers exactly `count` bytes from `self` into `f`, starting at offset `off`, by calling
/// `splice_to` repeatedly until everything has been moved. `off + count` must be less than
/// `u64::MAX`.
///
/// `f` may be a local file descriptor or a TCP socket. When `off` is `Some`, it is advanced by
/// the number of bytes moved after every successful `splice_to` call.
///
/// # Errors
/// * `InvalidInput` if `count` does not fit in a `u64`, or if `off + count` overflows `u64`.
/// * `WriteZero` if `splice_to` reports zero bytes moved while bytes are still outstanding.
/// * Any other error from `splice_to` is returned unchanged, except `Interrupted`, which is
///   retried.
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn splice_exact_to(
    &mut self,
    f: &mut dyn AsRawFd,
    mut count: usize,
    mut off: Option<u64>,
) -> io::Result<()> {
    // Validate the requested range up front, before any data is moved.
    let total: u64 = count
        .try_into()
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
    let range_overflows = off.map_or(false, |start| start.checked_add(total).is_none());
    if range_overflows {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "`off` + `count` must be less than u64::MAX",
        ));
    }

    while count > 0 {
        let moved = match self.splice_to(f, count, off) {
            // No progress although bytes are still outstanding: give up.
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "failed to fill whole buffer",
                ))
            }
            Ok(n) => n,
            // An interrupted call moved nothing; retry with the same arguments.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };

        count -= moved;
        if let Some(pos) = off.as_mut() {
            *pos += moved as u64;
        }
    }

    Ok(())
}

/// Copies all remaining bytes from `self` into `f` at offset `off`. Equivalent to repeatedly
/// calling `read_to` until it returns either `Ok(0)` or a non-`ErrorKind::Interrupted` error.
///
Expand All @@ -275,6 +333,24 @@ pub trait ZeroCopyReader: io::Read {
}
}
}

/// Copies all remaining bytes from `self` into `f`, starting at offset `off`. Equivalent to
/// repeatedly calling `splice_to` until it returns either `Ok(0)` or a
/// non-`ErrorKind::Interrupted` error.
///
/// `f` may be a local file descriptor or a TCP socket. When `off` is `Some`, it is advanced
/// (saturating at `u64::MAX`) by the number of bytes moved after every successful call, so
/// each chunk lands after the previous one.
///
/// Returns the total number of bytes transferred.
///
/// # Errors
/// Any error from `splice_to` other than `Interrupted` is returned unchanged; `Interrupted`
/// calls are retried.
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn splice_to_end(&mut self, f: &mut dyn AsRawFd, mut off: Option<u64>) -> io::Result<usize> {
    let mut out = 0;
    loop {
        match self.splice_to(f, usize::MAX, off) {
            Ok(0) => return Ok(out),
            Ok(n) => {
                // `saturating_add` returns the sum instead of updating in place, so the
                // result must be stored back; otherwise every chunk would be spliced to the
                // same offset and overwrite the previous one.
                if let Some(v) = off.as_mut() {
                    *v = v.saturating_add(n as u64);
                }
                out += n;
            }
            // An interrupted call moved nothing; retry with the same arguments.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
}
}

/// A trait for directly copying data from a `File` into the fuse transport without first storing
Expand All @@ -300,6 +376,25 @@ pub trait ZeroCopyWriter: io::Write {
off: u64,
) -> io::Result<usize>;

/// Appends `count` bytes of data taken from the file descriptor `f` at offset `off`.
///
/// `f` should be a file descriptor or a socket. The appended data is always placed after all
/// other data, and the operation is only available for buffered writers.
///
/// For example: "aaa" has already been written to the writer, then a fd buffer which contains
/// "bbb" is appended, and finally "ccc" is written to the writer. The final data is
/// "aaacccbbb".
///
/// # Errors
/// `ENOSYS`: the writer doesn't support this operation and the caller should fall back to
/// `write_from`. The default implementation always returns this error.
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn append_fd_buf(
    &mut self,
    _f: &dyn AsRawFd,
    _count: usize,
    _off: Option<u64>,
) -> io::Result<usize> {
    // Writers without fd-buffer support report "not implemented" so callers can fall back.
    let unsupported = io::Error::from_raw_os_error(libc::ENOSYS);
    Err(unsupported)
}

/// Copies exactly `count` bytes of data from `f` at offset `off` into `self`. `off + count`
/// must be less than `u64::MAX`.
///
Expand Down
60 changes: 40 additions & 20 deletions src/api/server/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ impl<'a, S: BitmapSlice> io::Read for AsyncZcReader<'a, S> {
}
}

struct AsyncZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>);
struct AsyncZcWriter<'a, 'b, S: BitmapSlice = ()>(Writer<'a, 'b, S>);

// The underlying VolatileSlice contains "*mut u8", which is just a pointer to a u8 array.
// We rely on the AsyncExecutor being a single-threaded worker, so the 'Writer' is never
// actually sent to other threads.
unsafe impl<'a, S: BitmapSlice> Send for AsyncZcWriter<'a, S> {}
unsafe impl<'a, 'b, S: BitmapSlice> Send for AsyncZcWriter<'a, 'b, S> {}

#[async_trait(?Send)]
impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
impl<'a, 'b, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
async fn async_write_from(
&mut self,
f: Arc<dyn AsyncFileReadWriteVolatile>,
Expand All @@ -79,7 +79,7 @@ impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
}
}

impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
impl<'a, 'b, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
fn write_from(
&mut self,
f: &mut dyn FileReadWriteVolatile,
Expand All @@ -94,7 +94,7 @@ impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
}
}

impl<'a, S: BitmapSlice> io::Write for AsyncZcWriter<'a, S> {
impl<'a, 'b, S: BitmapSlice> io::Write for AsyncZcWriter<'a, 'b, S> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
Expand All @@ -120,7 +120,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
pub async unsafe fn async_handle_message<S: BitmapSlice>(
&self,
mut r: Reader<'_, S>,
w: Writer<'_, S>,
w: Writer<'_, '_, S>,
vu_req: Option<&mut dyn FsCacheReqHandler>,
hook: Option<&dyn MetricsHook>,
) -> Result<usize> {
Expand Down Expand Up @@ -222,7 +222,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
res
}

async fn async_lookup<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_lookup<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?;
let name = match bytes_to_cstr(buf.as_ref()) {
Ok(name) => name,
Expand All @@ -234,8 +237,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
return Err(e);
}
};

let version = self.vers.load();
let version = &self.meta.load().version;
let result = self
.fs
.async_lookup(ctx.context(), ctx.nodeid(), name)
Expand All @@ -258,7 +260,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_getattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_getattr<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let handle = if (flags & GETATTR_FH) != 0 {
Some(fh.into())
Expand All @@ -273,7 +278,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
ctx.async_handle_attr_result(result).await
}

async fn async_setattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_setattr<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let handle = if setattr_in.valid & FATTR_FH != 0 {
Some(setattr_in.fh.into())
Expand All @@ -290,7 +298,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
ctx.async_handle_attr_result(result).await
}

async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let result = self
.fs
Expand All @@ -311,7 +319,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
let ReadIn {
fh,
offset,
Expand Down Expand Up @@ -369,7 +377,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
.await
.map_err(Error::EncodeMessage)?;
ctx.w
.async_commit(Some(&data_writer.0))
.async_commit(Some(&mut data_writer.0))
.await
.map_err(Error::EncodeMessage)?;
Ok(out.len as usize)
Expand All @@ -378,7 +386,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_write<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_write<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let WriteIn {
fh,
offset,
Expand Down Expand Up @@ -430,7 +441,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_fsync<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_fsync<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let FsyncIn {
fh, fsync_flags, ..
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
Expand All @@ -446,7 +460,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_fsyncdir<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_fsyncdir<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let FsyncIn {
fh, fsync_flags, ..
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
Expand All @@ -462,7 +479,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_create<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_create<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::<CreateIn>())?;
let name = match bytes_to_cstr(buf.as_ref()) {
Expand Down Expand Up @@ -508,7 +528,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {

async fn async_fallocate<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, F, S>,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let FallocateIn {
fh,
Expand All @@ -529,7 +549,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

impl<'a, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, F, S> {
impl<'a, 'b, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, 'b, F, S> {
async fn async_reply_ok<T: ByteValued>(
&mut self,
out: Option<T>,
Expand Down
Loading

0 comments on commit 3507c57

Please sign in to comment.