Skip to content

Commit

Permalink
libublk: io: define q_ring as RefCell<>
Browse files Browse the repository at this point in the history
So that UblkQueue can be &self.

Signed-off-by: Ming Lei <tom.leiming@gmail.com>
  • Loading branch information
ming1 committed Oct 6, 2023
1 parent 05aed9b commit 72d2bef
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 63 deletions.
2 changes: 1 addition & 1 deletion examples/null_ll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn test_add(dev_id: i32) {
for q in 0..nr_queues {
let dev = Arc::clone(&ublk_dev);
threads.push(std::thread::spawn(move || {
let mut queue = UblkQueue::new(q as u16, &dev).unwrap();
let queue = UblkQueue::new(q as u16, &dev).unwrap();
let ctx = queue.make_queue_ctx();

//IO handling closure(FnMut), we are driven by io_uring CQE, and
Expand Down
140 changes: 79 additions & 61 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,15 @@ pub struct UblkQueue<'a> {
pub dev: &'a UblkDev,
bufs: Vec<*mut u8>,
state: RefCell<UblkQueueState>,
q_ring: IoUring<squeue::Entry>,
q_ring: RefCell<IoUring<squeue::Entry>>,
}

impl Drop for UblkQueue<'_> {
fn drop(&mut self) {
let dev = self.dev;
trace!("dev {} queue {} dropped", dev.dev_info.dev_id, self.q_id);

if let Err(r) = self.q_ring.submitter().unregister_files() {
if let Err(r) = self.q_ring.borrow_mut().submitter().unregister_files() {
error!("unregister fixed files failed {}", r);
}

Expand Down Expand Up @@ -559,7 +559,7 @@ impl UblkQueue<'_> {
bufs[i as usize] = addr;
}

let mut q = UblkQueue {
let q = UblkQueue {
flags: dev.flags,
q_id,
q_depth: depth,
Expand All @@ -569,7 +569,7 @@ impl UblkQueue<'_> {
cmd_inflight: 0,
state: 0,
}),
q_ring: ring,
q_ring: RefCell::new(ring),
bufs,
};
q.submit_fetch_commands();
Expand All @@ -592,7 +592,14 @@ impl UblkQueue<'_> {

#[inline(always)]
#[allow(unused_assignments)]
fn queue_io_cmd(&mut self, tag: u16, cmd_op: u32, buf_addr: u64, res: i32) -> i32 {
fn queue_io_cmd(
&self,
r: &mut IoUring<squeue::Entry>,
tag: u16,
cmd_op: u32,
buf_addr: u64,
res: i32,
) -> i32 {
let mut state = self.state.borrow_mut();
if state.is_stopping() {
return 0;
Expand All @@ -612,10 +619,7 @@ impl UblkQueue<'_> {
.user_data(data);

unsafe {
self.q_ring
.submission()
.push(&sqe)
.expect("submission fail");
r.submission().push(&sqe).expect("submission fail");
}

state.inc_cmd_inflight();
Expand All @@ -633,8 +637,15 @@ impl UblkQueue<'_> {
}

#[inline(always)]
fn commit_and_queue_io_cmd(&mut self, tag: u16, buf_addr: u64, io_cmd_result: i32) {
fn commit_and_queue_io_cmd(
&self,
r: &mut IoUring<squeue::Entry>,
tag: u16,
buf_addr: u64,
io_cmd_result: i32,
) {
self.queue_io_cmd(
r,
tag,
sys::UBLK_IO_COMMIT_AND_FETCH_REQ,
buf_addr,
Expand All @@ -647,21 +658,32 @@ impl UblkQueue<'_> {
/// Only called during queue initialization. After queue is setup,
/// COMMIT_AND_FETCH_REQ command is used for both committing io command
/// result and fetching new incoming IO
fn submit_fetch_commands(&mut self) {
fn submit_fetch_commands(&self) {
for i in 0..self.q_depth {
let buf_addr = self.get_io_buf_addr(i as usize);
self.queue_io_cmd(i as u16, sys::UBLK_IO_FETCH_REQ, buf_addr, -1);
self.queue_io_cmd(
&mut self.q_ring.borrow_mut(),
i as u16,
sys::UBLK_IO_FETCH_REQ,
buf_addr,
-1,
);
}
}

#[inline(always)]
fn complete_ios(&mut self, tag: usize, res: Result<UblkIORes, UblkError>) {
fn complete_ios(
&self,
r: &mut IoUring<squeue::Entry>,
tag: usize,
res: Result<UblkIORes, UblkError>,
) {
match res {
Ok(UblkIORes::Result(res))
| Err(UblkError::OtherError(res))
| Err(UblkError::UringIOError(res)) => {
let buf_addr = self.get_io_buf_addr(tag);
self.commit_and_queue_io_cmd(tag as u16, buf_addr, res);
self.commit_and_queue_io_cmd(r, tag as u16, buf_addr, res);
}
Err(UblkError::IoQueued(_)) => {}
#[cfg(feature = "fat_complete")]
Expand All @@ -671,31 +693,32 @@ impl UblkQueue<'_> {
for item in ios {
let tag = item.0;
let buf_addr = self.get_io_buf_addr(tag as usize);
self.commit_and_queue_io_cmd(tag, buf_addr, item.1);
self.commit_and_queue_io_cmd(r, tag, buf_addr, item.1);
}
}
UblkFatRes::ZonedAppendRes((res, lba)) => {
self.commit_and_queue_io_cmd(tag as u16, lba, res);
self.commit_and_queue_io_cmd(r, tag as u16, lba, res);
}
},
_ => {}
};
}

#[inline(always)]
fn call_io_closure<F>(&mut self, mut ops: F, tag: u32, e: &UblkCQE)
fn call_io_closure<F>(&self, mut ops: F, tag: u32, e: &UblkCQE)
where
F: FnMut(&mut UblkIOCtx) -> Result<UblkIORes, UblkError>,
{
let mut ctx = UblkIOCtx(&mut self.q_ring, self.bufs[tag as usize], e);
let mut r = self.q_ring.borrow_mut();
let mut ctx = UblkIOCtx(&mut r, self.bufs[tag as usize], e);

let res = ops(&mut ctx);
self.complete_ios(tag as usize, res);
self.complete_ios(&mut r, tag as usize, res);
}

#[inline(always)]
#[allow(unused_assignments)]
fn handle_cqe<F>(&mut self, ops: F, e: &UblkCQE)
fn handle_cqe<F>(&self, ops: F, e: &UblkCQE)
where
F: FnMut(&mut UblkIOCtx) -> Result<UblkIORes, UblkError>,
{
Expand Down Expand Up @@ -751,31 +774,7 @@ impl UblkQueue<'_> {
}

#[inline(always)]
fn reap_one_event<F>(&mut self, ops: F, idx: i32, cnt: i32) -> usize
where
F: FnMut(&mut UblkIOCtx) -> Result<UblkIORes, UblkError>,
{
if idx >= cnt {
return 0;
}

let cqe = match self.q_ring.completion().next() {
None => return 0,
Some(r) => r,
};

let ublk_cqe = UblkCQE(
&cqe,
if idx == 0 { UBLK_IO_F_FIRST } else { 0 }
| if idx + 1 == cnt { UBLK_IO_F_LAST } else { 0 },
);
self.handle_cqe(ops, &ublk_cqe);

1
}

#[inline(always)]
fn wait_ios(&mut self, to_wait: usize) -> Result<i32, UblkError> {
fn wait_ios(&self, to_wait: usize) -> Result<Vec<cqueue::Entry>, UblkError> {
let state = self.state.borrow();
info!(
"dev{}-q{}: to_submit {} inflight cmd {} stopping {}",
Expand All @@ -788,18 +787,29 @@ impl UblkQueue<'_> {

let ret = self
.q_ring
.borrow()
.submit_and_wait(to_wait)
.map_err(UblkError::UringSubmissionError)?;

let nr_cqes = self.q_ring.completion().len() as i32;
info!(
"submit result {}, nr_cqes {} stop {} idle {}",
ret,
nr_cqes,
state.is_stopping(),
state.is_idle(),
);
Ok(nr_cqes)
let cqes = {
let _cqes: Vec<cqueue::Entry> = self
.q_ring
.borrow_mut()
.completion()
.map(Into::into)
.collect();
info!(
"submit result {}, nr_cqes {} stop {} idle {}",
ret,
_cqes.len(),
state.is_stopping(),
state.is_idle(),
);

_cqes
};

Ok(cqes)
}

/// Process the incoming IOs(io commands & target IOs) from io_uring
Expand Down Expand Up @@ -852,19 +862,27 @@ impl UblkQueue<'_> {
/// provided, and target code can return UblkFatRes::BatchRes(batch) to
/// cover each completed IO(tag, result) in io closure. Then, all these
/// added IOs will be completed automatically.
pub fn process_ios<F>(&mut self, mut ops: F, to_wait: usize) -> Result<i32, UblkError>
pub fn process_ios<F>(&self, mut ops: F, to_wait: usize) -> Result<i32, UblkError>
where
F: FnMut(&mut UblkIOCtx) -> Result<UblkIORes, UblkError>,
{
if self.state.borrow().queue_is_done() && self.q_ring.submission().is_empty() {
return Err(UblkError::QueueIsDown(-libc::ENODEV));
if self.state.borrow().queue_is_done() {
if self.q_ring.borrow_mut().submission().is_empty() {
return Err(UblkError::QueueIsDown(-libc::ENODEV));
}
}

match self.wait_ios(to_wait) {
Err(r) => Err(r),
Ok(done) => {
for idx in 0..done {
self.reap_one_event(&mut ops, idx, done);
Ok(cqes) => {
let cnt = cqes.len();
for (idx, cqe) in cqes.iter().enumerate() {
let ublk_cqe = UblkCQE(
&cqe,
if idx == 0 { UBLK_IO_F_FIRST } else { 0 }
| if idx + 1 == cnt { UBLK_IO_F_LAST } else { 0 },
);
self.handle_cqe(&mut ops, &ublk_cqe);
}
Ok(0)
}
Expand All @@ -880,7 +898,7 @@ impl UblkQueue<'_> {
/// Called in queue context. won't return unless error is observed.
/// Wait and handle any incoming cqe until queue is down.
///
pub fn wait_and_handle_io<F>(&mut self, mut ops: F)
pub fn wait_and_handle_io<F>(&self, mut ops: F)
where
F: FnMut(&mut UblkIOCtx) -> Result<UblkIORes, UblkError>,
{
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl UblkSession {
}
_tx.send((q, unsafe { libc::gettid() })).unwrap();

let mut queue = io::UblkQueue::new(q, &_dev).unwrap();
let queue = io::UblkQueue::new(q, &_dev).unwrap();
let queue_closure = {
let ctx = queue.make_queue_ctx();
move |io_ctx: &mut io::UblkIOCtx| _q_fn(&ctx, io_ctx)
Expand Down

0 comments on commit 72d2bef

Please sign in to comment.