Skip to content

Commit

Permalink
feat(cq): implementation poll_cq for cq_ex
Browse files Browse the repository at this point in the history
We implement an iterator version of poll_cq for cq_ex, this adheres to
the flavor of `ibv_start_poll` and `ibv_next_poll` APIs.

Signed-off-by: Luke Yue <lukedyue@gmail.com>
  • Loading branch information
dragonJACson committed Sep 18, 2024
1 parent cd0548a commit c5dfc9e
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 3 deletions.
13 changes: 12 additions & 1 deletion examples/test_post_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,18 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let _err = guard.do_post().unwrap();

thread::sleep(time::Duration::from_millis(5));
thread::sleep(time::Duration::from_millis(10));

// poll for the completion
{
let mut poller = sq.start_poll().unwrap();
let mut wc = poller.iter_mut();
println!("wr_id {}, status: {}, opcode: {}", wc.wr_id(), wc.status(), wc.opcode());
assert_eq!(wc.wr_id(), 233);
while let Some(wc) = wc.next() {
println!("wr_id {}, status: {}, opcode: {}", wc.wr_id(), wc.status(), wc.opcode())
}
}

unsafe {
let slice = std::slice::from_raw_parts(mr.buf.data.as_ptr(), mr.buf.len);
Expand Down
95 changes: 93 additions & 2 deletions src/verbs/completion.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::ptr;
use std::ptr::NonNull;
use std::{marker::PhantomData, mem::MaybeUninit};

use super::device_context::DeviceContext;
use rdma_mummy_sys::{
ibv_comp_channel, ibv_cq, ibv_cq_ex, ibv_cq_init_attr_ex, ibv_create_comp_channel, ibv_create_cq, ibv_create_cq_ex,
ibv_destroy_comp_channel, ibv_destroy_cq, ibv_pd,
ibv_destroy_comp_channel, ibv_destroy_cq, ibv_end_poll, ibv_next_poll, ibv_pd, ibv_poll_cq_attr, ibv_start_poll,
ibv_wc_read_byte_len, ibv_wc_read_completion_ts, ibv_wc_read_opcode, ibv_wc_read_vendor_err,
};

#[derive(Debug)]
Expand Down Expand Up @@ -85,6 +86,27 @@ impl CompletionQueue for ExtendedCompletionQueue<'_> {
}
}

impl ExtendedCompletionQueue<'_> {
pub fn start_poll<'cq>(&'cq self) -> Result<ExtendedPoller<'cq>, String> {
let ret = unsafe {
ibv_start_poll(
self.cq().as_ptr() as _,
MaybeUninit::<ibv_poll_cq_attr>::zeroed().as_mut_ptr(),
)
};

unsafe {
match ret {
0 => Ok(ExtendedPoller {
cq: NonNull::new(self.cq().as_ptr().cast::<ibv_cq_ex>()).unwrap_unchecked(),
_phantom: PhantomData,
}),
err => Err(format!("ibv_start_poll failed, ret={err}")),
}
}
}
}

// generic builder for both cq and cq_ex
pub struct CompletionQueueBuilder<'res> {
dev_ctx: &'res DeviceContext,
Expand Down Expand Up @@ -131,6 +153,7 @@ impl<'res> CompletionQueueBuilder<'res> {
self.init_attr.comp_vector = comp_vector;
self
}

// TODO(fuji): set various attributes

// build extended cq
Expand Down Expand Up @@ -167,3 +190,71 @@ impl<'res> CompletionQueueBuilder<'res> {
}

// TODO trait for both cq and cq_ex?

pub struct ExtendedWorkCompletion<'cq> {
pub(crate) cq: NonNull<ibv_cq_ex>,
_phantom: PhantomData<&'cq ()>,
}

impl<'cq> ExtendedWorkCompletion<'cq> {
pub fn wr_id(&self) -> u64 {
unsafe { self.cq.as_ref().wr_id }
}

pub fn status(&self) -> u32 {
unsafe { self.cq.as_ref().status }
}

pub fn opcode(&self) -> u32 {
unsafe { ibv_wc_read_opcode(self.cq.as_ptr()) }
}

pub fn vendor_err(&self) -> u32 {
unsafe { ibv_wc_read_vendor_err(self.cq.as_ptr()) }
}

pub fn byte_len(&self) -> u32 {
unsafe { ibv_wc_read_byte_len(self.cq.as_ptr()) }
}

pub fn completion_timestamp(&self) -> u64 {
unsafe { ibv_wc_read_completion_ts(self.cq.as_ptr()) }
}
}

pub struct ExtendedPoller<'cq> {
pub(crate) cq: NonNull<ibv_cq_ex>,
_phantom: PhantomData<&'cq ()>,
}

impl ExtendedPoller<'_> {
pub fn iter_mut(&mut self) -> ExtendedWorkCompletion {
ExtendedWorkCompletion {
cq: self.cq,
_phantom: PhantomData,
}
}
}

impl<'a> Iterator for ExtendedWorkCompletion<'a> {
type Item = ExtendedWorkCompletion<'a>;

fn next(&mut self) -> Option<Self::Item> {
let ret = unsafe { ibv_next_poll(self.cq.as_ptr()) };

if ret != 0 {
None
} else {
Some(ExtendedWorkCompletion {
cq: self.cq,
_phantom: PhantomData,
})
}
}
}

impl Drop for ExtendedPoller<'_> {
fn drop(&mut self) {
unsafe { ibv_end_poll(self.cq.as_ptr()) }
}
}

0 comments on commit c5dfc9e

Please sign in to comment.