diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index ff99e364a..69fae490c 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -1026,21 +1026,46 @@ impl Store { /// # } /// ``` pub async fn keys(&self) -> Result { - let subject = format!("{}>", self.prefix.as_str()); + self.keys_with_filters(vec![">"]).await + } - let consumer = self - .stream - .create_consumer(super::consumer::push::OrderedConfig { + pub async fn keys_with_filters( + &self, + filters: impl IntoIterator, + ) -> Result { + let mut config: super::consumer::push::OrderedConfig = + super::consumer::push::OrderedConfig { deliver_subject: self.stream.context.client.new_inbox(), description: Some("kv history consumer".to_string()), - filter_subject: subject, headers_only: true, replay_policy: super::consumer::ReplayPolicy::Instant, // We only need to know the latest state for each key, not the whole history deliver_policy: DeliverPolicy::LastPerSubject, ..Default::default() - }) - .await?; + }; + + let mut filters = filters.into_iter().map(|f| format!("{}{}", self.prefix, f)); + + match (filters.next(), filters.next()) { + (Some(first), None) => { + config.filter_subject = first; + } + (Some(first), Some(_second)) => { + #[cfg(feature = "server_2_10")] + { + config.filter_subjects = vec![first, _second]; + config.filter_subjects.extend(filters); + } + #[cfg(not(feature = "server_2_10"))] + { + config.filter_subject = first; + // maybe a warning + } + } + _ => {} + } + + let consumer = self.stream.create_consumer(config).await?; let entries = History { done: consumer.info.num_pending == 0, diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 670c19127..917466679 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -795,6 +795,73 @@ mod kv { keys.sort(); assert_eq!(vec!["bar", "foo"], keys); + #[cfg(feature = "server_2_10")] + { + let mut keys_with_filter = kv + .keys_with_filters(vec!["bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filter.sort(); + assert_eq!(vec!["bar"], keys_with_filter); + + kv.put("foo1.bar", 37.to_string().into()).await.unwrap(); + kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap(); + kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap(); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo", "bar"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["bar", "foo"], keys_with_filters); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["foo1.*.*", "foo1.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + keys_with_filters.sort(); + assert_eq!( + vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"], + keys_with_filters + ); + + let mut keys_with_filters = kv + .keys_with_filters(vec!["*.baz.*"]) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + + keys_with_filters.sort(); + assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters); + + // cleanup the keys + kv.delete("foo1.bar").await.unwrap(); + kv.delete("foo1.baz.boo").await.unwrap(); + kv.delete("foo1.baz.baz").await.unwrap(); + } + // filters like "foo.b*" should not return anything because it's not a valid filter + // Delete a key and make sure it doesn't show up in the keys list kv.delete("bar").await.unwrap(); let keys = kv