Skip to content

Commit

Permalink
Enhance SubmissionQueue to support prepare SQE in place
Browse files Browse the repository at this point in the history
Export SubmissionQueue::get_available_sqe() to prepare available SQE
for in-place preparation. And export SubmissionQueue::move_forward()
to commit prepared SQEs.

Sample code to use the new interface:
pub fn prepare_sqe(mut sq: SubmissionQueue<'_>) {
    unsafe {
        match sq.get_available_sqe(0) {
            Ok(sqe) => {
                let nop_sqe: &mut crate::opcode::NopSqe = sqe.into();
                nop_sqe.prepare();
                sq.move_forward(1);
            }
            Err(_) => return,
        }
    }
}

Signed-off-by: Liu Jiang <gerry@linux.alibaba.com>
  • Loading branch information
jiangliu committed Jan 3, 2022
1 parent 21bbc6c commit c3869d2
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 33 deletions.
5 changes: 5 additions & 0 deletions src/opcode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ macro_rules! opcode {
$( $set_op_sqe_special_fields )?
}

#[inline]
pub fn get_mut_sqe(&mut self) -> &mut sys::io_uring_sqe {
&mut self.sqe
}

$(
$( #[$opt_meta] )*
#[inline]
Expand Down
117 changes: 84 additions & 33 deletions src/squeue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::error::Error;
use std::fmt::{self, Display, Formatter};
use std::sync::atomic;

use crate::opcode::sqe_zeroed;
#[cfg(feature = "unstable")]
use crate::opcode::PrepareSQE;
use crate::sys;
Expand Down Expand Up @@ -223,13 +224,10 @@ impl SubmissionQueue<'_> {
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
#[inline]
pub unsafe fn push(&mut self, Entry(entry): &Entry) -> Result<(), PushError> {
if !self.is_full() {
*self.next_sqe() = *entry;
self.move_forward();
Ok(())
} else {
Err(PushError)
}
let sqe = self.get_available_sqe(0)?;
*sqe = *entry;
self.move_forward(1);
Ok(())
}

/// Attempts to push several [entries](Entry) into the queue.
Expand All @@ -248,36 +246,72 @@ impl SubmissionQueue<'_> {
}

for Entry(entry) in entries {
*self.next_sqe() = *entry;
self.move_forward();
let sqe = self.get_available_sqe(0)?;
*sqe = *entry;
self.move_forward(1);
}

Ok(())
}
}

#[cfg(not(feature = "unstable"))]
impl SubmissionQueue<'_> {
/// Try to get a mutable reference to the SQE at `next_available + offset`.
///
/// # Safety
///
/// The returned mutable reference has been zeroed-out. Developers must ensure that legal
/// values are set onto the SQE and call `move_forward()` to commit the prepared SQE.
#[inline]
unsafe fn get_available_sqe(
&mut self,
offset: u32,
) -> Result<&mut sys::io_uring_sqe, PushError> {
if self.capacity() - self.len() <= offset as usize {
Err(PushError)
} else {
let sqe = &mut *self
.queue
.sqes
.add((self.tail.wrapping_add(offset) & self.queue.ring_mask) as usize);
*sqe = sqe_zeroed();
Ok(sqe)
}
}

/// Move the submission queue forward by `count` steps.
///
/// # Safety
///
/// Developers must ensure that `count` is valid and the next `count` SQEs have been correctly
/// initialized.
#[inline]
unsafe fn move_forward(&mut self, count: u32) {
self.tail = self.tail.wrapping_add(count);
}
}

#[cfg(feature = "unstable")]
impl SubmissionQueue<'_> {
/// Attempts to push an opcode into the submission queue.
/// If the queue is full, an error is returned.
///
/// # Safety
///
/// Developers must ensure that parameters of the opcode (such as buffer) are valid and will
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
#[cfg(feature = "unstable")]
#[inline]
pub unsafe fn push_command<'a, T: PrepareSQE>(
&'a mut self,
opcode: &T,
options: Option<&SqeCommonOptions>,
) -> Result<(), PushError> {
if !self.is_full() {
let sqe = self.next_sqe();
options.map(|v| v.set(sqe));
opcode.prepare(sqe);
self.move_forward();
Ok(())
} else {
Err(PushError)
}
let sqe = self.get_available_sqe(0)?;
opcode.prepare(sqe);
options.map(|v| v.set(sqe));
self.move_forward(1);
Ok(())
}

/// Attempts to push several opcodes into the queue.
Expand All @@ -288,7 +322,6 @@ impl SubmissionQueue<'_> {
/// Developers must ensure that parameters of all the entries (such as buffer) are valid and
/// will be valid for the entire duration of the operation, otherwise it may cause memory
/// problems.
#[cfg(feature = "unstable")]
#[inline]
pub unsafe fn push_commands<'a, T: PrepareSQE>(
&'a mut self,
Expand All @@ -299,29 +332,47 @@ impl SubmissionQueue<'_> {
}

for (opcode, options) in ops {
let sqe = self.next_sqe();
options.map(|v| v.set(sqe));
let sqe = self.get_available_sqe(0)?;
opcode.prepare(sqe);
self.move_forward();
options.map(|v| v.set(sqe));
self.move_forward(1);
}

Ok(())
}

// Unsafe because it may return entry being used by kernel and make kernel use the
// uninitialized entry.
/// Try to get a mutable reference to the SQE at `next_available + offset`.
///
/// # Safety
///
/// The returned mutable reference has been zeroed-out. Developers must ensure that legal
/// values are set onto the SQE and call `move_forward()` to commit the prepared SQE.
#[inline]
unsafe fn next_sqe(&mut self) -> &mut sys::io_uring_sqe {
&mut *self
.queue
.sqes
.add((self.tail & self.queue.ring_mask) as usize)
pub unsafe fn get_available_sqe(
&mut self,
offset: u32,
) -> Result<&mut sys::io_uring_sqe, PushError> {
if self.capacity() - self.len() <= offset as usize {
Err(PushError)
} else {
let sqe = &mut *self
.queue
.sqes
.add((self.tail.wrapping_add(offset) & self.queue.ring_mask) as usize);
*sqe = sqe_zeroed();
Ok(sqe)
}
}

// Unsafe because it may cause kernel to access uninitialized entry.
/// Move the submission queue forward by `count` steps.
///
/// # Safety
///
/// Developers must ensure that `count` is valid and the next `count` SQEs have been correctly
/// initialized.
#[inline]
unsafe fn move_forward(&mut self) {
self.tail = self.tail.wrapping_add(1);
pub unsafe fn move_forward(&mut self, count: u32) {
self.tail = self.tail.wrapping_add(count);
}
}

Expand Down

0 comments on commit c3869d2

Please sign in to comment.