From 9e5e537a4aaa1b66f430c53be039ed06a43b6d60 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Fri, 13 Oct 2023 16:12:41 +0000 Subject: [PATCH] examples/loop: handle io command by async/await async/await can handling io handle in async style, meantime the written Rust code can be just like sync programming. This way is very helpful for writing complicated ublk target. Signed-off-by: Ming Lei --- README.md | 3 + examples/loop.rs | 145 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 122 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index b1dfd08..cf32022 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,9 @@ installed under /usr/local/sbin or other directory which has to match with the udev rules. + * [`examples/loop.rs`](examples/loop.rs): the whole example using async/await + + ## Test You can run the test of the library with ```cargo test``` diff --git a/examples/loop.rs b/examples/loop.rs index b3ac35d..1ba26f2 100644 --- a/examples/loop.rs +++ b/examples/loop.rs @@ -3,10 +3,13 @@ use clap::{Arg, ArgAction, Command}; use io_uring::{opcode, squeue, types}; use libublk::dev_flags::*; use libublk::io::{UblkDev, UblkIOCtx, UblkQueue}; -use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes, UblkSession}; +use libublk::{ + ctrl::UblkCtrl, exe::Executor, exe::UringOpFuture, sys, UblkError, UblkIORes, UblkSession, +}; use log::trace; use serde::Serialize; use std::os::unix::io::AsRawFd; +use std::rc::Rc; #[derive(Debug, Serialize)] struct LoJson { @@ -57,22 +60,26 @@ fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt) -> Result i32 { + let op = iod.op_flags & 0xff; + + match op { + libublk::sys::UBLK_IO_OP_FLUSH + | libublk::sys::UBLK_IO_OP_READ + | libublk::sys::UBLK_IO_OP_WRITE => return 0, + _ => return -libc::EINVAL, + }; +} +#[inline] +fn __lo_submit_io_cmd(q: &UblkQueue<'_>, tag: u16, iod: &libublk::sys::ublksrv_io_desc, data: u64) { + let op = iod.op_flags & 0xff; + // either start to handle or retry let off = (iod.start_sector << 9) as u64; let bytes = (iod.nr_sectors << 9) as u32; - let op = iod.op_flags & 0xff; - let data = UblkIOCtx::build_user_data(tag as u16, op, 0, true); let buf_addr = q.get_io_buf_addr(tag); - if op == libublk::sys::UBLK_IO_OP_WRITE_ZEROES || op == libublk::sys::UBLK_IO_OP_DISCARD { - q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL))); - return; - } - match op { libublk::sys::UBLK_IO_OP_FLUSH => { let sqe = &opcode::SyncFileRange::new(types::Fixed(1), bytes) @@ -116,12 +123,36 @@ fn loop_queue_tgt_io(q: &UblkQueue, tag: u16, _io: &UblkIOCtx) { .expect("submission fail"); } } - _ => q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL))), + _ => {} }; } -fn _lo_handle_io(q: &UblkQueue, tag: u16, i: &UblkIOCtx) { - // our IO on backing file is done +async fn lo_handle_io_cmd_async(q: &UblkQueue<'_>, tag: u16) -> i32 { + let _iod = q.get_iod(tag); + let iod = unsafe { &*_iod }; + let op = iod.op_flags & 0xff; + let user_data = UblkIOCtx::build_user_data_async(tag as u16, op, 0); + let res = __lo_prep_submit_io_cmd(iod); + if res < 0 { + return res; + } + + for _ in 0..4 { + __lo_submit_io_cmd(q, tag, iod, user_data); + let res = UringOpFuture { user_data }.await; + if res != -(libc::EAGAIN) { + return res; + } + } + + return -libc::EAGAIN; +} + +fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx) { + let _iod = q.get_iod(tag); + let iod = unsafe { &*_iod }; + let op = iod.op_flags & 0xff; + let data = UblkIOCtx::build_user_data(tag as u16, op, 0, true); if i.is_tgt_io() { let user_data = i.user_data(); let res = i.result(); @@ -135,7 +166,12 @@ fn _lo_handle_io(q: &UblkQueue, tag: u16, i: &UblkIOCtx) { } } - loop_queue_tgt_io(q, tag, i); + let res = __lo_prep_submit_io_cmd(iod); + if res < 0 { + q.complete_io_cmd(tag, Ok(UblkIORes::Result(res))); + } else { + __lo_submit_io_cmd(q, tag, iod, data); + } } fn test_add( @@ -146,10 +182,11 @@ fn test_add( backing_file: &String, ctrl_flags: u64, fg: bool, + aio: bool, ) { let _pid = if !fg { unsafe { libc::fork() } } else { 0 }; if _pid == 0 { - __test_add(id, nr_queues, depth, buf_sz, backing_file, ctrl_flags); + __test_add(id, nr_queues, depth, buf_sz, backing_file, ctrl_flags, aio); } } @@ -160,6 +197,7 @@ fn __test_add( buf_sz: u32, backing_file: &String, ctrl_flags: u64, + aio: bool, ) { { // LooTgt has to live in the whole device lifetime @@ -186,20 +224,54 @@ fn __test_add( let tgt_init = |dev: &mut UblkDev| lo_init_tgt(dev, &lo); let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap(); - let q_fn = move |qid: u16, _dev: &UblkDev| { - let lo_io_handler = - move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| _lo_handle_io(q, tag, io); + let q_async_fn = move |qid: u16, dev: &UblkDev| { + let q_rc = Rc::new(UblkQueue::new(qid as u16, &dev, false).unwrap()); + let exe = Executor::new(dev.get_nr_ios()); + + for tag in 0..depth as u16 { + let q = q_rc.clone(); + + exe.spawn(tag as u16, async move { + let buf_addr = q.get_io_buf_addr(tag) as u64; + let mut cmd_op = sys::UBLK_IO_FETCH_REQ; + let mut res = 0; + loop { + let cmd_res = q.submit_io_cmd(tag, cmd_op, buf_addr, res).await; + if cmd_res == sys::UBLK_IO_RES_ABORT { + break; + } + + res = lo_handle_io_cmd_async(&q, tag).await; + cmd_op = sys::UBLK_IO_COMMIT_AND_FETCH_REQ; + } + }); + } + q_rc.wait_and_wake_io_tasks(&exe); + }; + + let q_sync_fn = move |qid: u16, _dev: &UblkDev| { + let lo_io_handler = move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| { + lo_handle_io_cmd_sync(q, tag, io) + }; UblkQueue::new(qid, _dev, true) .unwrap() .wait_and_handle_io(lo_io_handler); }; - sess.run_target(&mut ctrl, &dev, q_fn, |dev_id| { - let mut d_ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap(); - d_ctrl.dump(); - }) - .unwrap() + if aio { + sess.run_target(&mut ctrl, &dev, q_async_fn, |dev_id| { + let mut d_ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap(); + d_ctrl.dump(); + }) + .unwrap() + } else { + sess.run_target(&mut ctrl, &dev, q_sync_fn, |dev_id| { + let mut d_ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap(); + d_ctrl.dump(); + }) + .unwrap() + } }; wh.join().unwrap(); } @@ -265,6 +337,13 @@ fn main() { .required(true) .help("backing file") .action(ArgAction::Set), + ) + .arg( + Arg::new("async") + .long("async") + .short('a') + .action(ArgAction::SetTrue) + .help("use async/await to handle IO command"), ), ) .subcommand( @@ -304,6 +383,11 @@ fn main() { .unwrap_or(52288); let backing_file = add_matches.get_one::("backing_file").unwrap(); + let aio = if add_matches.get_flag("async") { + true + } else { + false + }; let fg = if add_matches.get_flag("forground") { true } else { @@ -314,7 +398,16 @@ fn main() { } else { 0 }; - test_add(id, nr_queues, depth, buf_size, backing_file, ctrl_flags, fg); + test_add( + id, + nr_queues, + depth, + buf_size, + backing_file, + ctrl_flags, + fg, + aio, + ); } Some(("del", add_matches)) => { let id = add_matches