
Half-blocking method. Is it supported/possible? #271

Open
MageSlayer opened this issue Dec 14, 2022 · 11 comments

Comments

@MageSlayer

MageSlayer commented Dec 14, 2022

Hi

I am trying to implement a "half-blocking" mode, that is, blocking write & non-blocking read.
Currently I use the following code together with tungstenite. Blocking write is done using "write_inner". TcpStream is from the "mio" crate.

Unfortunately, it does not work reliably.
I am getting various errors at handshake & later time.

I'd like to ask whether rust-websocket could be used to implement a working scheme like that.

use std::io::{self, Read, Write};
use std::net::ToSocketAddrs;
use std::thread;

use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};

struct TcpCustomStream {
    s: TcpStream,

    block: bool,
    poll: Poll,
    events: Events,
}

impl TcpCustomStream {
    pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
        let a = if let Some(x) = addr.to_socket_addrs()?.next() {
            x
        } else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Tcp connect call failed",
            ));
        };
        match TcpStream::connect(a) {
            Ok(mut x) => {
                let poll = Poll::new()?;
                let events = Events::with_capacity(128);

                // Register the socket with `Poll`
                poll.registry()
                    .register(&mut x, Token(0), Interest::WRITABLE)?;

                Ok(Self {
                    s: x,
                    poll,
                    events,
                    block: true,
                })
            }
            Err(x) => Err(x),
        }
    }

    pub fn set_nonblocking(&mut self) -> io::Result<()> {
        self.block = false;
        Ok(())
    }

    pub fn wait_write(&mut self) -> io::Result<()> {
        loop {
            //println!("tcp wait_write poll");
            self.poll.poll(&mut self.events, None)?;

            for event in &self.events {
                if event.token() == Token(0) && event.is_writable() {
                    // The socket connected (probably, it could still be a spurious
                    // wakeup)
                    //println!("tcp wait_write ready");
                    return Ok(());
                }
            }
        }
    }

}

impl std::io::Read for TcpCustomStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            match self.s.read(buf) {
                // In blocking mode, spin (yielding) until data arrives.
                Err(x) if self.block && would_block(&x) => thread::yield_now(),
                x => break x,
            }
        }
    }
}

impl std::io::Write for TcpCustomStream {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.s.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.s.flush()
    }
}

... skipped

use tungstenite::http::Uri;
use tungstenite::{client, Error, Message, WebSocket};

pub struct Client {
    socket: Option<WebSocket<TcpCustomStream>>,
}

fn would_block(err: &std::io::Error) -> bool {
    matches!(
        err.kind(),
        io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted
    )
}

impl Client {
    pub fn new() -> Self {
        Self {
            socket: None,
        }
    }

    pub fn connect(&mut self, uri_str: &str) -> Result<(), std::io::Error> {
        println!("Client started");

        let (mut client, _) = {
            let mut c = 10;
            loop {
                let req = Uri::from_maybe_shared(uri_str.to_string()).map_err(|_| {
                    std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid connect URI")
                })?;

                let host_port = req
                    .authority()
                    .ok_or(std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "Invalid connect URI",
                    ))?
                    .as_str();

                let stream = TcpCustomStream::connect(host_port)?;
                println!("Client connected");

                match client(req, stream) {
                    Err(x) => {
                        println!("Connect {:?}", x);
                        if c == 1 {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!("Cannot connect to {}", uri_str),
                            ));
                        }
                        c -= 1;
                        thread::sleep(std::time::Duration::from_millis(1000));
                    }
                    Ok(x) => break x,
                }
            }
        };
        println!("WSClient connected");

        client.get_mut().set_nonblocking()?;
        println!("Set nonblocking mode");

        self.socket = Some(client);

        Ok(())
    }

    fn write_inner(socket: &mut WebSocket<TcpCustomStream>, m: Message) -> Result<(), Error> {
        match socket.write_message(m) {
            Err(tungstenite::Error::Io(e)) if would_block(&e) => {
                //println!("cl write would_block");
                return loop {
                    socket.get_mut().wait_write()?;
                    match socket.write_pending() {
                        Err(tungstenite::Error::Io(e)) if would_block(&e) => continue,
                        x => break x,
                    }
                };
            }
            x => {
                //println!("cl write {:?}", x);
                x
            }
        }
    }
}
@vi
Member

vi commented Dec 15, 2022

It is not recommended to use rust-websocket (i.e. the websocket crate) for new projects; you should stick with tungstenite.

If by "half-blocking" you mean writing messages to Websocket from sync world and reading from it from async (e.g. Tokio) world (or the same with reading and writing swapped), then you should use tokio-tungstenite in async mode and make a channel (e.g. tokio::sync::mpsc or flume) to bridge sync and async worlds.

You should specify your use case (how such "half-blocking" WebSockets integrate into a larger scheme) for more specific advice.
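The channel-bridging idea above can be sketched without any dependencies. Here std::sync::mpsc stands in for the suggested flume/tokio::sync::mpsc, and `poll_message` is a hypothetical helper name; in the real setup the sending side would be a Tokio task reading from tokio-tungstenite.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Sync-side non-blocking poll: Some(msg) if a message is ready, None otherwise.
/// (Hypothetical helper; flume's Receiver::try_recv has the same shape.)
fn poll_message(rx: &Receiver<String>) -> Option<String> {
    match rx.try_recv() {
        Ok(msg) => Some(msg),
        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    // In the real setup an async task reading from the websocket holds `tx`;
    // here we send from the same thread to keep the sketch self-contained.
    assert_eq!(poll_message(&rx), None); // nothing arrived yet
    tx.send("hello".to_string()).unwrap();
    assert_eq!(poll_message(&rx), Some("hello".to_string()));
    println!("ok");
}
```

The sync side never blocks: it either gets a message or immediately learns that none is available, which is exactly the "flagging" behavior discussed below.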

@MageSlayer
Author

MageSlayer commented Dec 15, 2022

You should specify your use case (how such "half-blocking" WebSockets integrate into a larger scheme) for more specific advice.

My scheme is as follows:

  1. I have a Rust server capable of accepting WebSocket connections. It works & behaves OK. It's done using tungstenite.
  2. Client. I have a third-party proprietary closed-source system which loads dynlibs & calls functions via a C-like FFI interface. These calls are done in a single thread & I cannot block there. Also I cannot spawn any additional threads there, so there is no runtime (Rust or any other).

So the only approach is a non-blocking, single-threaded, C-like manner. That's what I call half-blocking mode. So it's:

  • eager reading from the socket if anything is there, or flagging if no message was received
  • blocking on socket write, as it's expected to be rather quick.

My point is that reading is problematic here.
It requires some sort of "cursor" letting other parts of the system know where it left off last time if a message wasn't read entirely (buffering). Unfortunately I cannot see where this "cursor" might be implemented reliably in my scheme.

Perhaps the same issue in tungstenite might be useful here: snapview/tungstenite-rs#308
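The transport-level half of this scheme (eager read, flag when nothing is there) can be shown with std alone; the hard part the comment describes, resuming a partially read websocket frame, is what tungstenite's internal read buffer handles on top of this. `try_read` is a hypothetical helper name; the loopback test setup is only for illustration.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};

/// Eager read: Ok(Some(n)) if data was available, Ok(None) if the read
/// would block (i.e. "no message received" in half-blocking terms).
fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

fn main() -> io::Result<()> {
    // Local loopback pair so the sketch is self-contained.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let mut client = TcpStream::connect(addr)?;
    client.set_nonblocking(true)?;
    let (mut server, _) = listener.accept()?;

    let mut buf = [0u8; 16];
    // Nothing sent yet: the eager read reports "no data" instead of blocking.
    assert_eq!(try_read(&mut client, &mut buf)?, None);

    server.write_all(b"ping")?;
    // Poor man's polling: retry on a "timer" until the bytes arrive.
    let mut got = None;
    for _ in 0..200 {
        if let Some(n) = try_read(&mut client, &mut buf)? {
            got = Some(n);
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(5));
    }
    assert_eq!(got, Some(4));
    println!("ok");
    Ok(())
}
```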

@vi
Member

vi commented Dec 15, 2022

eager reading from socket if anything is there or flagging if no message received

What thread will wait for socket events?

flagging if no message received

What happens after such flagging? How will the application know when to attempt reading from the websocket next time?


blocking on socket write as it's expected to be rather quick

Note that if your Rust server (or the network between the client and the server) is slow, then that socket write gets lengthy: backpressure.

@MageSlayer
Author

What thread will wait for socket events?

It's the same single thread, the only one available.

What happens after such flagging? How will the application know when to attempt reading from the websocket next time?

It's done by a timer. So it's poor man's polling.

blocking on socket write as it's expected to be rather quick

Note that if your Rust server (or the network between the client and the server) is slow, then that socket write gets lengthy: backpressure.

Yes. That's unfortunate, but I cannot see any other viable alternative.

@vi
Member

vi commented Dec 15, 2022

It's done by a timer. So it's poor man's polling.

OK, so we are in the land of hacks.

In this case I would still use async/nonblocking everywhere (including for sending) by rolling customized low-level async utils (socket wrapper, timers) and maybe an executor (though async-executor's try_tick() seems to do the trick by itself; maybe futures_executor::LocalPool is an even better match).

This way the only tradeoffs I expect are:

  • Latency of read (and maybe written, in case of backpressure) messages: unavoidable for nonblocking without proper polling. The faster the global timer ticks, the lower the latency.
  • Extraneous syscalls when idling. The slower the global timer ticks, the fewer extraneous syscalls.
  • Cannot just use easy high-level libraries, e.g. ones for Tokio. They expect Runtime::block_on to really be running somewhere.

Other than that, from the outside the plugin should look as if the socket were used directly, without any additional file descriptors, threads and so on.
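The "tick the executor from the host's timer" idea can be sketched with std only: a hand-rolled no-op waker plus a single poll per tick. The names `noop_waker` and `try_tick` are hypothetical stand-ins for what async-executor's try_tick or LocalPool::run_until_stalled do internally.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal no-op waker: acceptable here because the external timer
// re-polls the task anyway, so wakeups can be "lost" without harm.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// One "tick": poll the task once; Some(output) once it completes.
fn try_tick<T>(fut: &mut Pin<Box<dyn Future<Output = T>>>) -> Option<T> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}

fn main() {
    let mut task: Pin<Box<dyn Future<Output = u32>>> = Box::pin(async { 21 * 2 });
    // The host's timer callback would call try_tick on every tick.
    assert_eq!(try_tick(&mut task), Some(42));
    println!("ok");
}
```

A real executor additionally keeps a run queue and honors wakers so ready tasks are polled promptly; this sketch only shows why a single thread with no runtime can still make async progress.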


Here is my demo using async-tungstenite: https://gist.github.com/vi/28117c2583ea74d35babfcd6abbef9e6

It should handle backpressure properly.

Maybe there are ready-made crates for such a use case, but sometimes it is simpler to write than to find.

@MageSlayer
Author

That's a real gift (not gist) :)
I'll try that asap!

@MageSlayer
Author

Hi again.
Sorry for the late response, but I'd like to ask some more stupid questions :)

I'm not that experienced in async Rust, but it looks like "subtask1" & "subtask2" are doing async send/receive. Hence a question: how can I "drive" them synchronously?

I mean, should I build an intermediate queue between sync & async code to pass values when sending/receiving from the host application? Making use of something like the following does not seem right, as the "main loop" is hidden behind self.exe.try_tick():

let send_fut = c_tx.send(Message::Text(format!("Hello, {}", 1))); //.await;
block_on(send_fut)?;

@vi
Member

vi commented Dec 18, 2022

I'm not that experienced in async Rust, but it looks like "subtask1" & "subtask2" are doing async send/receive. Hence a question: how can I "drive" them synchronously?

What do you mean by "drive them synchronously"?

If you want to interact with sync code, you'll probably need a channel like flume. This channel can be sync from one side and async from the other side. Maybe it would just work as is.


For driving subtasks simultaneously, futures::future::select is only one of the ways. Just adding more tasks to the executor (i.e. multiple exe.spawn calls) would probably be better.


Here is my second demo that shows some of the ideas above applied:

https://gist.github.com/vi/39607d1963b069a5167099f3fbffebf4

  • A different executor
  • An explicit list of wakers instead of relying on the executor's implementation details in case of a wakeup flood
  • Channels to communicate with the sync world; flume does not require any additional hacks.
  • Multiple tasks instead of one big multi-pronged task.

@MageSlayer
Author

MageSlayer commented Dec 18, 2022

If you want to interact with sync code, you'll probably need a channel like flume. This channel can be sync from one side and async from the other side. Maybe it would just work as is.

Yes. I'd like to send/receive values to/from the async part from/into sync functions.

Here is my second demo that shows some of the ideas above applied:

Thanks a lot for the details. So, I guess the right way to emulate a "non-blocking receive" is to read the channel after doing the following. Right?

self.wakers.wake_all();
self.exe.run_until_stalled();

@vi
Member

vi commented Dec 19, 2022

Thanks a lot for the details. So, I guess the right way to emulate a "non-blocking receive" is to read the channel after doing the following. Right?

Yes, using flume::Receiver::try_recv.

Note that if you want to do more tricky things (timeouts, retries, reconnects, simultaneous things) while staying single-threaded & nonblocking, then you may prefer doing them within the async world and only delivering the final result to sync when needed.
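The per-tick pattern confirmed here (wake, run the executor until stalled, then try_recv) can be sketched dependency-free. A single no-op-waker poll stands in for wakers.wake_all() + exe.run_until_stalled(), and std::sync::mpsc stands in for flume; all names below are stand-ins, not the gist's actual API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{self, Receiver};
use std::task::{Context, RawWaker, RawWakerVTable, Waker};

fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// One timer tick: drive the async side one step, then poll the channel
/// without blocking. (A real executor would stop polling finished tasks.)
fn tick<T>(task: &mut Pin<Box<dyn Future<Output = ()>>>, rx: &Receiver<T>) -> Option<T> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let _ = task.as_mut().poll(&mut cx);
    rx.try_recv().ok() // the "non-blocking receive" from the sync side
}

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    // Stand-in for the websocket-reading task: forwards one value and ends.
    let mut task: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
        tx.send(7).unwrap();
    });
    assert_eq!(tick(&mut task, &rx), Some(7));
    println!("ok");
}
```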

@MageSlayer
Author

Note that if you want to do more tricky things (timeouts, retries, reconnects, simultaneous things) while staying single-threaded & nonblocking, then you may prefer doing them within the async world and only delivering the final result to sync when needed.

Yes. I guess it looks possible now with your help.
Thanks a lot for your help and especially the examples.
