Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watch from revision for KV #1196

Merged
merged 2 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,4 @@ S2
inactive_threshold
max_ack_pending
footgun
KV
70 changes: 70 additions & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,46 @@ impl Store {
.await
}

/// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, starting from
/// provided revision. This is useful to resume watching over big KV buckets without a need to
/// replay all the history.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let mut entries = kv.watch_from_revision("kv", 5).await?;
/// while let Some(entry) = entries.next().await {
/// println!("entry: {:?}", entry);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn watch_from_revision<T: AsRef<str>>(
&self,
key: T,
revision: u64,
) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(
key,
DeliverPolicy::ByStartSequence {
start_sequence: revision,
},
)
.await
}

/// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, which yields
/// values whenever there are changes for that key with as well as last value.
///
Expand Down Expand Up @@ -531,6 +571,36 @@ impl Store {
self.watch(ALL_KEYS).await
}

/// Creates a [futures::Stream] over [Entries][Entry] for all keys starting
/// from a provider revision. This can be useful when resuming watching over a big bucket
/// without the need to replay all the history.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let mut entries = kv.watch_all_from_revision(40).await?;
/// while let Some(entry) = entries.next().await {
/// println!("entry: {:?}", entry);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn watch_all_from_revision(&self, revision: u64) -> Result<Watch, WatchError> {
self.watch_from_revision(ALL_KEYS, revision).await
}

/// Retrieves the [Entry] for a given key from a bucket.
///
/// # Examples
Expand Down
46 changes: 46 additions & 0 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,52 @@ mod kv {
}
}
}

#[tokio::test]
async fn watch_from_revision() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "bucket".into(),
description: "test_description".into(),
history: 15,
..Default::default()
})
.await
.unwrap();

kv.put("foo", "bar".into()).await.unwrap();
kv.put("foo", "baz".into()).await.unwrap();
kv.put("bar", "foo".into()).await.unwrap();
kv.put("bar", "bar".into()).await.unwrap();
kv.put("baz", "foo".into()).await.unwrap();

let mut watch = kv.watch_from_revision("foo", 2).await.unwrap().take(1);
let key = watch.next().await.unwrap().unwrap();
assert_eq!(key.key, "foo");
assert_eq!(key.value, "baz".as_bytes());
assert_eq!(key.revision, 2);

let mut watch = kv.watch_all_from_revision(3).await.unwrap().take(3);
let key = watch.next().await.unwrap().unwrap();
assert_eq!(key.key, "bar");
assert_eq!(key.value, "foo".as_bytes());
assert_eq!(key.revision, 3);
let key = watch.next().await.unwrap().unwrap();
assert_eq!(key.key, "bar");
assert_eq!(key.value, "bar".as_bytes());
let key = watch.next().await.unwrap().unwrap();
assert_eq!(key.key, "baz");
assert_eq!(key.value, "foo".as_bytes());
}

#[tokio::test]
async fn keys() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down