From 12d8c0e1a5a34a566a5474bc3697f01d301f99b0 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 25 Jul 2024 10:49:16 +0200 Subject: [PATCH 1/4] Add stream_by_subject Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/context.rs | 76 ++++++++++++++++++++++++++++- async-nats/tests/jetstream_tests.rs | 25 +++++++++- 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index b7c4e5185..b87ec330c 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -19,10 +19,12 @@ use crate::jetstream::account::Account; use crate::jetstream::publish::PublishAck; use crate::jetstream::response::Response; use crate::subject::ToSubject; -use crate::{header, Client, Command, HeaderMap, HeaderValue, Message, StatusCode}; +use crate::{ + header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode, +}; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{Future, TryFutureExt}; +use futures::{Future, StreamExt, TryFutureExt}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::{self, json}; @@ -498,6 +500,52 @@ impl Context { } } + /// Looks up Stream that contains provided subject. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::TryStreamExt; + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let stream_name = jetstream.stream_by_subject("foo.>"); + /// # Ok(()) + /// # } + /// ``` + pub async fn stream_by_subject>( + &self, + subject: T, + ) -> Result { + let subject = subject.into(); + if !is_valid_subject(subject.as_str()) { + return Err(GetStreamByNameError::new( + GetStreamByNameErrorKind::InvalidSubject, + )); + } + let mut names = StreamNames { + context: self.clone(), + offset: 0, + page_request: None, + streams: Vec::new(), + subject: Some(subject), + done: false, + }; + match names.next().await { + Some(name) => match name { + Ok(name) => Ok(name), + Err(err) => Err(GetStreamByNameError::with_source( + GetStreamByNameErrorKind::Request, + err, + )), + }, + None => Err(GetStreamByNameError::new( + GetStreamByNameErrorKind::NotFound, + )), + } + } + /// Lists names of all streams for current context. /// /// # Examples @@ -521,6 +569,7 @@ impl Context { offset: 0, page_request: None, streams: Vec::new(), + subject: None, done: false, } } @@ -1297,6 +1346,7 @@ pub struct StreamNames { context: Context, offset: usize, page_request: Option, + subject: Option, streams: Vec, done: bool, } @@ -1339,12 +1389,14 @@ impl futures::Stream for StreamNames { } let context = self.context.clone(); let offset = self.offset; + let subject = self.subject.clone(); self.page_request = Some(Box::pin(async move { match context .request( "STREAM.NAMES", &json!({ "offset": offset, + "subject": subject }), ) .await? @@ -1611,7 +1663,27 @@ impl Display for GetStreamErrorKind { } } +#[derive(Clone, Debug, PartialEq)] +pub enum GetStreamByNameErrorKind { + Request, + NotFound, + InvalidSubject, + JetStream(super::errors::Error), +} + +impl Display for GetStreamByNameErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Request => write!(f, "request error"), + Self::NotFound => write!(f, "stream not found"), + Self::InvalidSubject => write!(f, "invalid subject"), + Self::JetStream(err) => write!(f, "jetstream error: {}", err), + } + } +} + pub type GetStreamError = Error; +pub type GetStreamByNameError = Error; pub type UpdateStreamError = CreateStreamError; pub type UpdateStreamErrorKind = CreateStreamErrorKind; diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index b1588852f..2ba66a251 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -36,7 +36,7 @@ mod jetstream { self, push, AckPolicy, DeliverPolicy, Info, OrderedPullConsumer, OrderedPushConsumer, PullConsumer, PushConsumer, ReplayPolicy, }; - use async_nats::jetstream::context::{Publish, PublishErrorKind}; + use async_nats::jetstream::context::{GetStreamByNameErrorKind, Publish, PublishErrorKind}; use async_nats::jetstream::response::Response; use async_nats::jetstream::stream::{ self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DiscardPolicy, StorageType, @@ -3654,4 +3654,27 @@ mod jetstream { .await .expect_err("should fail but not panic because of lack of server info"); } + + #[tokio::test] + async fn test_stream_by_subject() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let jetstream = async_nats::jetstream::new(client); + + let stream = jetstream + .create_stream(stream::Config { + name: "events".to_string(), + subjects: vec!["events.>".to_string()], + ..Default::default() + }) + .await + .unwrap(); + + let stream_name = jetstream.stream_by_subject("events.>").await.unwrap(); + assert_eq!(stream_name, stream.cached_info().config.name); + + let err = jetstream.stream_by_subject("foo").await.unwrap_err(); + assert_eq!(err.kind(), GetStreamByNameErrorKind::NotFound); + } } From 56f7572d345416ce90b11e19059f287669f24ae1 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 25 Jul 2024 11:33:22 +0200 Subject: [PATCH 2/4] Improve subject validation Signed-off-by: Tomasz Pietrek --- async-nats/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index f6e20a527..a530d63a5 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -1555,9 +1555,11 @@ impl ToServerAddrs for &T { } pub(crate) fn is_valid_subject>(subject: T) -> bool { - !subject.as_ref().contains([' ', '.', '\r', '\n']) + let subject_str = subject.as_ref(); + !subject_str.starts_with('.') + && !subject_str.ends_with('.') + && subject_str.bytes().all(|c| !c.is_ascii_whitespace()) } - macro_rules! from_with_timeout { ($t:ty, $k:ty, $origin: ty, $origin_kind: ty) => { impl From<$origin> for $t { From 139749d7a64fef66058e9794c8f0c1763372a0ab Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 25 Jul 2024 12:08:26 +0200 Subject: [PATCH 3/4] Bump msrv Signed-off-by: Tomasz Pietrek --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a33688a39..0cd348b0c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -164,7 +164,7 @@ jobs: - name: Install msrv Rust on ubuntu-latest id: install-rust - uses: dtolnay/rust-toolchain@1.67.0 + uses: dtolnay/rust-toolchain@1.70.0 - name: Cache the build artifacts uses: Swatinem/rust-cache@v2 with: From 5a712b1c19c327045767ae34e03f52ae6a7fb24a Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 26 Jul 2024 14:30:23 +0200 Subject: [PATCH 4/4] Bump time crate version Version 0.3.24 was causing error on minimal versions check: ``` --> /home/runner/.cargo/registry/src/index.crates.io-6f17d22bba15001f/time-0.3.24/src/format_description/parse/mod.rs:83:9 | 83 | let items = format_items | ^^^^^ ... 86 | Ok(items.into()) | ---- type must be known at this point | help: consider giving `items` an explicit type, where the placeholders `_` are specified | 83 | let items: Box<_> = format_items | ++++++++ ``` Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index d24573d32..6dd6db8e9 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -30,7 +30,7 @@ tokio-rustls = { version = "0.26", default-features = false } rustls-pemfile = "2" nuid = "0.5" serde_nanos = "0.1.3" -time = { version = "0.3.24", features = ["parsing", "formatting", "serde", "serde-well-known"] } +time = { version = "0.3.36", features = ["parsing", "formatting", "serde", "serde-well-known"] } rustls-native-certs = "0.7" tracing = "0.1" thiserror = "1.0"