[Bug] PubSubService does not subscribe fast enough to the ActiveSubscription Sender and drops first message #1601

Open
tchardin opened this issue Nov 1, 2024 · 1 comment
Labels
bug Something isn't working

tchardin commented Nov 1, 2024

Component

provider, pubsub

What version of Alloy are you on?

alloy-provider v0.5.4

Operating System

macOS (Apple Silicon)

Describe the bug

When registering a JSON-RPC subscription over WebSocket, the pubsub frontend sends an async request to the service loop to subscribe to the broadcast channel. By the time the frontend returns the receiver, any message the server sent in the meantime has already been missed.

To reproduce, simply run this test:

    async fn run_server() -> eyre::Result<std::net::SocketAddr> {
        use jsonrpsee::server::{RpcModule, Server, SubscriptionMessage};

        let server = Server::builder().build("127.0.0.1:0").await?;
        let mut module = RpcModule::new(());
        module
            .register_subscription(
                "subscribe_hello",
                "s_hello",
                "unsubscribe_hello",
                |_, pending, _, _| async move {
                    let sub = pending.accept().await.unwrap();

                    for i in 0..usize::MAX {
                        let msg = SubscriptionMessage::from_json(&i).unwrap();
                        sub.send(msg).await.unwrap();
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }

                    Ok(())
                },
            )
            .unwrap();
        let addr = server.local_addr()?;

        let handle = server.start(module);

        tokio::spawn(handle.stopped());

        Ok(addr)
    }

    #[tokio::test]
    async fn test_subscription() -> eyre::Result<()> {
        use alloy_provider::{Provider, ProviderBuilder};

        let addr = run_server().await?;

        let ws_provider = ProviderBuilder::new()
            .with_recommended_fillers()
            .on_builtin(&format!("ws://{}", addr))
            .await?;
        let mut request = ws_provider.client().request("subscribe_hello", ());
        // required if not eth_subscribe
        request.set_is_subscription();
        let sub_id = request.await?;
        // call the pubsub service to get a broadcast receiver.
        let mut sub = ws_provider.root().get_subscription(sub_id).await?;

        let num: usize = sub.recv().await.unwrap();
        assert_eq!(num, 0);

        Ok(())
    }

The first message received is 1 rather than 0, which means the client missed the first message.
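
The same effect can be shown in isolation, without alloy or a WebSocket server. The sketch below is std-only and uses a hypothetical `Hub` type as a stand-in for a broadcast channel; it is not alloy's pubsub code, and it only illustrates why a receiver created after a send never sees that message.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a broadcast channel: a value is delivered
/// only to receivers that were already subscribed when it was sent.
struct Hub {
    subscribers: Vec<Sender<usize>>,
}

impl Hub {
    fn new() -> Self {
        Hub { subscribers: Vec::new() }
    }

    /// Register a new receiver. It sees only values sent from now on.
    fn subscribe(&mut self) -> Receiver<usize> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Deliver a value to every live subscriber and drop the dead ones.
    fn send(&mut self, value: usize) {
        self.subscribers.retain(|tx| tx.send(value).is_ok());
    }
}

/// The "server" emits 0 before the "client" has subscribed, then emits 1.
fn first_message_for_late_subscriber() -> Option<usize> {
    let mut hub = Hub::new();
    hub.send(0); // nobody is listening yet, so 0 is lost
    let rx = hub.subscribe();
    hub.send(1);
    rx.try_recv().ok()
}

fn main() {
    // prints "Some(1)"
    println!("{:?}", first_message_for_late_subscriber());
}
```

In the failing test, the window between `request.await` and `get_subscription(sub_id).await` plays the role of the gap between `hub.send(0)` and `hub.subscribe()`.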

tchardin added the bug label Nov 1, 2024
mattsse (Member) commented Nov 1, 2024

Yeah, this setup is a bit flawed.

The only solution I can think of right now is to replace the set_is_subscription bool with the actual channel.
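
A rough illustration of that idea, assuming the request itself carries the sending half of the channel so the service registers it in the same step that records the subscription id. `Service`, `handle_subscribe`, and `handle_notification` are hypothetical names for this std-only sketch and do not correspond to alloy's actual types or API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

/// Hypothetical service-loop state: subscription id -> notification sender.
struct Service {
    next_id: u64,
    subs: HashMap<u64, Sender<usize>>,
}

impl Service {
    fn new() -> Self {
        Service { next_id: 0, subs: HashMap::new() }
    }

    /// Handle a subscribe request that already carries its channel.
    /// The sender is stored in the same step that assigns the id, so no
    /// notification for this id can arrive without a destination.
    fn handle_subscribe(&mut self, tx: Sender<usize>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.subs.insert(id, tx);
        id
    }

    /// Route a server notification to the matching subscription, if any.
    fn handle_notification(&self, id: u64, value: usize) {
        if let Some(tx) = self.subs.get(&id) {
            // A send error only means the client dropped its receiver.
            let _ = tx.send(value);
        }
    }
}

/// The client creates the channel up front and hands `tx` to the service.
fn first_message_with_channel_in_request() -> Option<usize> {
    let mut service = Service::new();
    let (tx, rx) = channel();
    let id = service.handle_subscribe(tx);
    // Notifications arrive before the client has touched the receiver.
    service.handle_notification(id, 0);
    service.handle_notification(id, 1);
    rx.try_recv().ok()
}

fn main() {
    // prints "Some(0)"
    println!("{:?}", first_message_with_channel_in_request());
}
```

Because the receiver exists before the first notification is routed, early messages are queued for the client instead of being dropped.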
