Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status of async support #188

Open
cbrewster opened this issue Mar 23, 2024 · 1 comment
Open

Status of async support #188

cbrewster opened this issue Mar 23, 2024 · 1 comment

Comments

@cbrewster
Copy link
Contributor

Hello, I am working on a virtual filesystem using fuse-backend-rs for tvix, a Rust implementation of Nix. Most of the underlying filesystem is built on top of async Rust + tokio, so it would be great to be able to use the AsyncFileSystem trait. I've tried doing this but I've hit some issues figuring out how to drive the filesystem with both FUSE and virtiofs.

For FUSE it looks like there is some code to support driving async FUSE tasks:

#[cfg(feature = "async_io")]
pub use asyncio::FuseDevTask;
#[cfg(feature = "async_io")]
/// Task context to handle fuse request in asynchronous mode.
mod asyncio {
use std::os::unix::io::RawFd;
use std::sync::Arc;
use crate::api::filesystem::AsyncFileSystem;
use crate::api::server::Server;
use crate::transport::{FuseBuf, Reader, Writer};
/// Task context to handle fuse request in asynchronous mode.
///
/// This structure provides a context to handle fuse request in asynchronous mode, including
/// the fuse fd, a internal buffer and a `Server` instance to serve requests.
///
/// ## Examples
/// ```ignore
/// let buf_size = 0x1_0000;
/// let state = AsyncExecutorState::new();
/// let mut task = FuseDevTask::new(buf_size, fuse_dev_fd, fs_server, state.clone());
///
/// // Run the task
/// executor.spawn(async move { task.poll_handler().await });
///
/// // Stop the task
/// state.quiesce();
/// ```
pub struct FuseDevTask<F: AsyncFileSystem + Sync> {
fd: RawFd,
buf: Vec<u8>,
state: AsyncExecutorState,
server: Arc<Server<F>>,
}
impl<F: AsyncFileSystem + Sync> FuseDevTask<F> {
/// Create a new fuse task context for asynchronous IO.
///
/// # Parameters
/// - buf_size: size of buffer to receive requests from/send reply to the fuse fd
/// - fd: fuse device file descriptor
/// - server: `Server` instance to serve requests from the fuse fd
/// - state: shared state object to control the task object
///
/// # Safety
/// The caller must ensure `fd` is valid during the lifetime of the returned task object.
pub fn new(
buf_size: usize,
fd: RawFd,
server: Arc<Server<F>>,
state: AsyncExecutorState,
) -> Self {
FuseDevTask {
fd,
server,
state,
buf: vec![0x0u8; buf_size],
}
}
/// Handler to process fuse requests in asynchronous mode.
///
/// An async fn to handle requests from the fuse fd. It works in asynchronous IO mode when:
/// - receiving request from fuse fd
/// - handling requests by calling Server::async_handle_requests()
/// - sending reply to fuse fd
///
/// The async fn repeatedly return Poll::Pending when polled until the state has been set
/// to quiesce mode.
pub async fn poll_handler(&mut self) {
// TODO: register self.buf as io uring buffers.
let drive = AsyncDriver::default();
while !self.state.quiescing() {
let result = AsyncUtil::read(drive.clone(), self.fd, &mut self.buf, 0).await;
match result {
Ok(len) => {
// ###############################################
// Note: it's a heavy hack to reuse the same underlying data
// buffer for both Reader and Writer, in order to reduce memory
// consumption. Here we assume Reader won't be used anymore once
// we start to write to the Writer. To get rid of this hack,
// just allocate a dedicated data buffer for Writer.
let buf = unsafe {
std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
};
// Reader::new() and Writer::new() should always return success.
let reader =
Reader::<()>::new(FuseBuf::new(&mut self.buf[0..len])).unwrap();
let writer = Writer::new(self.fd, buf).unwrap();
let result = unsafe {
self.server
.async_handle_message(drive.clone(), reader, writer, None, None)
.await
};
if let Err(e) = result {
// TODO: error handling
error!("failed to handle fuse request, {}", e);
}
}
Err(e) => {
// TODO: error handling
error!("failed to read request from fuse device fd, {}", e);
}
}
}
// TODO: unregister self.buf as io uring buffers.
// Report that the task has been quiesced.
self.state.report();
}
}
impl<F: AsyncFileSystem + Sync> Clone for FuseDevTask<F> {
fn clone(&self) -> Self {
FuseDevTask {
fd: self.fd,
server: self.server.clone(),
state: self.state.clone(),
buf: vec![0x0u8; self.buf.capacity()],
}
}
}
#[cfg(test)]
mod tests {
use std::os::unix::io::AsRawFd;
use super::*;
use crate::api::{Vfs, VfsOptions};
use crate::async_util::{AsyncDriver, AsyncExecutor};
#[test]
fn test_fuse_task() {
let state = AsyncExecutorState::new();
let fs = Vfs::<AsyncDriver, ()>::new(VfsOptions::default());
let _server = Arc::new(Server::<Vfs<AsyncDriver, ()>, AsyncDriver, ()>::new(fs));
let file = vmm_sys_util::tempfile::TempFile::new().unwrap();
let _fd = file.as_file().as_raw_fd();
let mut executor = AsyncExecutor::new(32);
executor.setup().unwrap();
/*
// Create three tasks, which could handle three concurrent fuse requests.
let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
executor
.spawn(async move { task.poll_handler().await })
.unwrap();
let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
executor
.spawn(async move { task.poll_handler().await })
.unwrap();
let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
executor
.spawn(async move { task.poll_handler().await })
.unwrap();
*/
for _i in 0..10 {
executor.run_once(false).unwrap();
}
// Set existing flag
state.quiesce();
// Close the fusedev fd, so all pending async io requests will be aborted.
drop(file);
for _i in 0..10 {
executor.run_once(false).unwrap();
}
}
}
}

However, this code is behind the async_io feature flag, even though the real feature flag is async-io. The code here also seems to refer to things that have been deleted like use crate::async_util::{AsyncDriver, AsyncExecutor}.

I was wondering if async is something that is supported or if its currently in a broken state and needs some more help to become functional again?

@PlamenHristov
Copy link

That one is not a type. It has been disabled intentionally, since there are some compilation issues (from what I remember). Once I have some more time in the coming weeks, I'll open a PR to fix them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants