Skip to content
This repository was archived by the owner on Oct 28, 2025. It is now read-only.
This repository was archived by the owner on Oct 28, 2025. It is now read-only.

std/tokio runtime -> tarantool coio runtime channels #10

@chertov

Description

@chertov

I want to send data from any rust thread to fiber in the tarantool thread.
But the coio channel works only in the tarantool thread. Right?
I am trying to implement a simple channel with system pipe and queue, but i don't know what is the best solution.
Please consider adding a similar feature.

my simple demo:

use std::collections::VecDeque;
use std::io::{Read, Write};
use std::os::unix::prelude::{AsRawFd, IntoRawFd};
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::io::AsyncWriteExt;

struct TNTChannelInner<T> {
    queue: Arc<RwLock<VecDeque<T>>>,
    tnt_tx: os_pipe::PipeWriter,
}

#[derive(Clone)]
pub struct TNTSender<T> {
    queue: Arc<RwLock<VecDeque<T>>>,
    tx: Arc<RwLock<os_pipe::PipeWriter>>,
}

impl<T> TNTSender<T> {
    pub fn send(&mut self, val: T) -> Result<(), anyhow::Error> {
        self.queue.write().push_back(val);
        use byteorder::WriteBytesExt;
        self.tx.write().write_u8(0);
        Ok(())
    }
}

pub struct TNTReceiver<T> {
    queue: Arc<RwLock<VecDeque<T>>>,
    rx: tarantool::coio::CoIOStream,
}
impl<T> TNTReceiver<T> {
    pub fn recv(&mut self) -> Result<Option<T>, anyhow::Error> {
        if let Some(val) = self.queue.write().pop_front() {
            tarantool::fiber::fiber_yield();
            return Ok(Some(val));
        }
        loop {
            use byteorder::ReadBytesExt;
            self.rx.read_u8()?;
            if let Some(val) = self.queue.write().pop_front() {
                return Ok(Some(val));
            }
        }
        Ok(None)
    }
}

pub fn channel<T>() -> Result<(TNTSender<T>, TNTReceiver<T>), anyhow::Error> {
    let queue = Arc::new(RwLock::new(VecDeque::<T>::new()));
    let (rx, tx) = os_pipe::pipe().unwrap();

    let rx = tarantool::coio::CoIOStream::new(rx.into_raw_fd()).unwrap();
    let rx = TNTReceiver { queue: queue.clone(), rx };

    let tx = Arc::new(RwLock::new(tx));
    let tx = TNTSender { queue, tx };

    Ok((tx, rx))
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions