Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: adds new filters functions #1281

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
39 changes: 32 additions & 7 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,21 +1026,46 @@ impl Store {
/// # }
/// ```
pub async fn keys(&self) -> Result<Keys, HistoryError> {
let subject = format!("{}>", self.prefix.as_str());
self.keys_with_filters(vec![">"]).await
}

let consumer = self
.stream
.create_consumer(super::consumer::push::OrderedConfig {
pub async fn keys_with_filters(
&self,
filters: impl IntoIterator<Item = &str>,
) -> Result<Keys, HistoryError> {
let mut config: super::consumer::push::OrderedConfig =
super::consumer::push::OrderedConfig {
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("kv history consumer".to_string()),
filter_subject: subject,
headers_only: true,
replay_policy: super::consumer::ReplayPolicy::Instant,
// We only need to know the latest state for each key, not the whole history
deliver_policy: DeliverPolicy::LastPerSubject,
..Default::default()
})
.await?;
};

let mut filters = filters.into_iter().map(|f| format!("{}{}", self.prefix, f));

match (filters.next(), filters.next()) {
(Some(first), None) => {
config.filter_subject = first;
}
(Some(first), Some(_second)) => {
#[cfg(feature = "server_2_10")]
{
config.filter_subjects = vec![first, _second];
config.filter_subjects.extend(filters);
}
#[cfg(not(feature = "server_2_10"))]
{
config.filter_subject = first;
// maybe a warning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the way to go.

As said before - copy-paste is the way to go, for now.

}
}
_ => {}
}

let consumer = self.stream.create_consumer(config).await?;

let entries = History {
done: consumer.info.num_pending == 0,
Expand Down
67 changes: 67 additions & 0 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,73 @@ mod kv {
keys.sort();
assert_eq!(vec!["bar", "foo"], keys);

#[cfg(feature = "server_2_10")]
{
let mut keys_with_filter = kv
.keys_with_filters(vec!["bar"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filter.sort();
assert_eq!(vec!["bar"], keys_with_filter);

kv.put("foo1.bar", 37.to_string().into()).await.unwrap();
kv.put("foo1.baz.boo", 73.to_string().into()).await.unwrap();
kv.put("foo1.baz.baz", 89.to_string().into()).await.unwrap();

let mut keys_with_filters = kv
.keys_with_filters(vec!["foo", "bar"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filters.sort();
assert_eq!(vec!["bar", "foo"], keys_with_filters);

let mut keys_with_filters = kv
.keys_with_filters(vec!["foo1.*.*"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filters.sort();
assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters);

let mut keys_with_filters = kv
.keys_with_filters(vec!["foo1.*.*", "foo1.*"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();
keys_with_filters.sort();
assert_eq!(
vec!["foo1.bar", "foo1.baz.baz", "foo1.baz.boo"],
keys_with_filters
);

let mut keys_with_filters = kv
.keys_with_filters(vec!["*.baz.*"])
.await
.unwrap()
.try_collect::<Vec<String>>()
.await
.unwrap();

keys_with_filters.sort();
assert_eq!(vec!["foo1.baz.baz", "foo1.baz.boo"], keys_with_filters);

// cleanup the keys
kv.delete("foo1.bar").await.unwrap();
kv.delete("foo1.baz.boo").await.unwrap();
kv.delete("foo1.baz.baz").await.unwrap();
}
// filters like "foo.b*" should not return anything because it's not a valid filter

// Delete a key and make sure it doesn't show up in the keys list
kv.delete("bar").await.unwrap();
let keys = kv
Expand Down
Loading