Skip to content

Commit

Permalink
libublk: io: support to handle io command by async/await
Browse files Browse the repository at this point in the history
Now it is ready to write target io command handling code by async/await.

Add UblkQueue::wait_and_handle_io_cmd() for handle each io command, and
run async executor for driving each io task.

Add UblkDev::get_nr_ios() for creating Executor.

Add UblkIOCtx::build_user_data_async() for building UringOpFuture().

Add one test for verifying async/await to handle one null io command
by submitting two io_uring nop Op, then wait both by futures::join!(f,
f2). In this test, device wide data read/write is also added.

Signed-off-by: Ming Lei <tom.leiming@gmail.com>
  • Loading branch information
ming1 committed Oct 16, 2023
1 parent 629a1b3 commit ce326ff
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 17 deletions.
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,41 @@ fn main() {
}
```

libublk also supports async/await for handling IO command from linux ublk
driver, and it just takes async `q_handler`, such as:

``` rust
async fn null_handle_io_cmd(q: &UblkQueue<'_>, tag: u16, _data: u64) {
let iod = q.get_iod(tag);
let bytes = unsafe { (*iod).nr_sectors << 9 } as i32;

q.complete_io_cmd(tag, Ok(UblkIORes::Result(bytes)));
}
let q_handler = move |qid: u16, _dev: &UblkDev| {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap());
let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios()));
let q = q_rc.clone();
let exe = exe_rc.clone();

let io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
let q = q_rc.clone();
let data = io.user_data();

exe.spawn(tag as u16, async move {
null_handle_io_cmd(&q, tag, data).await;
});
};
q.wait_and_handle_io_cmd(&exe_rc, io_handler);
}
```

Device wide data can be shared in each queue/io handler by
Arc::new(Mutex::new(Data)) and the queue handler closure supports Clone(),
see [`test_ublk_null_async():tests/basic.rs`](tests/basic.rs)

Queue wide data is per-thread and can be shared in io handler by
Rc() & RefCell().

## Test

You can run the test of the library with ```cargo test```
Expand Down
118 changes: 117 additions & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "fat_complete")]
use super::UblkFatRes;
use super::{ctrl::UblkCtrl, sys, UblkError, UblkIORes};
use super::{ctrl::UblkCtrl, exe::Executor, sys, UblkError, UblkIORes};
use io_uring::{cqueue, opcode, squeue, types, IoUring};
use log::{error, info, trace};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -123,6 +123,21 @@ impl<'a> UblkIOCtx<'a> {
tag as u64 | (op << 16) as u64 | (tgt_data << 24) as u64 | ((is_target_io as u64) << 63)
}

/// Build userdata for async io_uring OP
///
/// # Arguments:
/// * `tag`: io tag, length is 16bit
/// * `op`: io operation code, length is 8bit
/// * `op_id`: unique id in io task
///
/// The built userdata has to be unique in this io task, so that
/// our executor can figure out the exact submitted OP with
/// completed cqe
#[inline(always)]
pub fn build_user_data_async(tag: u16, op: u32, op_id: u32) -> u64 {
Self::build_user_data(tag, op, op_id, true)
}

/// Extract tag from userdata
#[inline(always)]
pub fn user_data_to_tag(user_data: u64) -> u32 {
Expand All @@ -140,6 +155,13 @@ impl<'a> UblkIOCtx<'a> {
fn is_target_io(user_data: u64) -> bool {
(user_data & (1_u64 << 63)) != 0
}

/// Check if this userdata is from IO command which is from
/// ublk driver
#[inline(always)]
fn is_io_command(user_data: u64) -> bool {
(user_data & (1_u64 << 63)) == 0
}
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -267,6 +289,13 @@ impl UblkDev {
..Default::default()
};
}

/// Return how many io slots, which is usually same with executor's
/// nr_tasks.
#[inline]
pub fn get_nr_ios(&self) -> u16 {
self.dev_info.queue_depth + self.tgt.extra_ios as u16
}
}

impl Drop for UblkDev {
Expand Down Expand Up @@ -925,6 +954,72 @@ impl UblkQueue<'_> {
}
}

pub(crate) fn process_io_cmds<F>(
&self,
exe: &Executor,
mut ops: F,
to_wait: usize,
) -> Result<i32, UblkError>
where
F: FnMut(&UblkQueue, u16, &UblkIOCtx),
{
match self.wait_ios(to_wait) {
Err(r) => Err(r),
Ok(done) => {
for idx in 0..done {
let cqe = {
match self.q_ring.borrow_mut().completion().next() {
None => return Err(UblkError::OtherError(-libc::EINVAL)),
Some(r) => r,
}
};

let e = UblkIOCtx(
&cqe,
if idx == 0 {
UblkIOCtx::UBLK_IO_F_FIRST
} else {
0
} | if idx + 1 == done {
UblkIOCtx::UBLK_IO_F_LAST
} else {
0
},
);

let data = e.user_data();
let res = e.result();
let tag = UblkIOCtx::user_data_to_tag(data);

{
let cmd_op = UblkIOCtx::user_data_to_op(data);
trace!(
"{}: res {} (qid {} tag {} cmd_op {} target {}) state {:?}",
"handle_cqe",
res,
self.q_id,
tag,
cmd_op,
UblkIOCtx::is_target_io(data),
self.state.borrow(),
);
}
if UblkIOCtx::is_io_command(data) {
self.update_state(e.0);

if res == sys::UBLK_IO_RES_OK as i32 {
assert!(tag < self.q_depth);
ops(self, tag as u16, &e);
}
} else {
exe.wake_with_uring_cqe(tag as u16, &cqe);
}
}
Ok(0)
}
}
}

/// Wait and handle incoming IO
///
/// # Arguments:
Expand All @@ -945,4 +1040,25 @@ impl UblkQueue<'_> {
}
}
}

/// Wait and handle incoming IO command
///
/// # Arguments:
///
/// * `ops`: IO handling closure
///
/// Called in queue context. won't return unless error is observed.
/// Wait and handle any incoming cqe until queue is down.
///
pub fn wait_and_handle_io_cmd<F>(&self, exe: &Executor, mut ops: F)
where
F: FnMut(&UblkQueue, u16, &UblkIOCtx),
{
loop {
match self.process_io_cmds(exe, &mut ops, 1) {
Err(_) => break,
_ => continue,
}
}
}
}
137 changes: 121 additions & 16 deletions tests/basic.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#[cfg(test)]
mod integration {
use io_uring::opcode;
use libublk::dev_flags::*;
use libublk::exe::{Executor, UringOpFuture};
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes};
use libublk::{sys, UblkSessionBuilder};
use std::env;
use std::path::Path;
use std::process::{Command, Stdio};
use std::rc::Rc;
use std::sync::{Arc, Mutex};

fn read_ublk_disk(dev_id: i32) {
let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id);
Expand All @@ -20,6 +24,25 @@ mod integration {
println!("{:?}", Command::new("dd").args(arg_list).output().unwrap());
}

fn run_ublk_disk_test(dev_id: i32, dev_flags: u32) {
let mut ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap();
let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id);

std::thread::sleep(std::time::Duration::from_millis(500));

assert!(ctrl.get_target_flags_from_json().unwrap() == dev_flags);

//ublk block device should be observed now
assert!(Path::new(&dev_path).exists() == true);

//ublk exported json file should be observed
assert!(Path::new(&ctrl.run_path()).exists() == true);

read_ublk_disk(dev_id);

ctrl.del().unwrap();
}

fn __test_ublk_null(dev_flags: u32, q_handler: fn(u16, &UblkDev)) {
let sess = UblkSessionBuilder::default()
.name("null")
Expand All @@ -42,22 +65,7 @@ mod integration {
};

sess.run_target(&mut ctrl, &dev, q_fn, move |dev_id| {
let mut ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap();
let dev_path = format!("{}{}", libublk::BDEV_PATH, dev_id);

std::thread::sleep(std::time::Duration::from_millis(500));

assert!(ctrl.get_target_flags_from_json().unwrap() == dev_flags);

//ublk block device should be observed now
assert!(Path::new(&dev_path).exists() == true);

//ublk exported json file should be observed
assert!(Path::new(&ctrl.run_path()).exists() == true);

read_ublk_disk(dev_id);

ctrl.del().unwrap();
run_ublk_disk_test(dev_id, dev_flags);
})
.unwrap()
};
Expand Down Expand Up @@ -108,6 +116,103 @@ mod integration {
);
}

/// make one async ublk-null and test if /dev/ublkbN can be created successfully
#[test]
fn test_ublk_null_async() {
fn null_submit_nop(q: &UblkQueue<'_>, user_data: u64) -> UringOpFuture {
let nop_e = opcode::Nop::new().build().user_data(user_data);

unsafe {
q.q_ring
.borrow_mut()
.submission()
.push(&nop_e)
.expect("submission fail");
};
UringOpFuture { user_data }
}

async fn null_handle_io_cmd(q: &UblkQueue<'_>, tag: u16, _data: u64) {
let _iod = q.get_iod(tag);
let iod = unsafe { &*_iod };
let bytes = (iod.nr_sectors << 9) as i32;
let op = iod.op_flags & 0xff;
let data = UblkIOCtx::build_user_data_async(tag, op, 0);
let data2 = UblkIOCtx::build_user_data_async(tag, op, 1);

//simulate our io command by joining two io_uring nops
let f = null_submit_nop(q, data);
let f2 = null_submit_nop(q, data2);
let (res, res2) = futures::join!(f, f2);
assert!(res == 0 && res2 == 0);

q.complete_io_cmd(tag, Ok(UblkIORes::Result(bytes)));
}

struct DevData {
spawned: i32,
}

let sess = libublk::UblkSessionBuilder::default()
.name("null")
.nr_queues(2_u16)
.depth(4_u16)
.id(-1)
.dev_flags(UBLK_DEV_F_ADD_DEV)
.build()
.unwrap();

let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(250_u64 << 30);
Ok(serde_json::json!({}))
};

// device data is shared among all queue contexts
let dev_data = Arc::new(Mutex::new(DevData { spawned: 0 }));
let saved_dev_data_addr = Arc::new(Mutex::new({
std::ptr::addr_of!(*(dev_data.lock().unwrap())) as u64
}));

let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap();
let q_fn = move |qid: u16, _dev: &UblkDev| {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &_dev).unwrap());
let exe_rc = Rc::new(Executor::new(_dev.get_nr_ios()));
let q = q_rc.clone();
let exe = exe_rc.clone();

// @q_fn closure implements Clone() Trait, so the captured
// @dev_data is cloned to @q_fn context.
let _dev_data = Rc::new(dev_data);

// make sure that all queues see same device data
let _dev_addr = Rc::new(saved_dev_data_addr);

let io_handler = move |_q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
let q = q_rc.clone();
let data = io.user_data();
let __dev_data = _dev_data.clone();
let __dev_addr = _dev_addr.clone();

exe.spawn(tag as u16, async move {
{
let mut guard = __dev_data.lock().unwrap();
(*guard).spawned += 1;

let guard_addr = __dev_addr.lock().unwrap();
assert!(*guard_addr == std::ptr::addr_of!(*guard) as u64);
}
null_handle_io_cmd(&q, tag, data).await;
});
};
q.wait_and_handle_io_cmd(&exe_rc, io_handler);
};

sess.run_target(&mut ctrl, &dev, q_fn, |dev_id| {
run_ublk_disk_test(dev_id, UBLK_DEV_F_ADD_DEV);
})
.unwrap();
}

/// make one ublk-ramdisk and test:
/// - if /dev/ublkbN can be created successfully
/// - if yes, then test format/mount/umount over this ublk-ramdisk
Expand Down

0 comments on commit ce326ff

Please sign in to comment.