From 1cde8d6b64e9e1da15e3af9023870799eeaea604 Mon Sep 17 00:00:00 2001 From: Ian O'Connell Date: Tue, 16 Feb 2021 12:27:15 -0700 Subject: [PATCH] Try ensure osx can work correctly. Improve error handling and some edge cases around fuse file handles for shutdown --- examples/xmp.rs | 1 - src/channel.rs | 324 +++++++++++++---------------------- src/io_ops/blocking_io.rs | 55 ++++++ src/io_ops/mod.rs | 48 ++++++ src/io_ops/nonblocking_io.rs | 103 +++++++++++ src/lib.rs | 1 + src/request.rs | 20 +-- src/session.rs | 117 ++++++++----- 8 files changed, 408 insertions(+), 261 deletions(-) create mode 100644 src/io_ops/blocking_io.rs create mode 100644 src/io_ops/mod.rs create mode 100644 src/io_ops/nonblocking_io.rs diff --git a/examples/xmp.rs b/examples/xmp.rs index 74478c3a..99d7d09a 100644 --- a/examples/xmp.rs +++ b/examples/xmp.rs @@ -498,7 +498,6 @@ impl Filesystem for XmpFS { datasync: bool, reply: ReplyEmpty, ) { - eprintln!("Calling fsync"); if !self.opened_files.contains_key(&fh) { reply.error(EIO).await; return; diff --git a/src/channel.rs b/src/channel.rs index b184a469..4c4b808f 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -13,23 +13,23 @@ use crate::fuse_sys::{ fuse_session_destroy, fuse_session_fd, fuse_session_mount, fuse_session_new, fuse_session_unmount, }; -use async_trait::async_trait; -use libc::{self, c_int, c_void, size_t, O_NONBLOCK}; + +use libc::{self, c_int, c_void}; use log::error; +use log::warn; #[cfg(any(feature = "libfuse", test))] use std::ffi::OsStr; use std::os::unix::io::IntoRawFd; -use std::os::unix::io::RawFd; -use std::os::unix::{ffi::OsStrExt, prelude::AsRawFd}; + +use crate::io_ops::{FileDescriptorRawHandle, SubChannel}; +use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; use std::{ ffi::{CStr, CString}, sync::Arc, }; use std::{io, ptr}; -use tokio::io::unix::AsyncFd; -use crate::reply::ReplySender; #[cfg(not(feature = "libfuse"))] use crate::MountOption; @@ -59,22 +59,12 @@ fn with_fuse_args T>(options: &[&OsStr], f: F) -> T }) } -/// In the latest version of rust this isn't required since RawFd implements AsRawFD -/// but until pretty recently that didn't work. So including this wrapper is cheap and allows -/// us better compatibility. -#[derive(Debug, Clone, Copy)] -pub(crate) struct FileDescriptorRawHandle(pub(in crate) RawFd); -impl AsRawFd for FileDescriptorRawHandle { - fn as_raw_fd(&self) -> RawFd { - self.0 - } -} /// A raw communication channel to the FUSE kernel driver #[derive(Debug)] pub struct Channel { mountpoint: PathBuf, pub(in crate) session_fd: FileDescriptorRawHandle, - worker_fds: Vec, + pub(in crate) sub_channels: Arc>, pub(in crate) fuse_session: *mut c_void, } @@ -83,84 +73,107 @@ pub struct Channel { unsafe impl Send for Channel {} impl Channel { - /// Build async channels - /// Using the set of child raw FD's, make tokio async FD's from them for usage. - pub(crate) fn async_channels(&self) -> io::Result>>> { - let mut r = Vec::default(); + /// This allows file systems to work concurrently over several buffers/descriptors for concurrent operation. + /// More detailed description of the protocol is at: + /// https://john-millikin.com/the-fuse-protocol#multi-threading + /// + #[cfg(not(target_os = "macos"))] + fn create_worker(root_fd: &FileDescriptorRawHandle) -> io::Result { + let fuse_device_name = "/dev/fuse"; + + let fd = match std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(fuse_device_name) + { + Ok(file) => file.into_raw_fd(), + Err(error) => { + if error.kind() == io::ErrorKind::NotFound { + error!("{} not found. Try 'modprobe fuse'", fuse_device_name); + } + return Err(error); + } + }; - r.push(Arc::new(AsyncFd::new(self.session_fd)?)); + let code = unsafe { libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) }; + if code == -1 { + eprintln!("fcntl command failed with {}", code); + return Err(io::Error::last_os_error()); + } - for worker in self.worker_fds.iter() { - r.push(Arc::new(AsyncFd::new(*worker)?)); + let code = unsafe { libc::ioctl(fd, FUSE_DEV_IOC_CLONE, &root_fd.0) }; + if code == -1 { + eprintln!("Clone command failed with {}", code); + return Err(io::Error::last_os_error()); } - Ok(r) + Ok(SubChannel::new(FileDescriptorRawHandle(fd), false)?) } + // mac fuse seems to just re-use the root fd relying onthe atomic semantics setup in the driver + // This will have lowerthroughput than the linux approach. + #[cfg(target_os = "macos")] + fn create_worker(root_fd: &FileDescriptorRawHandle) -> io::Result { + SubChannel::new(*root_fd, true) + } /// /// Create worker fd's takes the root/session file descriptor and makes several clones /// This allows file systems to work concurrently over several buffers/descriptors for concurrent operation. /// More detailed description of the protocol is at: /// https://john-millikin.com/the-fuse-protocol#multi-threading /// - fn create_worker_fds( + fn create_sub_channels( root_fd: &FileDescriptorRawHandle, worker_channels: usize, - ) -> io::Result> { - let fuse_device_name = "/dev/fuse"; - + ) -> io::Result> { let mut res = Vec::default(); - for _ in 0..worker_channels { - let fd = match std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(fuse_device_name) - { - Ok(file) => file.into_raw_fd(), - Err(error) => { - if error.kind() == io::ErrorKind::NotFound { - error!("{} not found. Try 'modprobe fuse'", fuse_device_name); - } - return Err(error); - } - }; - - let cur_flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) }; + res.push(SubChannel::new(*root_fd, false)?); - let code = unsafe { libc::fcntl(fd, libc::F_SETFL, cur_flags | O_NONBLOCK) }; - if code == -1 { - eprintln!("fcntl set flags command failed with {}", code); - return Err(io::Error::last_os_error()); - } - - let code = unsafe { libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) }; - if code == -1 { - eprintln!("fcntl command failed with {}", code); - return Err(io::Error::last_os_error()); - } - - let code = unsafe { libc::ioctl(fd, FUSE_DEV_IOC_CLONE, &root_fd.0) }; - if code == -1 { - eprintln!("Clone command failed with {}", code); - return Err(io::Error::last_os_error()); - } - - res.push(FileDescriptorRawHandle(fd)); + for _ in 0..worker_channels { + res.push(Channel::create_worker(root_fd)?); } Ok(res) } - fn set_non_block(fd: RawFd) -> io::Result<()> { - let cur_flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) }; - - let code = unsafe { libc::fcntl(fd, libc::F_SETFL, cur_flags | O_NONBLOCK) }; - if code == -1 { - eprintln!("fcntl set flags command failed with {}", code); - return Err(io::Error::last_os_error()); + fn internal_new_on_err_cleanup( + mountpoint: PathBuf, + worker_channel_count: usize, + fd: FileDescriptorRawHandle, + fuse_session: *mut c_void, + ) -> io::Result { + if fd.0 < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(Channel { + mountpoint, + sub_channels: Arc::new(Channel::create_sub_channels(&fd, worker_channel_count)?), + session_fd: fd, + fuse_session, + }) + } + } + fn new_from_session_and_fd( + mountpoint: &Path, + worker_channel_count: usize, + fd: FileDescriptorRawHandle, + fuse_session: *mut c_void, + ) -> io::Result { + match Channel::internal_new_on_err_cleanup( + mountpoint.to_owned(), + worker_channel_count, + fd, + fuse_session, + ) { + Ok(r) => Ok(r), + Err(err) => { + if let Err(e) = unmount(mountpoint, fuse_session, fd.0) { + warn!("When shutting down on error, attempted to unmount failed with error {:?}. Given failure to mount this maybe be fine.", e); + }; + Err(err) + } } - Ok(()) } /// Create a new communication channel to the kernel driver by mounting the /// given path. The kernel driver will delegate filesystem operations of @@ -178,20 +191,12 @@ impl Channel { let mnt = CString::new(mountpoint.as_os_str().as_bytes())?; let fd = unsafe { fuse_mount_compat25(mnt.as_ptr(), args) }; - Channel::set_non_block(fd)?; - - if fd < 0 { - Err(io::Error::last_os_error()) - } else { - let fd = FileDescriptorRawHandle(fd); - - Ok(Channel { - mountpoint, - worker_fds: Channel::create_worker_fds(&fd, worker_channel_count)?, - session_fd: fd, - fuse_session: ptr::null_mut(), - }) - } + Channel::new_from_session_and_fd( + &mountpoint, + worker_channel_count, + FileDescriptorRawHandle(fd), + ptr::null_mut(), + ) }) } @@ -214,19 +219,12 @@ impl Channel { } let fd = unsafe { fuse_session_fd(fuse_session) }; - if fd < 0 { - Err(io::Error::last_os_error()) - } else { - Channel::set_non_block(fd)?; - - let fd = FileDescriptorRawHandle(fd); - Ok(Channel { - mountpoint, - worker_fds: Channel::create_worker_fds(&fd, worker_channel_count)?, - session_fd: fd, - fuse_session, - }) - } + Channel::new_from_session_and_fd( + &mountpoint, + worker_channel_count, + FileDescriptorRawHandle(fd), + fuse_session, + ) }) } @@ -239,18 +237,13 @@ impl Channel { let mountpoint = mountpoint.canonicalize()?; let fd = fuse_mount_pure(mountpoint.as_os_str(), options)?; - if fd < 0 { - Err(io::Error::last_os_error()) - } else { - Channel::set_non_block(fd)?; - let fd = FileDescriptorRawHandle(fd); - Ok(Channel { - mountpoint, - worker_fds: Channel::create_worker_fds(&fd, worker_channel_count)?, - session_fd: fd, - fuse_session: ptr::null_mut(), - }) - } + + Channel::new_from_session_and_fd( + &mountpoint, + worker_channel_count, + FileDescriptorRawHandle(fd), + ptr::null_mut(), + ) } /// Return path of the mounted filesystem @@ -258,52 +251,18 @@ impl Channel { &self.mountpoint } - fn blocking_receive( - fd: &FileDescriptorRawHandle, - buffer: &mut Vec, - ) -> io::Result> { - let rc = unsafe { - libc::read( - fd.0, - buffer.as_ptr() as *mut c_void, - buffer.capacity() as size_t, - ) - }; - if rc < 0 { - Err(io::Error::last_os_error()) - } else { - unsafe { - buffer.set_len(rc as usize); - } - Ok(Some(())) - } - } - /// Receives data up to the capacity of the given buffer (can block). pub(in crate) async fn receive<'a, 'b>( - async_fd: &'a Arc>, + sub_channel: &'a SubChannel, + terminated: &mut tokio::sync::oneshot::Receiver<()>, buffer: &'b mut Vec, ) -> io::Result> { - use tokio::time::timeout; - - use std::time::Duration; - loop { - // If the main session timesout/shuts down, not all of the worker fds are marked as ready - // this means after every second the poll request here will be aborted and we can spin around - // checking conditions. TODO: Probably should try solve this with a one shot channel on destruction. - if let Ok(guard_result) = - timeout(Duration::from_millis(1000), async_fd.readable()).await - { - let mut guard = guard_result?; - - match guard.try_io(|inner| Channel::blocking_receive(inner.get_ref(), buffer)) { - Ok(result) => return result, - Err(_would_block) => { - continue; - } - } - } else { - return Ok(None); - } + tokio::select! { + _ = terminated => { + Ok(None) + } + result = sub_channel.do_receive(buffer) => { + result + } } } } @@ -314,65 +273,16 @@ impl Drop for Channel { // Close the communication channel to the kernel driver // (closing it before unnmount prevents sync unmount deadlock) - for raw_fd in self.worker_fds.iter() { - unsafe { - libc::close(raw_fd.0); - } - } - let handle = self.session_fd; - unsafe { - libc::close(handle.0); + // Close all the channel/file handles. This will include the session fd. + for sub_channel in self.sub_channels.iter() { + sub_channel.close() } // Unmount this channel's mount point - let _ = unmount(&self.mountpoint, self.fuse_session, handle.0); + let _ = unmount(&self.mountpoint, self.fuse_session, self.session_fd.0); self.fuse_session = ptr::null_mut(); // unmount frees this pointer } } -#[derive(Clone, Debug)] -pub struct ChannelSender { - pub(crate) fd: Arc>, -} - -impl ChannelSender { - /// Send all data in the slice of slice of bytes in a single write (can block). - pub async fn send(&self, buffer: &[&[u8]]) -> io::Result<()> { - loop { - let mut guard = self.fd.writable().await?; - - match guard.try_io(|inner| { - let iovecs: Vec<_> = buffer - .iter() - .map(|d| libc::iovec { - iov_base: d.as_ptr() as *mut c_void, - iov_len: d.len() as size_t, - }) - .collect(); - let rc = unsafe { - libc::writev(inner.get_ref().0, iovecs.as_ptr(), iovecs.len() as c_int) - }; - if rc < 0 { - Err(io::Error::last_os_error()) - } else { - Ok(()) - } - }) { - Ok(result) => return result, - Err(_would_block) => continue, - } - } - } -} - -#[async_trait] -impl ReplySender for ChannelSender { - async fn send(&self, data: &[&[u8]]) { - if let Err(err) = ChannelSender::send(self, data).await { - error!("Failed to send FUSE reply: {}", err); - } - } -} - /// Unmount an arbitrary mount point #[allow(unused_variables)] pub fn unmount(mountpoint: &Path, fuse_session: *mut c_void, fd: c_int) -> io::Result<()> { diff --git a/src/io_ops/blocking_io.rs b/src/io_ops/blocking_io.rs new file mode 100644 index 00000000..df8751ed --- /dev/null +++ b/src/io_ops/blocking_io.rs @@ -0,0 +1,55 @@ +use super::FileDescriptorRawHandle; +use async_trait::async_trait; +use libc::{self, c_int, c_void, size_t}; +use log::error; +use std::io; + +#[derive(Debug, Clone)] +pub struct SubChannel { + fd: FileDescriptorRawHandle, + shared: bool, // If its a shared file handle when asked to close, noop. +} + +impl SubChannel { + pub fn new(fd: FileDescriptorRawHandle, shared: bool) -> io::Result { + Ok(SubChannel { fd, shared }) + } + + /// Send all data in the slice of slice of bytes in a single write (can block). + pub async fn send(&self, buffer: &[&[u8]]) -> io::Result<()> { + let iovecs: Vec<_> = buffer + .iter() + .map(|d| libc::iovec { + iov_base: d.as_ptr() as *mut c_void, + iov_len: d.len() as size_t, + }) + .collect(); + let rc = unsafe { libc::writev(self.fd.0, iovecs.as_ptr(), iovecs.len() as c_int) }; + if rc < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } + + pub fn close(&self) { + if !self.shared { + unsafe { + libc::close(self.fd.0); + } + } + } + + pub async fn do_receive(&self, buffer: &'_ mut Vec) -> io::Result> { + tokio::task::block_in_place(|| super::blocking_receive(&self.fd, buffer)) + } +} + +#[async_trait] +impl crate::reply::ReplySender for SubChannel { + async fn send(&self, data: &[&[u8]]) { + if let Err(err) = SubChannel::send(self, data).await { + error!("Failed to send FUSE reply: {}", err); + } + } +} diff --git a/src/io_ops/mod.rs b/src/io_ops/mod.rs new file mode 100644 index 00000000..1f4f8544 --- /dev/null +++ b/src/io_ops/mod.rs @@ -0,0 +1,48 @@ +use libc::{self, c_void, size_t}; + +use std::os::unix::io::RawFd; +use std::os::unix::prelude::AsRawFd; + +use std::io; + +/// In the latest version of rust this isn't required since RawFd implements AsRawFD +/// but until pretty recently that didn't work. So including this wrapper is cheap and allows +/// us better compatibility. +#[derive(Debug, Clone, Copy)] +pub struct FileDescriptorRawHandle(pub(in crate) RawFd); + +impl AsRawFd for FileDescriptorRawHandle { + fn as_raw_fd(&self) -> RawFd { + self.0 + } +} + +fn blocking_receive(fd: &FileDescriptorRawHandle, buffer: &mut Vec) -> io::Result> { + let rc = unsafe { + libc::read( + fd.0, + buffer.as_ptr() as *mut c_void, + buffer.capacity() as size_t, + ) + }; + if rc < 0 { + Err(io::Error::last_os_error()) + } else { + unsafe { + buffer.set_len(rc as usize); + } + Ok(Some(())) + } +} + +#[cfg(target_os = "macos")] +pub mod blocking_io; + +#[cfg(target_os = "macos")] +pub(crate) use blocking_io::SubChannel; + +#[cfg(not(target_os = "macos"))] +pub mod nonblocking_io; + +#[cfg(not(target_os = "macos"))] +pub(crate) use nonblocking_io::SubChannel; diff --git a/src/io_ops/nonblocking_io.rs b/src/io_ops/nonblocking_io.rs new file mode 100644 index 00000000..5fe3b444 --- /dev/null +++ b/src/io_ops/nonblocking_io.rs @@ -0,0 +1,103 @@ +use super::FileDescriptorRawHandle; +use async_trait::async_trait; +use libc::O_NONBLOCK; +use libc::{self, c_int, c_void, size_t}; +use log::error; +use std::io; +use std::sync::Arc; +use tokio::io::unix::AsyncFd; + +#[derive(Debug, Clone)] +pub struct SubChannel { + fd: Arc>, + shared: bool, // If its a shared file handle when asked to close, noop. +} +impl SubChannel { + pub fn new(fd: FileDescriptorRawHandle, shared: bool) -> io::Result { + let code = unsafe { libc::fcntl(fd.0, libc::F_SETFL, O_NONBLOCK) }; + if code == -1 { + eprintln!( + "fcntl set file handle to O_NONBLOCK command failed with {}", + code + ); + return Err(io::Error::last_os_error()); + } + + Ok(SubChannel { + fd: Arc::new(AsyncFd::new(fd)?), + shared, + }) + } + /// Send all data in the slice of slice of bytes in a single write (can block). + pub async fn send(&self, buffer: &[&[u8]]) -> io::Result<()> { + loop { + let mut guard = self.fd.writable().await?; + + match guard.try_io(|inner| { + let iovecs: Vec<_> = buffer + .iter() + .map(|d| libc::iovec { + iov_base: d.as_ptr() as *mut c_void, + iov_len: d.len() as size_t, + }) + .collect(); + let rc = unsafe { + libc::writev(inner.get_ref().0, iovecs.as_ptr(), iovecs.len() as c_int) + }; + if rc < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + }) { + Ok(result) => return result, + Err(_would_block) => continue, + } + } + } + + pub fn close(&self) { + if !self.shared { + unsafe { + libc::close(self.fd.get_ref().0); + } + } + } + + pub async fn do_receive(&self, buffer: &'_ mut Vec) -> io::Result> { + use std::time::Duration; + use tokio::time::timeout; + loop { + if let Ok(guard_result) = timeout(Duration::from_millis(1000), self.fd.readable()).await + { + let mut guard = guard_result?; + match guard.try_io(|inner| super::blocking_receive(inner.get_ref(), buffer)) { + Ok(result) => return result, + Err(_would_block) => { + return Ok(None); + } + } + } else { + // some termination states when it comes to fuse in the kernel(umount sometimes..), do not trigger readable. + // so after a timeout/every so often we need to just try do the read manually. + match super::blocking_receive(self.fd.get_ref(), buffer) { + Ok(r) => return Ok(r), + Err(e) => { + if e.kind() != io::ErrorKind::WouldBlock { + return Err(e); + } + } + } + } + } + } +} + +#[async_trait] +impl crate::reply::ReplySender for SubChannel { + async fn send(&self, data: &[&[u8]]) { + if let Err(err) = SubChannel::send(self, data).await { + error!("Failed to send FUSE reply: {}", err); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 609830e5..7b070efe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,7 @@ use std::cmp::min; mod channel; mod fuse_abi; mod fuse_sys; +mod io_ops; mod ll; mod mount_options; mod reply; diff --git a/src/request.rs b/src/request.rs index ac6e772e..6595cdc5 100644 --- a/src/request.rs +++ b/src/request.rs @@ -5,15 +5,14 @@ //! //! TODO: This module is meant to go away soon in favor of `ll::Request`. -use crate::fuse_abi::*; use crate::{fuse_abi::consts::*, session::ActiveSession}; +use crate::{fuse_abi::*, io_ops::SubChannel}; use libc::{EIO, ENOSYS, EPROTO}; use log::{debug, error, warn}; use std::path::Path; use std::time::{Duration, SystemTime}; use std::{convert::TryFrom, sync::Arc}; -use crate::channel::ChannelSender; #[cfg(feature = "abi-7-21")] use crate::reply::ReplyDirectoryPlus; use crate::reply::{Reply, ReplyDirectory, ReplyEmpty, ReplyRaw}; @@ -57,7 +56,7 @@ impl<'a> Request<'a> { &self, se: &Arc, filesystem: &Arc, - ch: ChannelSender, + ch: SubChannel, ) { debug!("{}", self.request); match self.request.operation() { @@ -68,8 +67,7 @@ impl<'a> Request<'a> { if arg.major < 7 || (arg.major == 7 && arg.minor < 6) { error!("Unsupported FUSE ABI version {}.{}", arg.major, arg.minor); reply.error(EPROTO).await; - se.destroyed - .store(true, std::sync::atomic::Ordering::Relaxed); + se.destroy().await; return; } @@ -83,8 +81,7 @@ impl<'a> Request<'a> { let res = filesystem.init(self, &mut config).await; if let Err(err) = res { reply.error(err).await; - se.destroyed - .store(true, std::sync::atomic::Ordering::Relaxed); + se.destroy().await; return; } // Reply with our desired version and settings. If the kernel supports a @@ -136,7 +133,7 @@ impl<'a> Request<'a> { &self, active_session: &Arc, filesystem: Arc, - ch: ChannelSender, + ch: SubChannel, ) -> std::io::Result<()> { debug!("{}", self.request); @@ -821,10 +818,7 @@ impl<'a> Request<'a> { } ll::Operation::Destroy => { - active_session - .destroyed - .store(true, std::sync::atomic::Ordering::Relaxed); - + active_session.destroy().await; self.reply::(&ch).ok().await; } } @@ -833,7 +827,7 @@ impl<'a> Request<'a> { /// Create a reply object for this request that can be passed to the filesystem /// implementation and makes sure that a request is replied exactly once - fn reply(&self, ch: &ChannelSender) -> T { + fn reply(&self, ch: &SubChannel) -> T { Reply::new(self.request.unique(), ch.clone()) } diff --git a/src/session.rs b/src/session.rs index 1c68db51..ac1179d5 100644 --- a/src/session.rs +++ b/src/session.rs @@ -5,7 +5,6 @@ //! filesystem is mounted, the session loop receives, dispatches and replies to kernel requests //! for filesystem operations under its mount point. -use channel::{ChannelSender, FileDescriptorRawHandle}; use futures::future::join_all; use libc::{EAGAIN, EINTR, ENODEV, ENOENT}; use log::{error, info, warn}; @@ -17,13 +16,16 @@ use std::{ io, sync::{atomic::AtomicBool, Arc}, }; -use tokio::{io::unix::AsyncFd, sync::Mutex, task::JoinHandle}; +use tokio::{sync::Mutex, task::JoinHandle}; -use crate::channel::{self, Channel}; use crate::request::Request; use crate::Filesystem; #[cfg(not(feature = "libfuse"))] use crate::MountOption; +use crate::{ + channel::{self, Channel}, + io_ops::SubChannel, +}; /// The max size of write requests from the kernel. The absolute minimum is 4k, /// FUSE recommends at least 128k, max 16M. The FUSE default is 16M on macOS @@ -56,15 +58,43 @@ pub(crate) struct ActiveSession { /// True if the filesystem is initialized (init operation done) pub initialized: AtomicBool, /// True if the filesystem was destroyed (destroy operation done) - pub destroyed: AtomicBool, + is_destroyed: AtomicBool, + /// Pipes to inform all of the child channels/interested parties we are shutting down + pub destroy_signals: Arc>>>, } +impl ActiveSession { + pub(in crate::session) async fn register_destroy( + &self, + sender: tokio::sync::oneshot::Sender<()>, + ) { + let mut guard = self.destroy_signals.lock().await; + guard.push(sender) + } + + pub(in crate) fn destroyed(&self) -> bool { + self.is_destroyed.load(std::sync::atomic::Ordering::Relaxed) + } + + pub(in crate) async fn destroy(&self) { + self.is_destroyed + .store(true, std::sync::atomic::Ordering::SeqCst); + let mut guard = self.destroy_signals.lock().await; + + for e in guard.drain(..) { + if let Err(e) = e.send(()) { + warn!("Unable to send a shutdown signal: {:?}", e); + } + } + } +} impl Default for ActiveSession { fn default() -> Self { Self { session_configuration: Arc::new(Mutex::new(Default::default())), initialized: AtomicBool::new(false), - destroyed: AtomicBool::new(false), + is_destroyed: AtomicBool::new(false), + destroy_signals: Arc::new(Mutex::new(Vec::default())), } } } @@ -78,8 +108,8 @@ impl Session { mountpoint: &Path, options: &[&OsStr], ) -> io::Result> { - info!("Mounting {}", mountpoint.display()); - Channel::new(mountpoint, worker_channel_count, options).map(|ch| Session { filesystem, ch }) + let ch = Channel::new(mountpoint, worker_channel_count, options)?; + Ok(Session { filesystem, ch }) } /// Create a new session by mounting the given filesystem to the given mountpoint @@ -101,10 +131,11 @@ impl Session { } async fn read_single_request<'a, 'b>( - ch: &Arc>, + ch: &SubChannel, + terminated: &mut tokio::sync::oneshot::Receiver<()>, buffer: &'b mut Vec, ) -> Option>> { - match Channel::receive(ch, buffer).await { + match Channel::receive(ch, terminated, buffer).await { Err(err) => match err.raw_os_error() { // Operation interrupted. Accordingly to FUSE, this is safe to retry Some(ENOENT) => None, @@ -130,23 +161,23 @@ impl Session { async fn main_request_loop( active_session: &Arc, - ch: &Arc>, + ch: &SubChannel, + terminated: &mut tokio::sync::oneshot::Receiver<()>, filesystem: &Arc, _worker_idx: usize, ) -> io::Result<()> { let mut buffer: Vec = Vec::with_capacity(BUFFER_SIZE); - let sender = ChannelSender { fd: ch.clone() }; + let sender = ch.clone(); loop { - if active_session - .destroyed - .load(std::sync::atomic::Ordering::Relaxed) - { + if active_session.destroyed() { return Ok(()); } - if let Some(req_or_err) = Session::::read_single_request(&ch, &mut buffer).await { + if let Some(req_or_err) = + Session::::read_single_request(&ch, terminated, &mut buffer).await + { let req = req_or_err?; let filesystem = filesystem.clone(); let sender = sender.clone(); @@ -167,21 +198,21 @@ impl Session { /// after a different channel got the init we will need to process that as if we were in the main loop. async fn wait_for_init( active_session: &Arc, - ch: &Arc>, + ch: &SubChannel, + terminated: &mut tokio::sync::oneshot::Receiver<()>, filesystem: &Arc, ) -> io::Result<()> { - let sender = ChannelSender { fd: ch.clone() }; + let sender = ch.clone(); loop { let mut buffer: Vec = Vec::with_capacity(BUFFER_SIZE); - if active_session - .destroyed - .load(std::sync::atomic::Ordering::Relaxed) - { + if active_session.destroyed() { return Ok(()); } - if let Some(req_or_err) = Session::::read_single_request(&ch, &mut buffer).await { + if let Some(req_or_err) = + Session::::read_single_request(&ch, terminated, &mut buffer).await + { let req = req_or_err?; if !active_session .initialized @@ -213,12 +244,20 @@ impl Session { pub(crate) async fn spawn_worker_loop( active_session: Arc, - ch: Arc>, + ch: SubChannel, + mut terminated: tokio::sync::oneshot::Receiver<()>, filesystem: Arc, worker_idx: usize, ) -> io::Result<()> { - Session::wait_for_init(&active_session, &ch, &filesystem).await?; - Session::main_request_loop(&active_session, &ch, &filesystem, worker_idx).await + Session::wait_for_init(&active_session, &ch, &mut terminated, &filesystem).await?; + Session::main_request_loop( + &active_session, + &ch, + &mut terminated, + &filesystem, + worker_idx, + ) + .await } async fn driver_evt_loop( @@ -227,15 +266,12 @@ impl Session { mut filesystem: Arc, channel: Channel, ) -> io::Result<()> { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2)); loop { interval.tick().await; - if active_session - .destroyed - .load(std::sync::atomic::Ordering::Relaxed) - { + if active_session.destroyed() { loop { if let Some(fs) = Arc::get_mut(&mut filesystem) { fs.destroy(); @@ -251,6 +287,7 @@ impl Session { } } } + /// Run the session loop that receives kernel requests and dispatches them to method /// calls into the filesystem. This spawns as a task in tokio returning that task pub async fn spawn_run(self) -> io::Result>> { @@ -262,20 +299,20 @@ impl Session { let active_session = Arc::new(ActiveSession::default()); let filesystem = Arc::new(filesystem); - let communication_channels = channel.async_channels()?; - let mut join_handles: Vec>> = Vec::default(); - for (idx, ch) in communication_channels.iter().enumerate() { - let ch = Arc::clone(&ch); + for (idx, ch) in channel.sub_channels.iter().enumerate() { + let ch = ch.clone(); let active_session = Arc::clone(&active_session); let filesystem = Arc::clone(&filesystem); let finalizer_active_session = active_session.clone(); + let (sender, receiver) = tokio::sync::oneshot::channel(); + + active_session.register_destroy(sender).await; join_handles.push(tokio::spawn(async move { - let r = Session::spawn_worker_loop(active_session, ch, filesystem, idx).await; + let r = + Session::spawn_worker_loop(active_session, ch, receiver, filesystem, idx).await; // once any worker finishes/exits, then then the entire session shout be shut down. - finalizer_active_session - .destroyed - .store(true, std::sync::atomic::Ordering::Relaxed); + finalizer_active_session.destroy().await; r })); } @@ -309,7 +346,7 @@ pub struct BackgroundSession { /// Thread guard of the background session pub guard: JoinHandle>, fuse_session: *mut libc::c_void, - fd: FileDescriptorRawHandle, + fd: crate::io_ops::FileDescriptorRawHandle, } impl BackgroundSession {