Skip to content

Commit

Permalink
Add ability to fetch stream subjects
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema authored Oct 24, 2024
1 parent b17d71d commit 1fbcfdd
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,5 @@ get_stream
get_stream_no_info
lifecycle
AtomicU64
with_deleted
StreamInfoBuilder
258 changes: 254 additions & 4 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
//
//! Manage operations on a [Stream], create/delete/update [Consumer].

#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
use std::{
collections::{self, HashMap},
fmt::{self, Debug, Display},
future::IntoFuture,
io::{self, ErrorKind},
Expand All @@ -31,7 +30,7 @@ use crate::{
use base64::engine::general_purpose::STANDARD;
use base64::engine::Engine;
use bytes::Bytes;
use futures::{future::BoxFuture, TryFutureExt};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::json;
use time::{serde::rfc3339, OffsetDateTime};
Expand Down Expand Up @@ -192,6 +191,79 @@ impl<I> Stream<I> {
}
}

/// Retrieves [[Info]] from the server and returns a [[futures::Stream]] that allows
/// iterating over all subjects in the stream fetched via paged API.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let mut stream = jetstream.get_stream("events").await?;
///
/// let mut info = stream.info_with_subjects("events.>").await?;
///
/// while let Some((subject, count)) = info.try_next().await? {
/// println!("Subject: {} count: {}", subject, count);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn info_with_subjects<F: AsRef<str>>(
&self,
subjects_filter: F,
) -> Result<InfoWithSubjects, InfoError> {
let subjects_filter = subjects_filter.as_ref().to_string();
// TODO: validate the subject and decide if this should be a `Subject`
let info = stream_info_with_details(
self.context.clone(),
self.name.clone(),
0,
false,
subjects_filter.clone(),
)
.await?;

Ok(InfoWithSubjects::new(
self.context.clone(),
info,
subjects_filter,
))
}

/// Creates a builder that allows to customize `Stream::Info`.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let mut stream = jetstream.get_stream("events").await?;
///
/// let mut info = stream
/// .info_builder()
/// .with_deleted(true)
/// .subjects("events.>")
/// .fetch()
/// .await?;
///
/// while let Some((subject, count)) = info.try_next().await? {
/// println!("Subject: {} count: {}", subject, count);
/// }
/// # Ok(())
/// # }
/// ```
pub fn info_builder(&self) -> StreamInfoBuilder {
StreamInfoBuilder::new(self.context.clone(), self.name.clone())
}

/// Gets next message for a [Stream].
///
/// Requires a [Stream] with `allow_direct` set to `true`.
Expand Down Expand Up @@ -995,6 +1067,47 @@ impl<I> Stream<I> {
}
}

pub struct StreamInfoBuilder {
pub(crate) context: Context,
pub(crate) name: String,
pub(crate) deleted: bool,
pub(crate) subject: String,
}

impl StreamInfoBuilder {
fn new(context: Context, name: String) -> Self {
Self {
context,
name,
deleted: false,
subject: "".to_string(),
}
}

pub fn with_deleted(mut self, deleted: bool) -> Self {
self.deleted = deleted;
self
}

pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
self.subject = subject.into();
self
}

pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
let info = stream_info_with_details(
self.context.clone(),
self.name.clone(),
0,
self.deleted,
self.subject.clone(),
)
.await?;

Ok(InfoWithSubjects::new(self.context, info, self.subject))
}
}

/// `StreamConfig` determines the properties for a stream.
/// There are sensible defaults for most. If no subjects are
/// given the name will be used as the only subject.
Expand Down Expand Up @@ -1246,6 +1359,122 @@ pub enum StorageType {
Memory = 1,
}

async fn stream_info_with_details(
context: Context,
stream: String,
offset: usize,
deleted_details: bool,
subjects_filter: String,
) -> Result<Info, InfoError> {
let subject = format!("STREAM.INFO.{}", stream);

let payload = StreamInfoRequest {
offset,
deleted_details,
subjects_filter,
};

let response: Response<Info> = context.request(subject, &payload).await?;

match response {
Response::Ok(info) => Ok(info),
Response::Err { error } => Err(error.into()),
}
}

type InfoRequest = BoxFuture<'static, Result<Info, InfoError>>;

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct StreamInfoRequest {
offset: usize,
deleted_details: bool,
subjects_filter: String,
}

pub struct InfoWithSubjects {
stream: String,
context: Context,
pub info: Info,
offset: usize,
subjects: collections::hash_map::IntoIter<String, usize>,
info_request: Option<InfoRequest>,
subjects_filter: String,
pages_done: bool,
}

impl InfoWithSubjects {
pub fn new(context: Context, mut info: Info, subject: String) -> Self {
let subjects = info.state.subjects.take().unwrap_or_default();
let name = info.config.name.clone();
InfoWithSubjects {
context,
info,
pages_done: subjects.is_empty(),
offset: subjects.len(),
subjects: subjects.into_iter(),
subjects_filter: subject,
stream: name,
info_request: None,
}
}
}

impl futures::Stream for InfoWithSubjects {
type Item = Result<(String, usize), InfoError>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.subjects.next() {
Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
None => {
// If we have already requested all pages, stop the iterator.
if self.pages_done {
return Poll::Ready(None);
}
let stream = self.stream.clone();
let context = self.context.clone();
let subjects_filter = self.subjects_filter.clone();
let offset = self.offset;
match self
.info_request
.get_or_insert_with(|| {
Box::pin(stream_info_with_details(
context,
stream,
offset,
false,
subjects_filter,
))
})
.poll_unpin(cx)
{
Poll::Ready(resp) => match resp {
Ok(info) => {
let subjects = info.state.subjects.clone();
self.offset += subjects.as_ref().map_or_else(|| 0, |s| s.len());
self.info_request = None;
let subjects = subjects.unwrap_or_default();
self.subjects = info.state.subjects.unwrap_or_default().into_iter();
let total = info.paged_info.map(|info| info.total).unwrap_or(0);
if total <= self.offset || subjects.is_empty() {
self.pages_done = true;
}
match self.subjects.next() {
Some((subject, count)) => Poll::Ready(Some(Ok((subject, count)))),
None => Poll::Ready(None),
}
}
Err(err) => Poll::Ready(Some(Err(err))),
},
Poll::Pending => Poll::Pending,
}
}
}
}
}

/// Shows config and current state for this stream.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
Expand All @@ -1264,6 +1493,15 @@ pub struct Info {
/// Information about sources configs if present.
#[serde(default)]
pub sources: Vec<SourceInfo>,
#[serde(flatten)]
paged_info: Option<PagedInfo>,
}

#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct PagedInfo {
offset: usize,
total: usize,
limit: usize,
}

#[derive(Deserialize)]
Expand All @@ -1272,7 +1510,7 @@ pub struct DeleteStatus {
}

/// information about the given stream.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct State {
/// The number of messages contained in this stream
pub messages: u64,
Expand All @@ -1292,6 +1530,18 @@ pub struct State {
pub last_timestamp: time::OffsetDateTime,
/// The number of consumers configured to consume this stream
pub consumer_count: usize,
/// The number of subjects in the stream
#[serde(default, rename = "num_subjects")]
pub subjects_count: u64,
/// The number of deleted messages in the stream
#[serde(default, rename = "num_deleted")]
pub deleted_count: Option<u64>,
/// The list of deleted subjects from the Stream.
/// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
#[serde(default)]
pub deleted: Option<Vec<u64>>,

pub(crate) subjects: Option<HashMap<String, usize>>,
}

/// A raw stream message in the representation it is stored.
Expand Down
Loading

0 comments on commit 1fbcfdd

Please sign in to comment.