Skip to content

Commit

Permalink
Maintain op order in iocp and io_uring
Browse files Browse the repository at this point in the history
  • Loading branch information
h33p committed Oct 31, 2023
1 parent 7c89399 commit 86c313e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
9 changes: 9 additions & 0 deletions mfio-rt/src/native/impls/io_uring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,15 @@ impl<'a> IoUringPushHandle<'a> {
}

pub fn try_push_op(&mut self, ring_entry: Entry, ops_entry: Operation) {
// Clear out any pending ops to maintain order
while self.ops.len() + 1 < self.ring_capacity {
if let Some((ring_entry, ops_entry)) = self.pending_ops.pop_front() {
self.push_op(ring_entry, ops_entry);
} else {
break;
}
}

if self.ops.len() + 1 < self.ring_capacity {
self.push_op(ring_entry, ops_entry);
} else {
Expand Down
44 changes: 25 additions & 19 deletions mfio-rt/src/native/impls/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,31 @@ impl IocpState {
true
}

fn submit_pending_ops(&mut self) {
// Submit all pending ops, without changing order if the queue gets
// full again.
while let Some(op) = self.pending_ops.pop_front() {
let idx = match op {
Ok(idx) => idx,
Err(op) => {
if self.ops.len() < self.ops.capacity() {
self.ops.insert(op)
} else {
self.pending_ops.push_front(Err(op));
break;
}
}
};

if !unsafe { self.submit_op(idx, false) } {
break;
}
}
}

unsafe fn try_submit_op(&mut self, operation: Operation) -> Result<(), Option<usize>> {
self.submit_pending_ops();

if self.ops.len() < self.ops.capacity() {
let idx = self.ops.insert(operation);
match self.submit_op(idx, true) {
Expand Down Expand Up @@ -652,25 +676,7 @@ impl Runtime {
break;
}

// Submit all pending ops, without changing order if the queue gets
// full again.
while let Some(op) = state.pending_ops.pop_front() {
let idx = match op {
Ok(idx) => idx,
Err(op) => {
if state.ops.len() < state.ops.capacity() {
state.ops.insert(op)
} else {
state.pending_ops.push_front(Err(op));
break;
}
}
};

if !unsafe { state.submit_op(idx, false) } {
break;
}
}
state.submit_pending_ops();
}

core::mem::swap(&mut old_deferred_pkts, &mut state.deferred_pkts);
Expand Down

0 comments on commit 86c313e

Please sign in to comment.