diff --git a/src/io.rs b/src/io.rs index 08b893c..6bd7d3d 100644 --- a/src/io.rs +++ b/src/io.rs @@ -4,6 +4,7 @@ use super::{ctrl::UblkCtrl, sys, UblkError, UblkIORes}; use io_uring::{cqueue, opcode, squeue, types, IoUring}; use log::{error, info, trace}; use serde::{Deserialize, Serialize}; +use std::cell::RefCell; use std::fs; use std::os::unix::io::AsRawFd; @@ -357,6 +358,59 @@ impl UblkQueueCtx { const UBLK_QUEUE_STOPPING: u32 = 1_u32 << 0; const UBLK_QUEUE_IDLE: u32 = 1_u32 << 1; +#[derive(Debug, Clone, Default)] +pub struct UblkQueueState { + cmd_inflight: u32, + state: u32, +} + +impl UblkQueueState { + pub fn new() -> Self { + Self { + ..Default::default() + } + } + + #[inline(always)] + fn queue_is_quiesced(&self) -> bool { + self.cmd_inflight == 0 + } + + #[inline(always)] + fn queue_is_done(&self) -> bool { + self.is_stopping() && self.queue_is_quiesced() + } + + #[inline(always)] + fn get_nr_cmd_inflight(&self) -> u32 { + self.cmd_inflight + } + + #[inline(always)] + fn is_stopping(&self) -> bool { + (self.state & UBLK_QUEUE_STOPPING) != 0 + } + + #[inline(always)] + fn is_idle(&self) -> bool { + (self.state & UBLK_QUEUE_IDLE) != 0 + } + + #[inline(always)] + fn inc_cmd_inflight(&mut self) { + self.cmd_inflight += 1; + } + + #[inline(always)] + fn dec_cmd_inflight(&mut self) { + self.cmd_inflight -= 1; + } + + fn mark_stopping(&mut self) { + self.state |= UBLK_QUEUE_STOPPING; + } +} + /// UBLK queue abstraction /// /// UblkQueue is the core part of the whole stack, which communicates with @@ -377,9 +431,8 @@ pub struct UblkQueue<'a> { io_cmd_buf: u64, //ops: Box, pub dev: &'a UblkDev, - cmd_inflight: u32, - q_state: u32, bufs: Vec<*mut u8>, + state: RefCell, q_ring: IoUring, } @@ -512,8 +565,10 @@ impl UblkQueue<'_> { q_depth: depth, io_cmd_buf: io_cmd_buf as u64, dev, - cmd_inflight: 0, - q_state: 0, + state: RefCell::new(UblkQueueState { + cmd_inflight: 0, + state: 0, + }), q_ring: ring, bufs, }; @@ -538,7 +593,8 @@ impl UblkQueue<'_> { #[inline(always)] #[allow(unused_assignments)] fn queue_io_cmd(&mut self, tag: u16, cmd_op: u32, buf_addr: u64, res: i32) -> i32 { - if (self.q_state & UBLK_QUEUE_STOPPING) != 0 { + let mut state = self.state.borrow_mut(); + if state.is_stopping() { return 0; } @@ -562,14 +618,15 @@ impl UblkQueue<'_> { .expect("submission fail"); } - self.cmd_inflight += 1; + state.inc_cmd_inflight(); + trace!( "{}: (qid {} tag {} cmd_op {}) stopping {}", "queue_io_cmd", self.q_id, tag, cmd_op, - (self.q_state & UBLK_QUEUE_STOPPING) != 0 + state.is_stopping(), ); 1 @@ -597,16 +654,6 @@ impl UblkQueue<'_> { } } - #[inline(always)] - fn queue_is_idle(&self) -> bool { - self.cmd_inflight == 0 - } - - #[inline(always)] - fn queue_is_done(&self) -> bool { - (self.q_state & UBLK_QUEUE_STOPPING) != 0 && self.queue_is_idle() - } - #[inline(always)] fn complete_ios(&mut self, tag: usize, res: Result) { match res { @@ -657,16 +704,18 @@ impl UblkQueue<'_> { let tag = UblkIOCtx::user_data_to_tag(data); let cmd_op = UblkIOCtx::user_data_to_op(data); - trace!( - "{}: res {} (qid {} tag {} cmd_op {} target {}) state {}", - "handle_cqe", - res, - self.q_id, - tag, - cmd_op, - is_target_io(data), - self.q_state, - ); + { + trace!( + "{}: res {} (qid {} tag {} cmd_op {} target {}) state {:?}", + "handle_cqe", + res, + self.q_id, + tag, + cmd_op, + is_target_io(data), + self.state.borrow(), + ); + } if is_target_io(data) { let res = e.result(); @@ -686,10 +735,13 @@ impl UblkQueue<'_> { return; } - self.cmd_inflight -= 1; + { + let mut state = self.state.borrow_mut(); + state.dec_cmd_inflight(); - if res == sys::UBLK_IO_RES_ABORT { - self.q_state |= UBLK_QUEUE_STOPPING; + if res == sys::UBLK_IO_RES_ABORT { + state.mark_stopping(); + } } if res == sys::UBLK_IO_RES_OK as i32 { @@ -724,13 +776,14 @@ impl UblkQueue<'_> { #[inline(always)] fn wait_ios(&mut self, to_wait: usize) -> Result { + let state = self.state.borrow(); info!( "dev{}-q{}: to_submit {} inflight cmd {} stopping {}", self.dev.dev_info.dev_id, self.q_id, 0, - self.cmd_inflight, - (self.q_state & UBLK_QUEUE_STOPPING) + state.get_nr_cmd_inflight(), + state.is_stopping(), ); let ret = self @@ -743,8 +796,8 @@ impl UblkQueue<'_> { "submit result {}, nr_cqes {} stop {} idle {}", ret, nr_cqes, - (self.q_state & UBLK_QUEUE_STOPPING), - (self.q_state & UBLK_QUEUE_IDLE) + state.is_stopping(), + state.is_idle(), ); Ok(nr_cqes) } @@ -803,7 +856,7 @@ impl UblkQueue<'_> { where F: FnMut(&mut UblkIOCtx) -> Result, { - if self.queue_is_done() && self.q_ring.submission().is_empty() { + if self.state.borrow().queue_is_done() && self.q_ring.submission().is_empty() { return Err(UblkError::QueueIsDown(-libc::ENODEV)); }