Skip to content

Commit

Permalink
Fix de/serialize sample_frequency correctly for Push and Pull Consu…
Browse files Browse the repository at this point in the history
…mers

Co-authored-by: Benjamin Sparks <b.sparks@alugha.com>
  • Loading branch information
bengsparks and Benjamin Sparks authored Aug 21, 2024
1 parent 053944d commit f044e06
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 5 deletions.
27 changes: 26 additions & 1 deletion async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ pub struct Config {
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
/// What percentage of acknowledgments should be samples for observability, 0-100
#[serde(default, skip_serializing_if = "is_default")]
#[serde(
rename = "sample_freq",
with = "from_str",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
/// The maximum number of waiting consumers.
#[serde(default, skip_serializing_if = "is_default")]
Expand Down Expand Up @@ -428,6 +433,26 @@ fn is_default<T: Default + Eq>(t: &T) -> bool {
t == &T::default()
}

pub(crate) mod from_str {
pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: std::str::FromStr,
T::Err: std::fmt::Display,
D: serde::Deserializer<'de>,
{
let s = <String as serde::Deserialize>::deserialize(deserializer)?;
T::from_str(&s).map_err(serde::de::Error::custom)
}

pub(crate) fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: std::fmt::Display,
S: serde::Serializer,
{
serializer.serialize_str(&value.to_string())
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StreamErrorKind {
TimedOut,
Expand Down
14 changes: 12 additions & 2 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,12 @@ pub struct OrderedConfig {
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
/// What percentage of acknowledgments should be samples for observability, 0-100
#[serde(default, skip_serializing_if = "is_default")]
#[serde(
rename = "sample_freq",
with = "super::from_str",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
/// Only deliver headers without payloads.
#[serde(default, skip_serializing_if = "is_default")]
Expand Down Expand Up @@ -2044,7 +2049,12 @@ pub struct Config {
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
/// What percentage of acknowledgments should be samples for observability, 0-100
#[serde(default, skip_serializing_if = "is_default")]
#[serde(
rename = "sample_freq",
with = "super::from_str",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
/// The maximum number of waiting consumers.
#[serde(default, skip_serializing_if = "is_default")]
Expand Down
14 changes: 12 additions & 2 deletions async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,12 @@ pub struct Config {
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
/// What percentage of acknowledgments should be samples for observability, 0-100
#[serde(default, skip_serializing_if = "is_default")]
#[serde(
rename = "sample_freq",
with = "super::from_str",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
/// The maximum number of waiting consumers.
#[serde(default, skip_serializing_if = "is_default")]
Expand Down Expand Up @@ -382,7 +387,12 @@ pub struct OrderedConfig {
#[serde(default, skip_serializing_if = "is_default")]
pub rate_limit: u64,
/// What percentage of acknowledgments should be samples for observability, 0-100
#[serde(default, skip_serializing_if = "is_default")]
#[serde(
rename = "sample_freq",
with = "super::from_str",
default,
skip_serializing_if = "is_default"
)]
pub sample_frequency: u8,
/// Only deliver headers without payloads.
#[serde(default, skip_serializing_if = "is_default")]
Expand Down
53 changes: 53 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2514,6 +2514,59 @@ mod jetstream {
}
}

#[tokio::test]
async fn consumer_configs_sample_frequency() {
let server = nats_server::run_server("tests/configs/jetstream.conf");

let client = ConnectOptions::new()
.event_callback(|err| async move { println!("error: {err:?}") })
.connect(server.client_url())
.await
.unwrap();

let js = async_nats::jetstream::new(client.clone());

let stream = js
.create_stream(stream::Config {
name: "StreamWithSampledConsumers".into(),
..Default::default()
})
.await
.unwrap();

{
let consumer = stream
.create_consumer(consumer::pull::Config {
name: Some("SampledPullConsumer".into()),
description: Some(
"See below to check that Ack Sampling has been set to 100%!".to_string(),
),
sample_frequency: 100, // <--- sample all the messages
..Default::default()
})
.await
.unwrap();

assert_eq!(100, consumer.cached_info().config.sample_frequency);
}

{
let consumer = stream
.create_consumer(consumer::pull::Config {
name: Some("SampledPushConsumer".into()),
description: Some(
"See below to check that Ack Sampling has been set to 100%!".to_string(),
),
sample_frequency: 100, // <--- sample all the messages
..Default::default()
})
.await
.unwrap();

assert_eq!(100, consumer.cached_info().config.sample_frequency);
}
}

#[tokio::test]
async fn timeout_out_request() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit f044e06

Please sign in to comment.