Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event stream #1239

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft

Event stream #1239

wants to merge 3 commits into from

Conversation

billy1624
Copy link
Member

PR Info

New Features

Breaking Changes

Changes

@billy1624 billy1624 self-assigned this Nov 21, 2022
@billy1624 billy1624 mentioned this pull request Nov 21, 2022
Cargo.toml Outdated Show resolved Hide resolved
Cargo.toml Outdated Show resolved Hide resolved
@tyt2y3
Copy link
Member

tyt2y3 commented Jan 2, 2023

https://github.com/jonhoo/bus might be a good option

@billy1624
Copy link
Member Author

billy1624 commented Jan 27, 2023

Hmmmm I think we need an async broadcast channel. How about https://crates.io/crates/async-broadcast?

@billy1624
Copy link
Member Author

Design choice:

  1. Representing model values in HashMap
#[derive(Debug, Clone)]
pub struct Event {
    pub entity_type_id: TypeId,
    pub action: EventAction,
    pub values: HashMap<String, Value>,
}
  1. Receiver side: filter by entity and then unpack the values
let mut tokio_receiver = db.set_event_stream(async_broadcast::broadcast(10));

while let Ok(event) = tokio_receiver.recv().await {
    // Filter by entity
    if event.of_entity::<cake::Entity>() {
        // Unpack the values
        if let Some(val) = event.values.get(cake::Column::Name.as_str()) {
            todo!()
        }
    }
}

@Diwakar-Gupta
Copy link
Contributor

Diwakar-Gupta commented Jan 30, 2023

Hmmmm I think we need an async broadcast channel. How about https://crates.io/crates/async-broadcast?

use async_broadcast::{broadcast, TryRecvError, Receiver};
use futures_lite::{future::block_on, stream::StreamExt};
 
 
fn main() {
    block_on(async move {
        let (s1, mut r1) = broadcast(2);
        let s2 = s1.clone();
        let mut r2 = r1.clone();
    
        // Send 2 messages from two different senders.
        s1.broadcast(7).await.unwrap();
        s2.broadcast(8).await.unwrap();
    
        // Channel is now at capacity so sending more messages will result in an error.
        assert!(s2.try_broadcast(9).unwrap_err().is_full());
        assert!(s1.try_broadcast(10).unwrap_err().is_full());
    
        // We can use `recv` method of the `Stream` implementation to receive messages.
        assert_eq!(r1.next().await.unwrap(), 7);
        assert_eq!(r1.recv().await.unwrap(), 8);
        assert_eq!(r2.next().await.unwrap(), 7);
        assert_eq!(r2.recv().await.unwrap(), 8);
    
        // All receiver got all messages so channel is now empty.
        assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
        assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
    
        // Close one receiver, which closes all the sender and receivers.
        Receiver::close(&r1);
 
        println!("{}", s1.is_closed()); // prints True
 
        s1.broadcast(10).await.unwrap(); // thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(..)', src/main.rs:34:32
        s2.broadcast(11).await.unwrap();
 
        println!("{}", r2.next().await.unwrap());
        println!("{}", r2.recv().await.unwrap());
 
        // println!("{}", r1.next().await.unwrap());
        // println!("{}", r1.recv().await.unwrap());
        
    })
}

There are 2 problems with this library

  1. when one the receiver is closed, it closed the channel, Hence no other receiver will get any event and sender wont be able to send event updates. Check after line 30.
  2. broadcast caches the messages ( number of cached messages can be set to 1 broadcast(cap = 2); ). tyt2y3 mentioned [PIP] Event stream #1123 (comment) that Notes 1: We should not buffer events. When no one is subscribing, we will discard all events.. So future subscribers can get past event notifications.

--------------EDITED--------------

Verifying if a subscriber get's past events.

Cargo.toml
futures = { version = "0.3", members=["futures-executor"] }
matches with the dev-dependencies in sea-orm but not with the dependencies.

use async_broadcast::{broadcast};
use futures::executor::block_on;

fn main() {
    block_on(async move {
        let (s1, mut r1) = broadcast(2);
        let s2 = s1.clone();
        
        // Event 7 should be received by only r1.
        s1.broadcast(7).await.unwrap();

        // new subscriber after event 7.
        let mut r2 = r1.clone();
        
        // event 8 should be received by both r1 and r2
        s2.broadcast(8).await.unwrap();

        // We can use `recv` method to receive messages.
        assert_eq!(r1.recv().await.unwrap(), 7);
        assert_eq!(r1.recv().await.unwrap(), 8);
        
        // r2 should receive events 8 and onwards.
        assert_eq!(r2.recv().await.unwrap(), 8); // this panics, received event is 7 which is past event for r2, it should be 8
    })
}

Future subscribers receive past event messages. Maybe all the receivers are reading from the same queue with their own index pointer.

@tyt2y3
Copy link
Member

tyt2y3 commented Jan 31, 2023

@Diwakar-Gupta thank you for the investigation. Your example is easy to read!

Close one receiver, which closes all the sender and receivers.

According to the docs, this is not the intended usage:

When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received.

Note that, dropping the Receiver is a different operation from manually closing it.

So future subscribers can get past event notifications.

Yes, I think this is a problem, although the example did not make it clear: we should clone r2 at the later point and verify the behaviour. Otherwise, I think it will be fine, because if the channel is closed, we will drop the Sender as well and do nothing.

@tyt2y3
Copy link
Member

tyt2y3 commented Jan 31, 2023

Are you also interested in investigating the bus crate?

Apart from semantics, an additional requirement is to be compatible with all async runtime we support.

@Diwakar-Gupta
Copy link
Contributor

Diwakar-Gupta commented Jan 31, 2023

Hi @tyt2y3 I did some work with bus.
About async-runtime I don't think bus uses async rather they depend on os-threads,. which is different from sea async support. Check this file https://github.com/jonhoo/bus/blob/main/src/lib.rs

Cargo.toml
bus = "2.3.0"

fn main() {
    use bus::Bus;
    let mut tx = Bus::new(2); // len = 2
    let mut rx1 = tx.add_rx();
    
    tx.broadcast(1);

    let mut rx2 = tx.add_rx(); // should not receive 1
    tx.broadcast(2);
    
    assert_eq!(rx1.recv(), Ok(1));
    assert_eq!(rx1.recv(), Ok(2));

    // rx2 don't receive past events
    assert_eq!(rx2.recv(), Ok(2));

    drop(rx2);

    tx.broadcast(3);
    assert_eq!(rx1.recv(), Ok(3));

    // send more events than len and don't consume value
    let mut rx3 = tx.add_rx();
    tx.broadcast(4);
    tx.broadcast(5);
    assert_eq!(tx.try_broadcast(6), Err(6));

    // 1 receiver consume's value 
    assert_eq!(rx3.recv(), Ok(4));
    assert_eq!(tx.try_broadcast(6), Err(6));

    // both receiver consumes value
    assert_eq!(rx1.recv(), Ok(4));
    assert_eq!(tx.try_broadcast(6), Ok(()));
}

My observation

  1. broadcast works perfectly fine with multiple receivers
  2. past event is not send to receiver.
  3. Drop works fine.
  4. when Bus::new(len = 2); len number of messages are send but not consumed yet broadcast will wait till eternity for all the receivers to consume at least one event value. In the official doc they have mentioned to have larger len. read fn new https://docs.rs/bus/2.3.0/bus/struct.Bus.html

One other think I wanted to ask should we consider the way javascript web handles event, it takes a function ( function or object in our case ) onclick(function) and calls it when event occurs.

@billy1624
Copy link
Member Author

Hey @Diwakar-Gupta, thanks for the experiments!! I didn't have time to do it myself just yet. So, glad you helped :))

  1. when one the receiver is closed, it closed the channel, Hence no other receiver will get any event and sender wont be able to send event updates. Check after line 30.

One shouldn't invoke Receiver::close() method. A Receiver will be dropped when it goes out of scope as defined in impl Drop for Receiver<T>. We want the sender keep alive when there is no receiver but I'm not sure the behaviour of it.

  1. broadcast caches the messages ( number of cached messages can be set to 1 broadcast(cap = 2); ). tyt2y3 mentioned [PIP] Event stream #1123 (comment) that Notes 1: We should not buffer events. When no one is subscribing, we will discard all events.. So future subscribers can get past event notifications.

I found that we cannot spawn a new receiver simply by calling .clone(). Use new_receiver() instead.

Also, can we try .set_await_active() to false? It's true by default. With that set, hopefully it won't "cache" past message (event).

@Diwakar-Gupta
Copy link
Contributor

Diwakar-Gupta commented Feb 3, 2023

@billy1624 @tyt2y3
Have a look at crossbeam-channel it provides mpmc feature and is also merged in rust's standard library here is pr . This can be a good option.
This is added in rust version 1.67 https://github.com/rust-lang/rust/releases/tag/1.67.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Triage
Development

Successfully merging this pull request may close these issues.

[PIP] Event stream
3 participants