From 90a675bf1152311e3f58b9ab42005df6ad530982 Mon Sep 17 00:00:00 2001 From: Quentin Gaborit Date: Tue, 9 Jul 2024 10:53:21 +0200 Subject: [PATCH 1/4] feat: Add 20 missing NATS consumer parameters --- src/connector/src/connector_common/common.rs | 56 ++++++++++- src/connector/src/source/nats/mod.rs | 98 +++++++++++++++++++ .../src/source/nats/source/reader.rs | 75 ++++++++++++++ 3 files changed, 227 insertions(+), 2 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 92bb7d9c30677..14ddb826da883 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -18,7 +18,7 @@ use std::io::Write; use std::time::Duration; use anyhow::{anyhow, Context}; -use async_nats::jetstream::consumer::DeliverPolicy; +use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy}; use async_nats::jetstream::{self}; use aws_sdk_kinesis::Client as KinesisClient; use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; @@ -630,6 +630,26 @@ impl NatsCommon { stream: String, split_id: String, start_sequence: NatsOffset, + durable_name: Option, + description: Option, + ack_policy: Option, + ack_wait: Option, + max_deliver: Option, + filter_subject: Option, + filter_subjects: Option>, + replay_policy: Option, + rate_limit: Option, + sample_frequency: Option, + max_waiting: Option, + max_ack_pending: Option, + _idle_heartbeat: Option, + max_batch: Option, + max_bytes: Option, + max_expires: Option, + inactive_threshold: Option, + num_replicas: Option, + memory_storage: Option, + backoff: Option>, ) -> ConnectorResult< async_nats::jetstream::consumer::Consumer, > { @@ -641,7 +661,24 @@ impl NatsCommon { .replace(['.', '>', '*', ' ', '\t'], "_"); let name = format!("risingwave-consumer-{}-{}", subject_name, split_id); let mut config = jetstream::consumer::pull::Config { - ack_policy: jetstream::consumer::AckPolicy::None, + durable_name, + description, + ack_wait: ack_wait.unwrap_or_default(), + max_deliver: max_deliver.unwrap_or_default(), + filter_subject: filter_subject.unwrap_or_default(), + filter_subjects: filter_subjects.unwrap_or_default(), + rate_limit: rate_limit.unwrap_or_default(), + sample_frequency: sample_frequency.unwrap_or_default(), + max_waiting: max_waiting.unwrap_or_default(), + max_ack_pending: max_ack_pending.unwrap_or_default(), + // idle_heartbeat: idle_heart.unwrap_or_default(), + max_batch: max_batch.unwrap_or_default(), + max_bytes: max_bytes.unwrap_or_default(), + max_expires: max_expires.unwrap_or_default(), + inactive_threshold: inactive_threshold.unwrap_or_default(), + memory_storage: memory_storage.unwrap_or_default(), + backoff: backoff.unwrap_or_default(), + num_replicas: num_replicas.unwrap_or_default(), ..Default::default() }; @@ -662,9 +699,24 @@ impl NatsCommon { }, NatsOffset::None => DeliverPolicy::All, }; + + let ack_policy = match ack_policy.as_deref() { + Some("all") => AckPolicy::All, + Some("explicit") => AckPolicy::Explicit, + _ => AckPolicy::None, + }; + + let replay_policy = match replay_policy.as_deref() { + Some("instant") => ReplayPolicy::Instant, + Some("original") => ReplayPolicy::Original, + _ => ReplayPolicy::Instant, + }; + let consumer = stream .get_or_create_consumer(&name, { config.deliver_policy = deliver_policy; + config.ack_policy = ack_policy; + config.replay_policy = replay_policy; config }) .await?; diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 7ef9c74ee7601..79ff8cc82b4c5 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -45,6 +45,104 @@ pub struct NatsProperties { #[serde(rename = "stream")] pub stream: String, + /// Setting `durable_name` to `Some(...)` will cause this consumer + /// to be "durable". This may be a good choice for workloads that + /// benefit from the `JetStream` server or cluster remembering the + /// progress of consumers for fault tolerance purposes. If a consumer + /// crashes, the `JetStream` server or cluster will remember which + /// messages the consumer acknowledged. When the consumer recovers, + /// this information will allow the consumer to resume processing + /// where it left off. If you're unsure, set this to `Some(...)`. + /// + /// Setting `durable_name` to `None` will cause this consumer to + /// be "ephemeral". This may be a good choice for workloads where + /// you don't need the `JetStream` server to remember the consumer's + /// progress in the case of a crash, such as certain "high churn" + /// workloads or workloads where a crashed instance is not required + /// to recover. + #[serde(rename = "consumer.durable_name")] + pub durable_name: Option, + + /// A short description of the purpose of this consumer. + #[serde(rename = "consumer.description")] + pub description: Option, + + /// How messages should be acknowledged + #[serde(rename = "consumer.ack_policy")] + pub ack_policy: Option, + + /// How long to allow messages to remain un-acknowledged before attempting redelivery + #[serde(rename = "consumer.ack_wait")] + pub ack_wait: Option, + + /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever. + #[serde(rename = "consumer.max_deliver")] + pub max_deliver: Option, + + /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards. + #[serde(rename = "consumer.filter_subject")] + pub filter_subject: Option, + + /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects. + #[serde(rename = "consumer.filter_subjects")] + pub filter_subjects: Option, + + /// Whether messages are sent as quickly as possible or at the rate of receipt + #[serde(rename = "consumer.replay_policy")] + pub replay_policy: Option, + + /// The rate of message delivery in bits per second + #[serde(rename = "consumer.rate_limit")] + pub rate_limit: Option, + + /// What percentage of acknowledgments should be samples for observability, 0-100 + #[serde(rename = "consumer.sample_frequency")] + pub sample_frequency: Option, + + /// The maximum number of waiting consumers. + #[serde(rename = "consumer.max_waiting")] + pub max_waiting: Option, + + /// The maximum number of unacknowledged messages that may be + /// in-flight before pausing sending additional messages to + /// this consumer. + #[serde(rename = "consumer.max_ack_pending")] + pub max_ack_pending: Option, + + /// The maximum number of unacknowledged messages that may be + /// in-flight before pausing sending additional messages to + /// this consumer. + #[serde(rename = "consumer.idle_heartbeat")] + pub idle_heartbeat: Option, + + /// Maximum size of a request batch + #[serde(rename = "consumer.max_batch")] + pub max_batch: Option, + + // / Maximum value of request max_bytes + #[serde(rename = "consumer.max_bytes")] + pub max_bytes: Option, + + /// Maximum value for request expiration + #[serde(rename = "consumer.max_expires")] + pub max_expires: Option, + + /// Threshold for consumer inactivity + #[serde(rename = "consumer.inactive_threshold")] + pub inactive_threshold: Option, + + /// Number of consumer replicas + #[serde(rename = "consumer.num.replicas", alias = "consumer.num_replicas")] + pub num_replicas: Option, + + /// Force consumer to use memory storage. + #[serde(rename = "consumer.memory_storage")] + pub memory_storage: Option, + + /// Custom backoff for missed acknowledgments. + #[serde(rename = "consumer.backoff")] + pub backoff: Option, + #[serde(flatten)] pub unknown_fields: HashMap, } diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 916378a263979..9921d74f4f393 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use anyhow::Context as _; use async_nats::jetstream::consumer; use async_trait::async_trait; @@ -85,8 +87,81 @@ impl SplitReader for NatsSplitReader { properties.stream.clone(), split_id.to_string(), start_position.clone(), + properties.durable_name.clone(), + properties.description.clone(), + properties.ack_policy.clone(), + properties.ack_wait.clone().map(|s| { + Duration::from_secs(s.parse::().expect("failed to parse ack_wait to u64")) + }), + properties.max_deliver.clone().map(|s| { + s.parse::() + .expect("failed to parse max_deliver to i64") + }), + properties.filter_subject.clone(), + properties + .filter_subjects + .clone() + .map(|s| s.split(',').map(|s| s.to_string()).collect()), + properties.replay_policy.clone(), + properties + .rate_limit + .clone() + .map(|s| s.parse::().expect("failed to parse rate_limit to u64")), + properties.sample_frequency.clone().map(|s| { + s.parse::() + .expect("failed to parse sample_frequency to u8") + }), + properties.max_waiting.clone().map(|s| { + s.parse::() + .expect("failed to parse max_waiting to i64") + }), + properties.max_ack_pending.clone().map(|s| { + s.parse::() + .expect("failed to parse max_ack_pending to i64") + }), + properties.idle_heartbeat.clone().map(|s| { + Duration::from_secs( + s.parse::() + .expect("failed to parse idle_heartbeat to u64"), + ) + }), + properties + .max_batch + .clone() + .map(|s| s.parse::().expect("failed to parse max_batch to i64")), + properties + .max_bytes + .clone() + .map(|s| s.parse::().expect("failed to parse max_bytes to i64")), + properties.max_expires.clone().map(|s| { + Duration::from_secs(s.parse::().expect("failed to parse ack_wait to u64")) + }), + properties.inactive_threshold.clone().map(|s| { + Duration::from_secs( + s.parse::() + .expect("failed to parse inactive_threshold to u64"), + ) + }), + properties.num_replicas.clone().map(|s| { + s.parse::() + .expect("failed to parse num_replicas to usize") + }), + properties.memory_storage.clone().map(|s| { + s.parse::() + .expect("failed to parse memory_storage to bool") + }), + properties.backoff.clone().map(|s| { + s.split(',') + .map(|s| { + Duration::from_secs( + s.parse::().expect("failed to parse backoff to u64"), + ) + }) + .collect() + }), ) .await?; + Ok(Self { consumer, properties, From 25494aaa091df2d6f05ebb7a87a212caccfc27e5 Mon Sep 17 00:00:00 2001 From: benjamin-awd Date: Thu, 22 Aug 2024 19:45:52 +0800 Subject: [PATCH 2/4] refactor: create separate struct for NATS consumer properties --- src/connector/src/connector_common/common.rs | 58 +--- src/connector/src/lib.rs | 19 ++ src/connector/src/source/nats/mod.rs | 317 ++++++++++++++---- .../src/source/nats/source/reader.rs | 80 +---- src/connector/src/with_options.rs | 3 + src/connector/with_options_source.yaml | 71 ++++ 6 files changed, 362 insertions(+), 186 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index c385a3e02cf78..b522ae2eda560 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -18,7 +18,7 @@ use std::io::Write; use std::time::Duration; use anyhow::{anyhow, Context}; -use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy}; +use async_nats::jetstream::consumer::DeliverPolicy; use async_nats::jetstream::{self}; use aws_sdk_kinesis::Client as KinesisClient; use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; @@ -639,26 +639,7 @@ impl NatsCommon { stream: String, split_id: String, start_sequence: NatsOffset, - durable_name: Option, - description: Option, - ack_policy: Option, - ack_wait: Option, - max_deliver: Option, - filter_subject: Option, - filter_subjects: Option>, - replay_policy: Option, - rate_limit: Option, - sample_frequency: Option, - max_waiting: Option, - max_ack_pending: Option, - _idle_heartbeat: Option, - max_batch: Option, - max_bytes: Option, - max_expires: Option, - inactive_threshold: Option, - num_replicas: Option, - memory_storage: Option, - backoff: Option>, + mut config: jetstream::consumer::pull::Config, ) -> ConnectorResult< async_nats::jetstream::consumer::Consumer, > { @@ -669,27 +650,6 @@ impl NatsCommon { .replace(',', "-") .replace(['.', '>', '*', ' ', '\t'], "_"); let name = format!("risingwave-consumer-{}-{}", subject_name, split_id); - let mut config = jetstream::consumer::pull::Config { - durable_name, - description, - ack_wait: ack_wait.unwrap_or_default(), - max_deliver: max_deliver.unwrap_or_default(), - filter_subject: filter_subject.unwrap_or_default(), - filter_subjects: filter_subjects.unwrap_or_default(), - rate_limit: rate_limit.unwrap_or_default(), - sample_frequency: sample_frequency.unwrap_or_default(), - max_waiting: max_waiting.unwrap_or_default(), - max_ack_pending: max_ack_pending.unwrap_or_default(), - // idle_heartbeat: idle_heart.unwrap_or_default(), - max_batch: max_batch.unwrap_or_default(), - max_bytes: max_bytes.unwrap_or_default(), - max_expires: max_expires.unwrap_or_default(), - inactive_threshold: inactive_threshold.unwrap_or_default(), - memory_storage: memory_storage.unwrap_or_default(), - backoff: backoff.unwrap_or_default(), - num_replicas: num_replicas.unwrap_or_default(), - ..Default::default() - }; let deliver_policy = match start_sequence { NatsOffset::Earliest => DeliverPolicy::All, @@ -709,23 +669,9 @@ impl NatsCommon { NatsOffset::None => DeliverPolicy::All, }; - let ack_policy = match ack_policy.as_deref() { - Some("all") => AckPolicy::All, - Some("explicit") => AckPolicy::Explicit, - _ => AckPolicy::None, - }; - - let replay_policy = match replay_policy.as_deref() { - Some("instant") => ReplayPolicy::Instant, - Some("original") => ReplayPolicy::Original, - _ => ReplayPolicy::Instant, - }; - let consumer = stream .get_or_create_consumer(&name, { config.deliver_policy = deliver_policy; - config.ack_policy = ack_policy; - config.replay_policy = replay_policy; config }) .await?; diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 02a3b8c84b50f..7dd9e4130f667 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -112,6 +112,25 @@ where } } +pub(crate) fn deserialize_duration_seq_from_string<'de, D>( + deserializer: D, +) -> std::result::Result>, D::Error> +where + D: de::Deserializer<'de>, +{ + let s: Option = de::Deserialize::deserialize(deserializer)?; + if let Some(s) = s { + let durations = s + .split(',') + .map(|s| s.trim().parse().map(Duration::from_secs)) + .collect::, _>>() + .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid duration")); + Ok(Some(durations?)) + } else { + Ok(None) + } +} + pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result where D: de::Deserializer<'de>, diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 79ff8cc82b4c5..f27b1c9dfb7eb 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -17,22 +17,55 @@ pub mod source; pub mod split; use std::collections::HashMap; +use std::time::Duration; +use async_nats::jetstream::consumer::pull::Config; +use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy}; use serde::Deserialize; +use serde_with::{serde_as, DisplayFromStr, DurationSeconds}; use with_options::WithOptions; use crate::connector_common::NatsCommon; use crate::source::nats::enumerator::NatsSplitEnumerator; use crate::source::nats::source::{NatsSplit, NatsSplitReader}; use crate::source::SourceProperties; +use crate::{deserialize_duration_seq_from_string, deserialize_optional_string_seq_from_string}; pub const NATS_CONNECTOR: &str = "nats"; +pub struct AckPolicyWrapper; + +impl AckPolicyWrapper { + pub fn parse_str(s: &str) -> AckPolicy { + match s { + "none" => AckPolicy::None, + "all" => AckPolicy::All, + "explicit" => AckPolicy::Explicit, + _ => AckPolicy::None, + } + } +} + +pub struct ReplayPolicyWrapper; + +impl ReplayPolicyWrapper { + pub fn parse_str(s: &str) -> ReplayPolicy { + match s { + "instant" => ReplayPolicy::Instant, + "original" => ReplayPolicy::Original, + _ => ReplayPolicy::Instant, + } + } +} + #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct NatsProperties { #[serde(flatten)] pub common: NatsCommon, + #[serde(flatten)] + pub nats_properties_consumer: NatsPropertiesConsumer, + #[serde(rename = "scan.startup.mode")] pub scan_startup_mode: Option, @@ -45,106 +78,175 @@ pub struct NatsProperties { #[serde(rename = "stream")] pub stream: String, - /// Setting `durable_name` to `Some(...)` will cause this consumer - /// to be "durable". This may be a good choice for workloads that - /// benefit from the `JetStream` server or cluster remembering the - /// progress of consumers for fault tolerance purposes. If a consumer - /// crashes, the `JetStream` server or cluster will remember which - /// messages the consumer acknowledged. When the consumer recovers, - /// this information will allow the consumer to resume processing - /// where it left off. If you're unsure, set this to `Some(...)`. - /// - /// Setting `durable_name` to `None` will cause this consumer to - /// be "ephemeral". This may be a good choice for workloads where - /// you don't need the `JetStream` server to remember the consumer's - /// progress in the case of a crash, such as certain "high churn" - /// workloads or workloads where a crashed instance is not required - /// to recover. + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +impl NatsProperties { + pub fn set_config(&self, c: &mut Config) { + self.nats_properties_consumer.set_config(c); + } +} + +/// Properties for the async-nats library. +/// See +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct NatsPropertiesConsumer { + #[serde(rename = "consumer.deliver_subject")] + pub deliver_subject: Option, + #[serde(rename = "consumer.durable_name")] pub durable_name: Option, - /// A short description of the purpose of this consumer. + #[serde(rename = "consumer.name")] + pub name: Option, + #[serde(rename = "consumer.description")] pub description: Option, - /// How messages should be acknowledged + #[serde(rename = "consumer.deliver_policy")] + #[serde_as(as = "Option")] + pub deliver_policy: Option, + #[serde(rename = "consumer.ack_policy")] + #[serde_as(as = "Option")] pub ack_policy: Option, - /// How long to allow messages to remain un-acknowledged before attempting redelivery #[serde(rename = "consumer.ack_wait")] - pub ack_wait: Option, + #[serde_as(as = "Option>")] + pub ack_wait: Option, - /// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever. #[serde(rename = "consumer.max_deliver")] - pub max_deliver: Option, + #[serde_as(as = "Option")] + pub max_deliver: Option, - /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards. #[serde(rename = "consumer.filter_subject")] pub filter_subject: Option, - /// Fulfills the same role as [Config::filter_subject], but allows filtering by many subjects. #[serde(rename = "consumer.filter_subjects")] - pub filter_subjects: Option, + #[serde(deserialize_with = "deserialize_optional_string_seq_from_string")] + pub filter_subjects: Option>, - /// Whether messages are sent as quickly as possible or at the rate of receipt #[serde(rename = "consumer.replay_policy")] + #[serde_as(as = "Option")] pub replay_policy: Option, - /// The rate of message delivery in bits per second #[serde(rename = "consumer.rate_limit")] - pub rate_limit: Option, + #[serde_as(as = "Option")] + pub rate_limit: Option, - /// What percentage of acknowledgments should be samples for observability, 0-100 #[serde(rename = "consumer.sample_frequency")] - pub sample_frequency: Option, + #[serde_as(as = "Option")] + pub sample_frequency: Option, - /// The maximum number of waiting consumers. #[serde(rename = "consumer.max_waiting")] - pub max_waiting: Option, + #[serde_as(as = "Option")] + pub max_waiting: Option, - /// The maximum number of unacknowledged messages that may be - /// in-flight before pausing sending additional messages to - /// this consumer. #[serde(rename = "consumer.max_ack_pending")] - pub max_ack_pending: Option, + #[serde_as(as = "Option")] + pub max_ack_pending: Option, - /// The maximum number of unacknowledged messages that may be - /// in-flight before pausing sending additional messages to - /// this consumer. - #[serde(rename = "consumer.idle_heartbeat")] - pub idle_heartbeat: Option, + #[serde(rename = "consumer.headers_only")] + #[serde_as(as = "Option")] + pub headers_only: Option, - /// Maximum size of a request batch #[serde(rename = "consumer.max_batch")] - pub max_batch: Option, + #[serde_as(as = "Option")] + pub max_batch: Option, - // / Maximum value of request max_bytes #[serde(rename = "consumer.max_bytes")] - pub max_bytes: Option, + #[serde_as(as = "Option")] + pub max_bytes: Option, - /// Maximum value for request expiration #[serde(rename = "consumer.max_expires")] - pub max_expires: Option, + #[serde_as(as = "Option>")] + pub max_expires: Option, - /// Threshold for consumer inactivity #[serde(rename = "consumer.inactive_threshold")] - pub inactive_threshold: Option, + #[serde_as(as = "Option>")] + pub inactive_threshold: Option, - /// Number of consumer replicas #[serde(rename = "consumer.num.replicas", alias = "consumer.num_replicas")] - pub num_replicas: Option, + #[serde_as(as = "Option")] + pub num_replicas: Option, - /// Force consumer to use memory storage. #[serde(rename = "consumer.memory_storage")] - pub memory_storage: Option, + #[serde_as(as = "Option")] + pub memory_storage: Option, - /// Custom backoff for missed acknowledgments. #[serde(rename = "consumer.backoff")] - pub backoff: Option, + #[serde(deserialize_with = "deserialize_duration_seq_from_string")] + pub backoff: Option>, +} - #[serde(flatten)] - pub unknown_fields: HashMap, +impl NatsPropertiesConsumer { + pub fn set_config(&self, c: &mut Config) { + if let Some(v) = &self.name { + c.name = Some(v.clone()) + } + if let Some(v) = &self.durable_name { + c.durable_name = Some(v.clone()) + } + if let Some(v) = &self.description { + c.description = Some(v.clone()) + } + if let Some(v) = &self.ack_policy { + c.ack_policy = AckPolicyWrapper::parse_str(v) + } + if let Some(v) = &self.ack_wait { + c.ack_wait = *v + } + if let Some(v) = &self.max_deliver { + c.max_deliver = *v + } + if let Some(v) = &self.filter_subject { + c.filter_subject = v.clone() + } + if let Some(v) = &self.filter_subjects { + c.filter_subjects = v.clone() + } + if let Some(v) = &self.replay_policy { + c.replay_policy = ReplayPolicyWrapper::parse_str(v) + } + if let Some(v) = &self.rate_limit { + c.rate_limit = *v + } + if let Some(v) = &self.sample_frequency { + c.sample_frequency = *v + } + if let Some(v) = &self.max_waiting { + c.max_waiting = *v + } + if let Some(v) = &self.max_ack_pending { + c.max_ack_pending = *v + } + if let Some(v) = &self.headers_only { + c.headers_only = *v + } + if let Some(v) = &self.max_batch { + c.max_batch = *v + } + if let Some(v) = &self.max_bytes { + c.max_bytes = *v + } + if let Some(v) = &self.max_expires { + c.max_expires = *v + } + if let Some(v) = &self.inactive_threshold { + c.inactive_threshold = *v + } + if let Some(v) = &self.num_replicas { + c.num_replicas = *v + } + if let Some(v) = &self.memory_storage { + c.memory_storage = *v + } + if let Some(v) = &self.backoff { + c.backoff = v.clone() + } + } } impl SourceProperties for NatsProperties { @@ -160,3 +262,106 @@ impl crate::source::UnknownFields for NatsProperties { self.unknown_fields.clone() } } + +#[cfg(test)] +mod test { + use std::collections::BTreeMap; + + use maplit::btreemap; + + use super::*; + + #[test] + fn test_parse_config_consumer() { + let config: BTreeMap = btreemap! { + "stream".to_string() => "risingwave".to_string(), + + // NATS common + "subject".to_string() => "subject1".to_string(), + "server_url".to_string() => "nats-server:4222".to_string(), + "connect_mode".to_string() => "plain".to_string(), + "type".to_string() => "append-only".to_string(), + + // NATS properties consumer + "consumer.name".to_string() => "foobar".to_string(), + "consumer.durable_name".to_string() => "durable_foobar".to_string(), + "consumer.description".to_string() => "A description".to_string(), + "consumer.ack_policy".to_string() => "all".to_string(), + "consumer.ack_wait".to_string() => "10".to_string(), + "consumer.max_deliver".to_string() => "10".to_string(), + "consumer.filter_subject".to_string() => "subject".to_string(), + "consumer.filter_subjects".to_string() => "subject1,subject2".to_string(), + "consumer.replay_policy".to_string() => "instant".to_string(), + "consumer.rate_limit".to_string() => "100".to_string(), + "consumer.sample_frequency".to_string() => "1".to_string(), + "consumer.max_waiting".to_string() => "5".to_string(), + "consumer.max_ack_pending".to_string() => "100".to_string(), + "consumer.headers_only".to_string() => "true".to_string(), + "consumer.max_batch".to_string() => "10".to_string(), + "consumer.max_bytes".to_string() => "1024".to_string(), + "consumer.max_expires".to_string() => "24".to_string(), + "consumer.inactive_threshold".to_string() => "10".to_string(), + "consumer.num_replicas".to_string() => "3".to_string(), + "consumer.memory_storage".to_string() => "true".to_string(), + "consumer.backoff".to_string() => "2,10,15".to_string(), + + }; + + let props: NatsProperties = + serde_json::from_value(serde_json::to_value(config).unwrap()).unwrap(); + + assert_eq!( + props.nats_properties_consumer.name, + Some("foobar".to_string()) + ); + assert_eq!( + props.nats_properties_consumer.durable_name, + Some("durable_foobar".to_string()) + ); + assert_eq!( + props.nats_properties_consumer.description, + Some("A description".to_string()) + ); + assert_eq!( + props.nats_properties_consumer.ack_policy, + Some("all".to_string()) + ); + assert_eq!( + props.nats_properties_consumer.ack_wait, + Some(Duration::from_secs(10)) + ); + assert_eq!( + props.nats_properties_consumer.filter_subjects, + Some(vec!["subject1".to_string(), "subject2".to_string()]) + ); + assert_eq!( + props.nats_properties_consumer.replay_policy, + Some("instant".to_string()) + ); + assert_eq!(props.nats_properties_consumer.rate_limit, Some(100)); + assert_eq!(props.nats_properties_consumer.sample_frequency, Some(1)); + assert_eq!(props.nats_properties_consumer.max_waiting, Some(5)); + assert_eq!(props.nats_properties_consumer.max_ack_pending, Some(100)); + assert_eq!(props.nats_properties_consumer.headers_only, Some(true)); + assert_eq!(props.nats_properties_consumer.max_batch, Some(10)); + assert_eq!(props.nats_properties_consumer.max_bytes, Some(1024)); + assert_eq!( + props.nats_properties_consumer.max_expires, + Some(Duration::from_secs(24)) + ); + assert_eq!( + props.nats_properties_consumer.inactive_threshold, + Some(Duration::from_secs(10)) + ); + assert_eq!(props.nats_properties_consumer.num_replicas, Some(3)); + assert_eq!(props.nats_properties_consumer.memory_storage, Some(true)); + assert_eq!( + props.nats_properties_consumer.backoff, + Some(vec![ + Duration::from_secs(2), + Duration::from_secs(10), + Duration::from_secs(15) + ]) + ); + } +} diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 9921d74f4f393..45d13017e0ada 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; - use anyhow::Context as _; use async_nats::jetstream::consumer; use async_trait::async_trait; @@ -81,84 +79,18 @@ impl SplitReader for NatsSplitReader { start_position => start_position.to_owned(), }; + let mut config = consumer::pull::Config { + ..Default::default() + }; + properties.set_config(&mut config); + let consumer = properties .common .build_consumer( properties.stream.clone(), split_id.to_string(), start_position.clone(), - properties.durable_name.clone(), - properties.description.clone(), - properties.ack_policy.clone(), - properties.ack_wait.clone().map(|s| { - Duration::from_secs(s.parse::().expect("failed to parse ack_wait to u64")) - }), - properties.max_deliver.clone().map(|s| { - s.parse::() - .expect("failed to parse max_deliver to i64") - }), - properties.filter_subject.clone(), - properties - .filter_subjects - .clone() - .map(|s| s.split(',').map(|s| s.to_string()).collect()), - properties.replay_policy.clone(), - properties - .rate_limit - .clone() - .map(|s| s.parse::().expect("failed to parse rate_limit to u64")), - properties.sample_frequency.clone().map(|s| { - s.parse::() - .expect("failed to parse sample_frequency to u8") - }), - properties.max_waiting.clone().map(|s| { - s.parse::() - .expect("failed to parse max_waiting to i64") - }), - properties.max_ack_pending.clone().map(|s| { - s.parse::() - .expect("failed to parse max_ack_pending to i64") - }), - properties.idle_heartbeat.clone().map(|s| { - Duration::from_secs( - s.parse::() - .expect("failed to parse idle_heartbeat to u64"), - ) - }), - properties - .max_batch - .clone() - .map(|s| s.parse::().expect("failed to parse max_batch to i64")), - properties - .max_bytes - .clone() - .map(|s| s.parse::().expect("failed to parse max_bytes to i64")), - properties.max_expires.clone().map(|s| { - Duration::from_secs(s.parse::().expect("failed to parse ack_wait to u64")) - }), - properties.inactive_threshold.clone().map(|s| { - Duration::from_secs( - s.parse::() - .expect("failed to parse inactive_threshold to u64"), - ) - }), - properties.num_replicas.clone().map(|s| { - s.parse::() - .expect("failed to parse num_replicas to usize") - }), - properties.memory_storage.clone().map(|s| { - s.parse::() - .expect("failed to parse memory_storage to bool") - }), - properties.backoff.clone().map(|s| { - s.split(',') - .map(|s| { - Duration::from_secs( - s.parse::().expect("failed to parse backoff to u64"), - ) - }) - .collect() - }), + config, ) .await?; diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index eef5ccbd9cbfa..50d5284e287d6 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; +use std::time::Duration; use risingwave_pb::secret::PbSecretRef; @@ -50,12 +51,14 @@ impl WithOptions impl WithOptions for Option {} impl WithOptions for Vec {} +impl WithOptions for Vec {} impl WithOptions for HashMap {} impl WithOptions for BTreeMap {} impl WithOptions for String {} impl WithOptions for bool {} impl WithOptions for usize {} +impl WithOptions for u8 {} impl WithOptions for u16 {} impl WithOptions for u32 {} impl WithOptions for u64 {} diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 4a208465265e7..97b7c11836054 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -507,6 +507,77 @@ NatsProperties: - name: max_message_size field_type: i32 required: false + - name: consumer.deliver_subject + field_type: String + required: false + - name: consumer.durable_name + field_type: String + required: false + - name: consumer.name + field_type: String + required: false + - name: consumer.description + field_type: String + required: false + - name: consumer.deliver_policy + field_type: String + required: false + - name: consumer.ack_policy + field_type: String + required: false + - name: consumer.ack_wait + field_type: Duration + required: false + - name: consumer.max_deliver + field_type: i64 + required: false + - name: consumer.filter_subject + field_type: String + required: false + - name: consumer.filter_subjects + field_type: Vec + required: false + - name: consumer.replay_policy + field_type: String + required: false + - name: consumer.rate_limit + field_type: u64 + required: false + - name: consumer.sample_frequency + field_type: u8 + required: false + - name: consumer.max_waiting + field_type: i64 + required: false + - name: consumer.max_ack_pending + field_type: i64 + required: false + - name: consumer.headers_only + field_type: bool + required: false + - name: consumer.max_batch + field_type: i64 + required: false + - name: consumer.max_bytes + field_type: i64 + required: false + - name: consumer.max_expires + field_type: Duration + required: false + - name: consumer.inactive_threshold + field_type: Duration + required: false + - name: consumer.num.replicas + field_type: usize + required: false + alias: + - consumer.num_replicas + - name: consumer.memory_storage + field_type: bool + required: false + - name: consumer.backoff + field_type: Vec + required: false - name: scan.startup.mode field_type: String required: false From d5f08583e1ea8fd00d608cb80a99a1713b2929ee Mon Sep 17 00:00:00 2001 From: Benjamin Dornel Date: Sun, 25 Aug 2024 12:14:17 +0800 Subject: [PATCH 3/4] refactor: raise error if policy is invalid --- src/connector/src/source/nats/mod.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index f27b1c9dfb7eb..b11dabadb3e40 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -36,12 +36,12 @@ pub const NATS_CONNECTOR: &str = "nats"; pub struct AckPolicyWrapper; impl AckPolicyWrapper { - pub fn parse_str(s: &str) -> AckPolicy { + pub fn parse_str(s: &str) -> Result { match s { - "none" => AckPolicy::None, - "all" => AckPolicy::All, - "explicit" => AckPolicy::Explicit, - _ => AckPolicy::None, + "none" => Ok(AckPolicy::None), + "all" => Ok(AckPolicy::All), + "explicit" => Ok(AckPolicy::Explicit), + _ => Err(format!("Invalid AckPolicy '{}'", s)), } } } @@ -49,11 +49,11 @@ impl AckPolicyWrapper { pub struct ReplayPolicyWrapper; impl ReplayPolicyWrapper { - pub fn parse_str(s: &str) -> ReplayPolicy { + pub fn parse_str(s: &str) -> Result { match s { - "instant" => ReplayPolicy::Instant, - "original" => ReplayPolicy::Original, - _ => ReplayPolicy::Instant, + "instant" => Ok(ReplayPolicy::Instant), + "original" => Ok(ReplayPolicy::Original), + _ => Err(format!("Invalid ReplayPolicy '{}'", s)), } } } @@ -193,7 +193,7 @@ impl NatsPropertiesConsumer { c.description = Some(v.clone()) } if let Some(v) = &self.ack_policy { - c.ack_policy = AckPolicyWrapper::parse_str(v) + c.ack_policy = AckPolicyWrapper::parse_str(v).unwrap() } if let Some(v) = &self.ack_wait { c.ack_wait = *v @@ -208,7 +208,7 @@ impl NatsPropertiesConsumer { c.filter_subjects = v.clone() } if let Some(v) = &self.replay_policy { - c.replay_policy = ReplayPolicyWrapper::parse_str(v) + c.replay_policy = ReplayPolicyWrapper::parse_str(v).unwrap() } if let Some(v) = &self.rate_limit { c.rate_limit = *v From f54cc24df9afa532a5ab62b9bb74a29b5157a314 Mon Sep 17 00:00:00 2001 From: Benjamin Dornel Date: Sun, 25 Aug 2024 12:51:13 +0800 Subject: [PATCH 4/4] refactor: rename time-related properties to *.sec --- src/connector/src/lib.rs | 14 +++--- src/connector/src/source/nats/mod.rs | 67 +++++++++++--------------- src/connector/src/with_options.rs | 3 +- src/connector/with_options_source.yaml | 16 +++--- 4 files changed, 44 insertions(+), 56 deletions(-) diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 7dd9e4130f667..7ec84b14088e9 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -112,20 +112,20 @@ where } } -pub(crate) fn deserialize_duration_seq_from_string<'de, D>( +pub(crate) fn deserialize_optional_u64_seq_from_string<'de, D>( deserializer: D, -) -> std::result::Result>, D::Error> +) -> std::result::Result>, D::Error> where D: de::Deserializer<'de>, { let s: Option = de::Deserialize::deserialize(deserializer)?; if let Some(s) = s { - let durations = s + let numbers = s .split(',') - .map(|s| s.trim().parse().map(Duration::from_secs)) - .collect::, _>>() - .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid duration")); - Ok(Some(durations?)) + .map(|s| s.trim().parse()) + .collect::, _>>() + .map_err(|_| de::Error::invalid_value(de::Unexpected::Str(&s), &"invalid number")); + Ok(Some(numbers?)) } else { Ok(None) } diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index b11dabadb3e40..0ba35d20269dc 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -22,14 +22,16 @@ use std::time::Duration; use async_nats::jetstream::consumer::pull::Config; use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy}; use serde::Deserialize; -use serde_with::{serde_as, DisplayFromStr, DurationSeconds}; +use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; use crate::connector_common::NatsCommon; use crate::source::nats::enumerator::NatsSplitEnumerator; use crate::source::nats::source::{NatsSplit, NatsSplitReader}; use crate::source::SourceProperties; -use crate::{deserialize_duration_seq_from_string, deserialize_optional_string_seq_from_string}; +use crate::{ + deserialize_optional_string_seq_from_string, deserialize_optional_u64_seq_from_string, +}; pub const NATS_CONNECTOR: &str = "nats"; @@ -113,9 +115,9 @@ pub struct NatsPropertiesConsumer { #[serde_as(as = "Option")] pub ack_policy: Option, - #[serde(rename = "consumer.ack_wait")] - #[serde_as(as = "Option>")] - pub ack_wait: Option, + #[serde(rename = "consumer.ack_wait.sec")] + #[serde_as(as = "Option")] + pub ack_wait: Option, #[serde(rename = "consumer.max_deliver")] #[serde_as(as = "Option")] @@ -160,13 +162,13 @@ pub struct NatsPropertiesConsumer { #[serde_as(as = "Option")] pub max_bytes: Option, - #[serde(rename = "consumer.max_expires")] - #[serde_as(as = "Option>")] - pub max_expires: Option, + #[serde(rename = "consumer.max_expires.sec")] + #[serde_as(as = "Option")] + pub max_expires: Option, - #[serde(rename = "consumer.inactive_threshold")] - #[serde_as(as = "Option>")] - pub inactive_threshold: Option, + #[serde(rename = "consumer.inactive_threshold.sec")] + #[serde_as(as = "Option")] + pub inactive_threshold: Option, #[serde(rename = "consumer.num.replicas", alias = "consumer.num_replicas")] #[serde_as(as = "Option")] @@ -176,9 +178,9 @@ pub struct NatsPropertiesConsumer { #[serde_as(as = "Option")] pub memory_storage: Option, - #[serde(rename = "consumer.backoff")] - #[serde(deserialize_with = "deserialize_duration_seq_from_string")] - pub backoff: Option>, + #[serde(rename = "consumer.backoff.sec")] + #[serde(deserialize_with = "deserialize_optional_u64_seq_from_string")] + pub backoff: Option>, } impl NatsPropertiesConsumer { @@ -196,7 +198,7 @@ impl NatsPropertiesConsumer { c.ack_policy = AckPolicyWrapper::parse_str(v).unwrap() } if let Some(v) = &self.ack_wait { - c.ack_wait = *v + c.ack_wait = Duration::from_secs(*v) } if let Some(v) = &self.max_deliver { c.max_deliver = *v @@ -232,10 +234,10 @@ impl NatsPropertiesConsumer { c.max_bytes = *v } if let Some(v) = &self.max_expires { - c.max_expires = *v + c.max_expires = Duration::from_secs(*v) } if let Some(v) = &self.inactive_threshold { - c.inactive_threshold = *v + c.inactive_threshold = Duration::from_secs(*v) } if let Some(v) = &self.num_replicas { c.num_replicas = *v @@ -244,7 +246,7 @@ impl NatsPropertiesConsumer { c.memory_storage = *v } if let Some(v) = &self.backoff { - c.backoff = v.clone() + c.backoff = v.iter().map(|&x| Duration::from_secs(x)).collect() } } } @@ -287,7 +289,7 @@ mod test { "consumer.durable_name".to_string() => "durable_foobar".to_string(), "consumer.description".to_string() => "A description".to_string(), "consumer.ack_policy".to_string() => "all".to_string(), - "consumer.ack_wait".to_string() => "10".to_string(), + "consumer.ack_wait.sec".to_string() => "10".to_string(), "consumer.max_deliver".to_string() => "10".to_string(), "consumer.filter_subject".to_string() => "subject".to_string(), "consumer.filter_subjects".to_string() => "subject1,subject2".to_string(), @@ -299,11 +301,11 @@ mod test { "consumer.headers_only".to_string() => "true".to_string(), "consumer.max_batch".to_string() => "10".to_string(), "consumer.max_bytes".to_string() => "1024".to_string(), - "consumer.max_expires".to_string() => "24".to_string(), - "consumer.inactive_threshold".to_string() => "10".to_string(), + "consumer.max_expires.sec".to_string() => "24".to_string(), + "consumer.inactive_threshold.sec".to_string() => "10".to_string(), "consumer.num_replicas".to_string() => "3".to_string(), "consumer.memory_storage".to_string() => "true".to_string(), - "consumer.backoff".to_string() => "2,10,15".to_string(), + "consumer.backoff.sec".to_string() => "2,10,15".to_string(), }; @@ -326,10 +328,7 @@ mod test { props.nats_properties_consumer.ack_policy, Some("all".to_string()) ); - assert_eq!( - props.nats_properties_consumer.ack_wait, - Some(Duration::from_secs(10)) - ); + assert_eq!(props.nats_properties_consumer.ack_wait, Some(10)); assert_eq!( props.nats_properties_consumer.filter_subjects, Some(vec!["subject1".to_string(), "subject2".to_string()]) @@ -345,23 +344,13 @@ mod test { assert_eq!(props.nats_properties_consumer.headers_only, Some(true)); assert_eq!(props.nats_properties_consumer.max_batch, Some(10)); assert_eq!(props.nats_properties_consumer.max_bytes, Some(1024)); - assert_eq!( - props.nats_properties_consumer.max_expires, - Some(Duration::from_secs(24)) - ); - assert_eq!( - props.nats_properties_consumer.inactive_threshold, - Some(Duration::from_secs(10)) - ); + assert_eq!(props.nats_properties_consumer.max_expires, Some(24)); + assert_eq!(props.nats_properties_consumer.inactive_threshold, Some(10)); assert_eq!(props.nats_properties_consumer.num_replicas, Some(3)); assert_eq!(props.nats_properties_consumer.memory_storage, Some(true)); assert_eq!( props.nats_properties_consumer.backoff, - Some(vec![ - Duration::from_secs(2), - Duration::from_secs(10), - Duration::from_secs(15) - ]) + Some(vec![2, 10, 15]) ); } } diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index 50d5284e287d6..90dcfc5b1d88f 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; -use std::time::Duration; use risingwave_pb::secret::PbSecretRef; @@ -51,7 +50,7 @@ impl WithOptions impl WithOptions for Option {} impl WithOptions for Vec {} -impl WithOptions for Vec {} +impl WithOptions for Vec {} impl WithOptions for HashMap {} impl WithOptions for BTreeMap {} diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 97b7c11836054..b3f1a3769f19a 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -525,8 +525,8 @@ NatsProperties: - name: consumer.ack_policy field_type: String required: false - - name: consumer.ack_wait - field_type: Duration + - name: consumer.ack_wait.sec + field_type: u64 required: false - name: consumer.max_deliver field_type: i64 @@ -561,11 +561,11 @@ NatsProperties: - name: consumer.max_bytes field_type: i64 required: false - - name: consumer.max_expires - field_type: Duration + - name: consumer.max_expires.sec + field_type: u64 required: false - - name: consumer.inactive_threshold - field_type: Duration + - name: consumer.inactive_threshold.sec + field_type: u64 required: false - name: consumer.num.replicas field_type: usize @@ -575,8 +575,8 @@ NatsProperties: - name: consumer.memory_storage field_type: bool required: false - - name: consumer.backoff - field_type: Vec + - name: consumer.backoff.sec + field_type: Vec required: false - name: scan.startup.mode field_type: String