diff --git a/README.md b/README.md index f33c1f0..167407b 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,41 @@ fn main() { } ``` +libublk also supports async/await for handling IO commands from the Linux ublk +driver, and it just takes an async `q_handler`, such as: + +``` rust +async fn null_handle_io_cmd(q: &UblkQueue<'_>, tag: u16, _data: u64) { + let iod = q.get_iod(tag); + let bytes = unsafe { (*iod).nr_sectors << 9 } as i32; + + q.complete_io_cmd(tag, Ok(UblkIORes::Result(bytes))); +} +let q_handler = move |qid: u16, _dev: &UblkDev| { + let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap()); + let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios())); + let q = q_rc.clone(); + let exe = exe_rc.clone(); + + let io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| { + let q = q_rc.clone(); + let data = io.user_data(); + + exe.spawn(tag as u16, async move { + null_handle_io_cmd(&q, tag, data).await; + }); + }; + q.wait_and_handle_io_cmd(&exe_rc, io_handler); +} +``` + +Device-wide data can be shared in each queue/io handler by +Arc::new(Mutex::new(Data)), and the queue handler closure supports Clone(); +see [`test_ublk_null_async():tests/basic.rs`](tests/basic.rs). + +Queue-wide data is per-thread and can be shared in the io handler by +Rc() & RefCell(). 
+ ## Test You can run the test of the library with ```cargo test``` diff --git a/src/io.rs b/src/io.rs index b460049..406063e 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,6 +1,6 @@ #[cfg(feature = "fat_complete")] use super::UblkFatRes; -use super::{ctrl::UblkCtrl, sys, UblkError, UblkIORes}; +use super::{ctrl::UblkCtrl, exe::Executor, sys, UblkError, UblkIORes}; use io_uring::{cqueue, opcode, squeue, types, IoUring}; use log::{error, info, trace}; use serde::{Deserialize, Serialize}; @@ -123,6 +123,21 @@ impl<'a> UblkIOCtx<'a> { tag as u64 | (op << 16) as u64 | (tgt_data << 24) as u64 | ((is_target_io as u64) << 63) } + /// Build userdata for async io_uring OP + /// + /// # Arguments: + /// * `tag`: io tag, length is 16bit + /// * `op`: io operation code, length is 8bit + /// * `op_id`: unique id in io task + /// + /// The built userdata has to be unique in this io task, so that + /// our executor can figure out the exact submitted OP with the + /// completed cqe + #[inline(always)] + pub fn build_user_data_async(tag: u16, op: u32, op_id: u32) -> u64 { + Self::build_user_data(tag, op, op_id, true) + } + /// Extract tag from userdata #[inline(always)] pub fn user_data_to_tag(user_data: u64) -> u32 { @@ -140,6 +155,13 @@ impl<'a> UblkIOCtx<'a> { fn is_target_io(user_data: u64) -> bool { (user_data & (1_u64 << 63)) != 0 } + + /// Check if this userdata is from an IO command originated + /// from the ublk driver + #[inline(always)] + fn is_io_command(user_data: u64) -> bool { + (user_data & (1_u64 << 63)) == 0 + } } #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -267,6 +289,13 @@ impl UblkDev { ..Default::default() }; } + + /// Return how many io slots, which is usually the same as the + /// executor's nr_tasks. 
+ #[inline] + pub fn get_nr_ios(&self) -> u16 { + self.dev_info.queue_depth + self.tgt.extra_ios as u16 + } } impl Drop for UblkDev { @@ -925,6 +954,72 @@ impl UblkQueue<'_> { } } + pub(crate) fn process_io_cmds<F>( + &self, + exe: &Executor, + mut ops: F, + to_wait: usize, + ) -> Result<i32, UblkError> + where + F: FnMut(&UblkQueue, u16, &UblkIOCtx), + { + match self.wait_ios(to_wait) { + Err(r) => Err(r), + Ok(done) => { + for idx in 0..done { + let cqe = { + match self.q_ring.borrow_mut().completion().next() { + None => return Err(UblkError::OtherError(-libc::EINVAL)), + Some(r) => r, + } + }; + + let e = UblkIOCtx( + &cqe, + if idx == 0 { + UblkIOCtx::UBLK_IO_F_FIRST + } else { + 0 + } | if idx + 1 == done { + UblkIOCtx::UBLK_IO_F_LAST + } else { + 0 + }, + ); + + let data = e.user_data(); + let res = e.result(); + let tag = UblkIOCtx::user_data_to_tag(data); + + { + let cmd_op = UblkIOCtx::user_data_to_op(data); + trace!( + "{}: res {} (qid {} tag {} cmd_op {} target {}) state {:?}", + "handle_cqe", + res, + self.q_id, + tag, + cmd_op, + UblkIOCtx::is_target_io(data), + self.state.borrow(), + ); + } + if UblkIOCtx::is_io_command(data) { + self.update_state(e.0); + + if res == sys::UBLK_IO_RES_OK as i32 { + assert!(tag < self.q_depth); + ops(self, tag as u16, &e); + } + } else { + exe.wake_with_uring_cqe(tag as u16, &cqe); + } + } + Ok(0) + } + } + } + /// Wait and handle incoming IO /// /// # Arguments: /// /// * `ops`: IO handling closure /// /// Called in queue context. won't return unless error is observed. /// Wait and handle any incoming cqe until queue is down. @@ -945,4 +1040,25 @@ impl UblkQueue<'_> { } } } + + /// Wait and handle incoming IO command /// /// # Arguments: /// /// * `ops`: IO handling closure /// /// Called in queue context. won't return unless error is observed. /// Wait and handle any incoming cqe until queue is down. 
+ /// + pub fn wait_and_handle_io_cmd<F>(&self, exe: &Executor, mut ops: F) + where + F: FnMut(&UblkQueue, u16, &UblkIOCtx), + { + loop { + match self.process_io_cmds(exe, &mut ops, 1) { + Err(_) => break, + _ => continue, + } + } + } } diff --git a/tests/basic.rs b/tests/basic.rs index 4f01f47..a61df6d 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -1,12 +1,16 @@ #[cfg(test)] mod integration { + use io_uring::opcode; use libublk::dev_flags::*; + use libublk::exe::{Executor, UringOpFuture}; use libublk::io::{UblkDev, UblkIOCtx, UblkQueue}; use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes}; use libublk::{sys, UblkSessionBuilder}; use std::env; use std::path::Path; use std::process::{Command, Stdio}; + use std::rc::Rc; + use std::sync::{Arc, Mutex}; fn read_ublk_disk(dev_id: i32) { let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id); @@ -20,6 +24,25 @@ mod integration { println!("{:?}", Command::new("dd").args(arg_list).output().unwrap()); } + fn run_ublk_disk_test(dev_id: i32, dev_flags: u32) { + let mut ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap(); + let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + assert!(ctrl.get_target_flags_from_json().unwrap() == dev_flags); + + //ublk block device should be observed now + assert!(Path::new(&dev_path).exists() == true); + + //ublk exported json file should be observed + assert!(Path::new(&ctrl.run_path()).exists() == true); + + read_ublk_disk(dev_id); + + ctrl.del().unwrap(); + } + fn __test_ublk_null(dev_flags: u32, q_handler: fn(u16, &UblkDev)) { let sess = UblkSessionBuilder::default() .name("null") @@ -42,22 +65,7 @@ mod integration { }; sess.run_target(&mut ctrl, &dev, q_fn, move |dev_id| { - let mut ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap(); - let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id); - - std::thread::sleep(std::time::Duration::from_millis(500)); - - assert!(ctrl.get_target_flags_from_json().unwrap() == 
dev_flags); - - //ublk block device should be observed now - assert!(Path::new(&dev_path).exists() == true); - - //ublk exported json file should be observed - assert!(Path::new(&ctrl.run_path()).exists() == true); - - read_ublk_disk(dev_id); - - ctrl.del().unwrap(); + run_ublk_disk_test(dev_id, dev_flags); }) .unwrap() }; @@ -108,6 +116,103 @@ mod integration { ); } + /// make one async ublk-null and test if /dev/ublkbN can be created successfully + #[test] + fn test_ublk_null_async() { + fn null_submit_nop(q: &UblkQueue<'_>, user_data: u64) -> UringOpFuture { + let nop_e = opcode::Nop::new().build().user_data(user_data); + + unsafe { + q.q_ring + .borrow_mut() + .submission() + .push(&nop_e) + .expect("submission fail"); + }; + UringOpFuture { user_data } + } + + async fn null_handle_io_cmd(q: &UblkQueue<'_>, tag: u16, _data: u64) { + let _iod = q.get_iod(tag); + let iod = unsafe { &*_iod }; + let bytes = (iod.nr_sectors << 9) as i32; + let op = iod.op_flags & 0xff; + let data = UblkIOCtx::build_user_data_async(tag, op, 0); + let data2 = UblkIOCtx::build_user_data_async(tag, op, 1); + + //simulate our io command by joining two io_uring nops + let f = null_submit_nop(q, data); + let f2 = null_submit_nop(q, data2); + let (res, res2) = futures::join!(f, f2); + assert!(res == 0 && res2 == 0); + + q.complete_io_cmd(tag, Ok(UblkIORes::Result(bytes))); + } + + struct DevData { + spawned: i32, + } + + let sess = libublk::UblkSessionBuilder::default() + .name("null") + .nr_queues(2_u16) + .depth(4_u16) + .id(-1) + .dev_flags(UBLK_DEV_F_ADD_DEV) + .build() + .unwrap(); + + let tgt_init = |dev: &mut UblkDev| { + dev.set_default_params(250_u64 << 30); + Ok(serde_json::json!({})) + }; + + // device data is shared among all queue contexts + let dev_data = Arc::new(Mutex::new(DevData { spawned: 0 })); + let saved_dev_data_addr = Arc::new(Mutex::new({ + std::ptr::addr_of!(*(dev_data.lock().unwrap())) as u64 + })); + + let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap(); 
+ let q_fn = move |qid: u16, _dev: &UblkDev| { + let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap()); + let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios())); + let q = q_rc.clone(); + let exe = exe_rc.clone(); + + // @q_fn closure implements Clone() Trait, so the captured + // @dev_data is cloned to @q_fn context. + let _dev_data = Rc::new(dev_data); + + // make sure that all queues see same device data + let _dev_addr = Rc::new(saved_dev_data_addr); + + let io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| { + let q = q_rc.clone(); + let data = io.user_data(); + let __dev_data = _dev_data.clone(); + let __dev_addr = _dev_addr.clone(); + + exe.spawn(tag as u16, async move { + { + let mut guard = __dev_data.lock().unwrap(); + (*guard).spawned += 1; + + let guard_addr = __dev_addr.lock().unwrap(); + assert!(*guard_addr == std::ptr::addr_of!(*guard) as u64); + } + null_handle_io_cmd(&q, tag, data).await; + }); + }; + q.wait_and_handle_io_cmd(&exe_rc, io_handler); + }; + + sess.run_target(&mut ctrl, &dev, q_fn, |dev_id| { + run_ublk_disk_test(dev_id, UBLK_DEV_F_ADD_DEV); + }) + .unwrap(); + } + /// make one ublk-ramdisk and test: /// - if /dev/ublkbN can be created successfully /// - if yes, then test format/mount/umount over this ublk-ramdisk