Skip to content

Commit

Permalink
libublk: investigate how to support async/await
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Lei <tom.leiming@gmail.com>
  • Loading branch information
ming1 committed Sep 28, 2023
1 parent a389de5 commit fd6241e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ bitmaps = "3.2.0"
log = {version = "0.4", features = ["release_max_level_off"]}
thiserror = "1.0.43"
derive_builder = "0.12"
futures = { version = "0.3", optional = true }

[dev-dependencies]
block-utils = "0.11.0"
Expand Down
18 changes: 15 additions & 3 deletions examples/null_ll.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue, UblkQueueCtx};
use libublk::io::{UblkDev, UblkIOCtx, UblkQueue, UblkQueueCtx, Executor};
use libublk::{ctrl::UblkCtrl, UblkError};
use std::sync::Arc;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

fn null_handle_io(ctx: &UblkQueueCtx, io: &mut UblkIOCtx, park: bool) -> Result<i32, UblkError> {
fn null_handle_io(ctx: UblkQueueCtx, io: &mut UblkIOCtx, park: bool) -> Result<i32, UblkError> {
let iod = ctx.get_iod(io.get_tag());
let bytes = unsafe { (*iod).nr_sectors << 9 } as i32;

Expand Down Expand Up @@ -45,11 +50,18 @@ fn test_add(dev_id: i32) {
threads.push(std::thread::spawn(move || {
let mut queue = UblkQueue::new(q as u16, &dev).unwrap();
let ctx = queue.make_queue_ctx();
let mut exe = Executor::new(ctx.depth);

//IO handling closure(FnMut), we are driven by io_uring CQE, and
//this closure is called for every incoming CQE(io command or
//target io completion)
let io_handler = move |io: &mut UblkIOCtx| null_handle_io(&ctx, io, park != 0);
let io_handler = move |io: &mut UblkIOCtx| {
let tag = io.get_tag() as u16;
exe.spawn(tag, Box::pin(async {
null_handle_io(ctx, io, park != 0);
}));
Ok(0)
};
queue.wait_and_handle_io(io_handler);
}));
}
Expand Down
43 changes: 43 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ use serde::{Deserialize, Serialize};
use std::fs;
use std::os::unix::io::AsRawFd;

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use std::ptr::NonNull;

/// Return value of IO handling closure.
///
/// If `UBLK_IO_S_COMP_BATCH` is returned from the closure, we need
Expand All @@ -17,6 +24,42 @@ use std::os::unix::io::AsRawFd;
/// `UblkIOCtx::add_to_comp_batch()` for later completion.
pub const UBLK_IO_S_COMP_BATCH: i32 = 1;

pub struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
}
impl Task {
pub fn new(future: Pin<Box<dyn Future<Output = ()>>>) -> Task {
Task {
future,
}
}
fn poll(&mut self, context: &mut Context) -> Poll<()> {
self.future.as_mut().poll(context)
}
}

pub struct Executor {
tasks: Vec<Task>,
}

impl Executor {
pub fn new(nr_tasks: u16) -> Executor {
let mut tasks = Vec::<Task>::with_capacity(nr_tasks as usize);
unsafe {
tasks.set_len(nr_tasks as usize);
}

Executor {
tasks,
}
}

pub fn spawn(&mut self, tag: u16, future: Pin<Box<dyn Future<Output = ()>>>) {
self.tasks[tag as usize] = Task::new(future);
}
}


/// UblkIOCtx
///
/// When any io_uring CQE is received, libublk lets the target code handle
Expand Down

0 comments on commit fd6241e

Please sign in to comment.