Skip to content

Commit

Permalink
examples/loop: handle io command by async/await
Browse files Browse the repository at this point in the history
async/await can handling io handle in async style, meantime the
written Rust code can be just like sync programming.

This way is very helpful for writing complicated ublk target.

Signed-off-by: Ming Lei <tom.leiming@gmail.com>
  • Loading branch information
ming1 committed Oct 22, 2023
1 parent aded72b commit 982b20a
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 26 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ installed under /usr/local/sbin or other directory which has to match
with the udev rules.


* [`examples/loop.rs`](examples/loop.rs): the whole example using async/await


## Test

You can run the test of the library with ```cargo test```
Expand Down
145 changes: 119 additions & 26 deletions examples/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use clap::{Arg, ArgAction, Command};
use io_uring::{opcode, squeue, types};
use libublk::dev_flags::*;
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue};
use libublk::{ctrl::UblkCtrl, UblkError, UblkIORes, UblkSession};
use libublk::{
ctrl::UblkCtrl, exe::Executor, exe::UringOpFuture, sys, UblkError, UblkIORes, UblkSession,
};
use log::trace;
use serde::Serialize;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;

#[derive(Debug, Serialize)]
struct LoJson {
Expand Down Expand Up @@ -57,22 +60,26 @@ fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt) -> Result<serde_json::Value, Ubl
)
}

fn loop_queue_tgt_io(q: &UblkQueue, tag: u16, _io: &UblkIOCtx) {
// either start to handle or retry
let _iod = q.get_iod(tag);
let iod = unsafe { &*_iod };
#[inline]
fn __lo_prep_submit_io_cmd(iod: &libublk::sys::ublksrv_io_desc) -> i32 {
let op = iod.op_flags & 0xff;

match op {
libublk::sys::UBLK_IO_OP_FLUSH
| libublk::sys::UBLK_IO_OP_READ
| libublk::sys::UBLK_IO_OP_WRITE => return 0,
_ => return -libc::EINVAL,
};
}

#[inline]
fn __lo_submit_io_cmd(q: &UblkQueue<'_>, tag: u16, iod: &libublk::sys::ublksrv_io_desc, data: u64) {
let op = iod.op_flags & 0xff;
// either start to handle or retry
let off = (iod.start_sector << 9) as u64;
let bytes = (iod.nr_sectors << 9) as u32;
let op = iod.op_flags & 0xff;
let data = UblkIOCtx::build_user_data(tag as u16, op, 0, true);
let buf_addr = q.get_io_buf_addr(tag);

if op == libublk::sys::UBLK_IO_OP_WRITE_ZEROES || op == libublk::sys::UBLK_IO_OP_DISCARD {
q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL)));
return;
}

match op {
libublk::sys::UBLK_IO_OP_FLUSH => {
let sqe = &opcode::SyncFileRange::new(types::Fixed(1), bytes)
Expand Down Expand Up @@ -116,12 +123,36 @@ fn loop_queue_tgt_io(q: &UblkQueue, tag: u16, _io: &UblkIOCtx) {
.expect("submission fail");
}
}
_ => q.complete_io_cmd(tag, Err(UblkError::OtherError(-libc::EINVAL))),
_ => {}
};
}

fn _lo_handle_io(q: &UblkQueue, tag: u16, i: &UblkIOCtx) {
// our IO on backing file is done
async fn lo_handle_io_cmd_async(q: &UblkQueue<'_>, tag: u16) -> i32 {
let _iod = q.get_iod(tag);
let iod = unsafe { &*_iod };
let op = iod.op_flags & 0xff;
let data = UblkIOCtx::build_user_data_async(tag as u16, op, 0);
let res = __lo_prep_submit_io_cmd(iod);
if res < 0 {
return res;
}

for _ in 0..4 {
__lo_submit_io_cmd(q, tag, iod, data);
let res = UringOpFuture { user_data: data }.await;
if res != -(libc::EAGAIN) {
return res;
}
}

return -libc::EAGAIN;
}

fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx) {
let _iod = q.get_iod(tag);
let iod = unsafe { &*_iod };
let op = iod.op_flags & 0xff;
let data = UblkIOCtx::build_user_data(tag as u16, op, 0, true);
if i.is_tgt_io() {
let user_data = i.user_data();
let res = i.result();
Expand All @@ -135,7 +166,12 @@ fn _lo_handle_io(q: &UblkQueue, tag: u16, i: &UblkIOCtx) {
}
}

loop_queue_tgt_io(q, tag, i);
let res = __lo_prep_submit_io_cmd(iod);
if res < 0 {
q.complete_io_cmd(tag, Ok(UblkIORes::Result(res)));
} else {
__lo_submit_io_cmd(q, tag, iod, data);
}
}

fn test_add(
Expand All @@ -146,10 +182,11 @@ fn test_add(
backing_file: &String,
ctrl_flags: u64,
fg: bool,
aio: bool,
) {
let _pid = if !fg { unsafe { libc::fork() } } else { 0 };
if _pid == 0 {
__test_add(id, nr_queues, depth, buf_sz, backing_file, ctrl_flags);
__test_add(id, nr_queues, depth, buf_sz, backing_file, ctrl_flags, aio);
}
}

Expand All @@ -160,6 +197,7 @@ fn __test_add(
buf_sz: u32,
backing_file: &String,
ctrl_flags: u64,
aio: bool,
) {
{
// LooTgt has to live in the whole device lifetime
Expand All @@ -186,20 +224,54 @@ fn __test_add(

let tgt_init = |dev: &mut UblkDev| lo_init_tgt(dev, &lo);
let (mut ctrl, dev) = sess.create_devices(tgt_init).unwrap();
let q_fn = move |qid: u16, _dev: &UblkDev| {
let lo_io_handler =
move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| _lo_handle_io(q, tag, io);
let q_async_fn = move |qid: u16, dev: &UblkDev| {
let q_rc = Rc::new(UblkQueue::new(qid as u16, &dev, false).unwrap());
let exe = Executor::new(dev.get_nr_ios());

for tag in 0..depth as u16 {
let q = q_rc.clone();

exe.spawn(tag as u16, async move {
let buf_addr = q.get_io_buf_addr(tag) as u64;
let mut cmd_op = sys::UBLK_IO_FETCH_REQ;
let mut res = 0;
loop {
let cmd_res = q.submit_io_cmd(tag, cmd_op, buf_addr, res).await;
if cmd_res == sys::UBLK_IO_RES_ABORT {
break;
}

res = lo_handle_io_cmd_async(&q, tag).await;
cmd_op = sys::UBLK_IO_COMMIT_AND_FETCH_REQ;
}
});
}
q_rc.wait_and_wake_io_tasks(&exe);
};

let q_sync_fn = move |qid: u16, _dev: &UblkDev| {
let lo_io_handler = move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
lo_handle_io_cmd_sync(q, tag, io)
};

UblkQueue::new(qid, _dev, true)
.unwrap()
.wait_and_handle_io(lo_io_handler);
};

sess.run_target(&mut ctrl, &dev, q_fn, |dev_id| {
let mut d_ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap();
d_ctrl.dump();
})
.unwrap()
if aio {
sess.run_target(&mut ctrl, &dev, q_async_fn, |dev_id| {
let mut d_ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap();
d_ctrl.dump();
})
.unwrap()
} else {
sess.run_target(&mut ctrl, &dev, q_sync_fn, |dev_id| {
let mut d_ctrl = UblkCtrl::new_simple(dev_id, 0).unwrap();
d_ctrl.dump();
})
.unwrap()
}
};
wh.join().unwrap();
}
Expand Down Expand Up @@ -265,6 +337,13 @@ fn main() {
.required(true)
.help("backing file")
.action(ArgAction::Set),
)
.arg(
Arg::new("async")
.long("async")
.short('a')
.action(ArgAction::SetTrue)
.help("use async/await to handle IO command"),
),
)
.subcommand(
Expand Down Expand Up @@ -304,6 +383,11 @@ fn main() {
.unwrap_or(52288);
let backing_file = add_matches.get_one::<String>("backing_file").unwrap();

let aio = if add_matches.get_flag("async") {
true
} else {
false
};
let fg = if add_matches.get_flag("forground") {
true
} else {
Expand All @@ -314,7 +398,16 @@ fn main() {
} else {
0
};
test_add(id, nr_queues, depth, buf_size, backing_file, ctrl_flags, fg);
test_add(
id,
nr_queues,
depth,
buf_size,
backing_file,
ctrl_flags,
fg,
aio,
);
}
Some(("del", add_matches)) => {
let id = add_matches
Expand Down

0 comments on commit 982b20a

Please sign in to comment.