Skip to content
This repository was archived by the owner on Oct 28, 2025. It is now read-only.
This repository was archived by the owner on Oct 28, 2025. It is now read-only.

async/await runtime on top of tarantool fibers/coio #12

@chertov

Description

@chertov

Hi! Part of my code is written with async/await syntax, and the only thing I need is a sleep future. I am trying to implement a simple proof of concept of an async/await runtime for the Tarantool side of a Rust application.
The PoC is based on https://rust-lang.github.io/async-book/02_execution/04_executor.html

use std::sync::{Arc, Mutex};
use futures::future::FutureExt;


// --------------------- Async/Await Example --------------------- //
/// Demo task: loops forever, racing a 7-second sleep against the pair of a
/// 5-second and a 10-second sleep, and prints a tick line for each timer that
/// fires plus a running iteration counter.
///
/// The joined 5s/10s branch winning the race is treated as a failure and
/// panics with "async select".
async fn test() {
    use std::time::Duration;

    let mut counter = 0;
    loop {
        let five_sec_tick = async {
            TntSleep::new(Duration::from_secs(5)).await;
            println!("[rust_async] tick_5s!");
        };
        let ten_sec_tick = async {
            TntSleep::new(Duration::from_secs(10)).await;
            println!("[rust_async] tick_10s!");
        };
        // Whichever branch completes first wins; the losing futures are dropped.
        futures::select! {
            _ = TntSleep::new(Duration::from_secs(7)).fuse() => println!("[rust_async] tick_7s!"),
            _ = async { futures::join!(five_sec_tick, ten_sec_tick); }.fuse() => panic!("async select"),
        }
        println!("[rust_async] tick counter: {}!", counter);
        counter += 1;
    }
}

/// Exported C-ABI entry point: starts a fiber named "rust_async_runtime" that
/// creates a `Runtime`, queues the `test` demo task on it and then drives the
/// runtime's loop. Both arguments are ignored and the function always returns 0.
#[no_mangle]
pub extern "C" fn example(_: tarantool::tuple::FunctionCtx, _: tarantool::tuple::FunctionArgs) -> std::os::raw::c_int {
    let mut runtime_fiber = tarantool::fiber::Fiber::new("rust_async_runtime", &mut |_| {
        // The runtime must exist before `spawn`, because `Runtime::new`
        // installs the sender that `spawn` pushes tasks into.
        let executor = Runtime::new();
        spawn(test());
        executor.run();
        0
    });
    runtime_fiber.start(());
    0
}


// --------------------- RUNTIME --------------------- //
thread_local! {
    /// Sending half of the ready-task queue for this thread.
    ///
    /// Starts out as `None`; `Runtime::new` stores a sender here, and `spawn`
    /// and `Task::wake_by_ref` use it to enqueue tasks.
    static SENDER: std::cell::RefCell<Option<tarantool::coio::Sender<Arc<Task>>>> =
        std::cell::RefCell::new(None);
}
fn spawn(future: impl std::future::Future<Output = ()> + 'static + Send) {
    let future = future.boxed();
    let task = Arc::new(Task { future: Mutex::new(Some(future)), });
    SENDER.with(|s| {
        if let Some(task_sender) = &mut *s.borrow_mut() {
            task_sender.send(task).expect("too many tasks queued");
        }
    });
}

/// Minimal executor: owns the receiving half of the ready-task queue and polls
/// every task that arrives on it.
struct Runtime {
    ready_queue: tarantool::coio::Receiver<Arc<Task>>,
}
impl Runtime {
    /// Creates the task channel, stores its sender in the thread-local
    /// `SENDER` (replacing any sender stored earlier) and keeps the receiver.
    fn new() -> Self {
        const MAX_QUEUED_TASKS: usize = 10_000;
        let (task_sender, ready_queue) = tarantool::coio::channel(MAX_QUEUED_TASKS);
        // `replace` hands back the previously installed sender, if any; it is
        // discarded once the thread-local borrow has ended.
        drop(SENDER.with(|cell| cell.borrow_mut().replace(task_sender)));
        Self { ready_queue }
    }

    /// Receives tasks until `recv` yields `None`, polling each one once per
    /// arrival. A task whose future is still pending gets the future put back
    /// into its slot so a later wake-up can poll it again; a finished future
    /// is dropped.
    fn run(&self) {
        while let Some(task) = self.ready_queue.recv() {
            let mut slot = task.future.lock().unwrap();
            let mut future = match slot.take() {
                Some(future) => future,
                // Slot already emptied: the task finished earlier.
                None => continue,
            };
            let waker = futures::task::waker_ref(&task);
            let mut cx = std::task::Context::from_waker(&waker);
            if future.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(future);
            }
        }
    }
}

/// A spawned unit of work: the boxed future, kept in an `Option` so the
/// executor can take it out while polling and leave `None` once it completes.
struct Task {
    future: Mutex<Option<futures::future::BoxFuture<'static, ()>>>,
}
impl futures::task::ArcWake for Task {
    /// Re-queues this task by sending a clone of its `Arc` through the
    /// thread-local `SENDER`. Does nothing when no sender is installed.
    ///
    /// Panics with "too many tasks queued" if the channel send returns an error.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let task = Arc::clone(arc_self);
        SENDER.with(|cell| {
            let mut slot = cell.borrow_mut();
            if let Some(queue) = slot.as_mut() {
                queue.send(task).expect("too many tasks queued");
            }
        });
    }
}


// --------------------- Sleep Future --------------------- //
/// State shared between a `TntSleep` future and the fiber that completes it.
struct TntSleepState {
    /// Set to `true` once the sleep has elapsed.
    completed: bool,
    /// Waker recorded by the most recent pending `poll`, if any.
    waker: Option<std::task::Waker>,
}

/// Future that resolves once its shared state is marked as completed.
pub struct TntSleep {
    shared_state: Arc<Mutex<TntSleepState>>,
}
impl std::future::Future for TntSleep {
    type Output = ();

    /// Returns `Ready(())` when the shared state is completed; otherwise
    /// stores a clone of the current waker in the shared state and returns
    /// `Pending`.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        use std::task::Poll;

        let mut state = self.shared_state.lock().unwrap();
        if !state.completed {
            // Always overwrite: the future may have been moved to a different
            // task since the previous poll.
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
impl TntSleep {
    /// Creates a sleep future that completes after `duration`.
    ///
    /// A tarantool fiber is started immediately (at construction, not at the
    /// first poll). The fiber sleeps for `duration`, marks the shared state as
    /// completed and wakes the waker recorded by the last pending `poll`, if
    /// there is one.
    ///
    /// # Panics
    ///
    /// The fiber panics if the shared-state mutex is poisoned.
    pub fn new(duration: std::time::Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(TntSleepState { completed: false, waker: None, }));

        let mut fiber = tarantool::fiber::Fiber::new("", &mut |args: Box<(std::time::Duration, Arc<Mutex<TntSleepState>>)>| {
            let (duration, fiber_shared_state) = *args;
            tarantool::fiber::sleep(duration.as_secs_f64());
            // Flip the flag and take the waker under the lock, but release the
            // lock before waking: the waker runs arbitrary code, and a blocking
            // `std::sync::Mutex` held across it could stall every fiber that
            // shares this OS thread.
            let waker = {
                let mut state = fiber_shared_state.lock().unwrap();
                state.completed = true;
                state.waker.take()
            };
            if let Some(waker) = waker {
                waker.wake();
            }
            0
        });
        fiber.start((duration, shared_state.clone()));

        Self { shared_state }
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions