diff --git a/.config/nats.dic b/.config/nats.dic index 8cb2484a3..fa6496d6d 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -153,3 +153,4 @@ update_consumer update_consumer_on_stream create_consumer_strict create_consumer_strict_on_stream +leafnodes diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index d09618f0f..47ab987e8 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -115,6 +115,10 @@ impl Client { /// Returns true if the server version is compatible with the version components. /// + /// This has to be used with caution, as it is not guaranteed that the server + /// that client is connected to is the same version that the one that is + /// a JetStream meta/stream/consumer leader, especially across leafnodes. + /// /// # Examples /// /// ```no_run @@ -128,7 +132,10 @@ impl Client { pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool { let info = self.server_info(); - let server_version_captures = VERSION_RE.captures(&info.version).unwrap(); + let server_version_captures = match VERSION_RE.captures(&info.version) { + Some(captures) => captures, + None => return false, + }; let server_major = server_version_captures .get(1) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 2b3b2aed5..15721432a 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -1003,32 +1003,17 @@ impl Context { let config = config.into_consumer_config(); let subject = { - if self.client.is_server_compatible(2, 9, 0) { - let filter = if config.filter_subject.is_empty() { - "".to_string() - } else { - format!(".{}", config.filter_subject) - }; - config - .name - .as_ref() - .or(config.durable_name.as_ref()) - .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter)) - .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref())) - } else if config.name.is_some() { - return Err(ConsumerError::with_source( - ConsumerErrorKind::Other, - "can't use consumer name with server < 2.9.0", - )); - } else if let Some(ref durable_name) = config.durable_name { - format!( - "CONSUMER.DURABLE.CREATE.{}.{}", - stream.as_ref(), - durable_name - ) + let filter = if config.filter_subject.is_empty() { + "".to_string() } else { - format!("CONSUMER.CREATE.{}", stream.as_ref()) - } + format!(".{}", config.filter_subject) + }; + config + .name + .as_ref() + .or(config.durable_name.as_ref()) + .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter)) + .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref())) }; match self diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index f2821e016..b1588852f 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -3633,4 +3633,25 @@ mod jetstream { assert_eq!(err.kind(), ConsumerUpdateErrorKind::DoesNotExist); } + + #[tokio::test] + async fn test_version_on_initial_connect() { + let client = async_nats::ConnectOptions::new() + .retry_on_initial_connect() + .connect("nats://localhost:4222") + .await + .unwrap(); + let jetstream = async_nats::jetstream::new(client.clone()); + + jetstream + .create_consumer_on_stream( + consumer::pull::Config { + durable_name: Some("name".to_string()), + ..Default::default() + }, + "events", + ) + .await + .expect_err("should fail but not panic because of lack of server info"); + } }