Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Does Nats jetstream support websocket as server address url? #1360

Open
yufansong opened this issue Jan 6, 2025 · 3 comments
Open

Does Nats jetstream support websocket as server address url? #1360

yufansong opened this issue Jan 6, 2025 · 3 comments
Labels
proposal Enhancement idea or proposal

Comments

@yufansong
Copy link

yufansong commented Jan 6, 2025

Proposed change

Support websocket in nats jetstream rust package.

Use case

In offical documentation, I find the native nats support the websocket as url here. But does the jetstream support it?
I wasnt able to make jetstream consumer lib to work with rust library.

I check the rust lib doc. Only find this and the corresponding code here. In there lib code, I didn't find other extra requirement for setting websocket. No extra function to set websocket, and seems their source code can directly get ws keywords in server address. But as we discussed before, maybe design for native nats rather than jetstream.

    pub fn from_url(url: Url) -> io::Result<Self> {
        if url.scheme() != "nats"
            && url.scheme() != "tls"
            && url.scheme() != "ws"
            && url.scheme() != "wss"
        {
            return Err(std::io::Error::new(
                ErrorKind::InvalidInput,
                format!("invalid scheme for NATS server URL: {}", url.scheme()),
            ));
        }

        Ok(Self(url))
    }

Contribution

No response

@yufansong yufansong added the proposal Enhancement idea or proposal label Jan 6, 2025
@Jarema
Copy link
Member

Jarema commented Jan 6, 2025

JetStream should work just fine.

Providing a scheme with wss or ws is enough.

This test run just fine:

    #[tokio::test]
    async fn jetstream() {
        let _server = nats_server::run_server("tests/configs/ws.conf");
        let client = async_nats::ConnectOptions::new()
            .retry_on_initial_connect()
            .connect("ws://localhost:8444")
            .await
            .unwrap();

        let jetstream = jetstream::new(client);

        jetstream
            .create_stream(jetstream::stream::Config {
                name: "foo".into(),
                subjects: vec!["foo.*".into()],
                storage: jetstream::stream::StorageType::File,
                ..Default::default()
            })
            .await
            .unwrap();

        let ack = jetstream
            .publish("foo.bar", "hello".into())
            .await
            .unwrap()
            .await
            .unwrap();

        println!("{:?}", ack);
    }

@PierreNowak
Copy link

If I use nats:// it works but with ws:// it fails.

For ws:// I had to add .retry_on_initial_connect() or else it didn't even get the Connected log, if I add it I get:

Connecting to NATS server.
Connected to NATS server.



Error: Error { kind: TimedOut, source: None }

It works fine with .connect("nats://localhost:4222") though

Could that be that it doesn't support credentials file ?

Here is the complete code:

use async_nats::jetstream;
use futures::StreamExt;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Connecting to NATS server.");
    // Connect to the NATS server using credentials
    let client = async_nats::ConnectOptions::with_credentials_file("client.creds")
    .await?
    .retry_on_initial_connect()
    .connect("nats://localhost:4222")
    // .connect("ws://localhost:8080")
    .await?;
    println!("Connected to NATS server.");

    // Create a JetStream context
    let jetstream = jetstream::new(client);


    let stream = jetstream
    .get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "SENSOR_DATA".to_string(),
        ..Default::default()
    })
    .await?;

    let consumer = stream
        .get_or_create_consumer(
            "rust_consumer",
            async_nats::jetstream::consumer::pull::Config {
                durable_name: Some("rust_consumer".to_string()),
                ..Default::default()
            },
        )
        .await?;

    for _ in 0..100 {
        jetstream.publish("events", "data".into()).await?;
    }

    let mut messages = consumer.fetch().max_messages(200).messages().await?;
    while let Some(result) = messages.next().await {
        match result {
            Ok(message) => {
                println!("got message {:?}", message);
                if let Err(e) = message.ack().await {
                    eprintln!("Failed to acknowledge message: {:?}", e);
                }
            }
            Err(e) => {
                eprintln!("Error while receiving message: {:?}", e);
            }
        }
    }
    Ok(())

}


@Jarema
Copy link
Member

Jarema commented Jan 9, 2025

What is your server config?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
proposal Enhancement idea or proposal
Projects
None yet
Development

No branches or pull requests

3 participants