feat(source): add NATS source consumer parameters #17615

Merged
merged 5 commits into from
Aug 26, 2024
Changes from 2 commits
56 changes: 54 additions & 2 deletions src/connector/src/connector_common/common.rs
@@ -18,7 +18,7 @@ use std::io::Write;
use std::time::Duration;

use anyhow::{anyhow, Context};
-use async_nats::jetstream::consumer::DeliverPolicy;
+use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, ReplayPolicy};
use async_nats::jetstream::{self};
use aws_sdk_kinesis::Client as KinesisClient;
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
@@ -639,6 +639,26 @@ impl NatsCommon {
stream: String,
split_id: String,
start_sequence: NatsOffset,
durable_name: Option<String>,
description: Option<String>,
ack_policy: Option<String>,
ack_wait: Option<Duration>,
max_deliver: Option<i64>,
filter_subject: Option<String>,
filter_subjects: Option<Vec<String>>,
replay_policy: Option<String>,
rate_limit: Option<u64>,
sample_frequency: Option<u8>,
max_waiting: Option<i64>,
max_ack_pending: Option<i64>,
_idle_heartbeat: Option<Duration>,
Member

Is this deprecated? If so, we can just remove it.

Contributor Author

Well, the behavior of the missed idle heartbeat is a bit cryptic in the `async_nats` crate, and it is a relatively important parameter for users: it surfaces as “missed idle heartbeat” error messages sent by the stream, which can cause pipelines to fail. I'm not sure how to handle this one, to be honest.

Contributor

Removed from the configuration, since it doesn't exist in the pull consumer config: https://docs.rs/async-nats/latest/async_nats/jetstream/consumer/pull/struct.Config.html

max_batch: Option<i64>,
max_bytes: Option<i64>,
max_expires: Option<Duration>,
inactive_threshold: Option<Duration>,
num_replicas: Option<usize>,
memory_storage: Option<bool>,
backoff: Option<Vec<Duration>>,
) -> ConnectorResult<
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
> {
@@ -650,7 +670,24 @@ impl NatsCommon {
.replace(['.', '>', '*', ' ', '\t'], "_");
let name = format!("risingwave-consumer-{}-{}", subject_name, split_id);
let mut config = jetstream::consumer::pull::Config {
-ack_policy: jetstream::consumer::AckPolicy::None,
durable_name,
description,
ack_wait: ack_wait.unwrap_or_default(),
max_deliver: max_deliver.unwrap_or_default(),
filter_subject: filter_subject.unwrap_or_default(),
filter_subjects: filter_subjects.unwrap_or_default(),
rate_limit: rate_limit.unwrap_or_default(),
sample_frequency: sample_frequency.unwrap_or_default(),
max_waiting: max_waiting.unwrap_or_default(),
max_ack_pending: max_ack_pending.unwrap_or_default(),
// idle_heartbeat: idle_heart.unwrap_or_default(),
max_batch: max_batch.unwrap_or_default(),
max_bytes: max_bytes.unwrap_or_default(),
max_expires: max_expires.unwrap_or_default(),
inactive_threshold: inactive_threshold.unwrap_or_default(),
memory_storage: memory_storage.unwrap_or_default(),
backoff: backoff.unwrap_or_default(),
num_replicas: num_replicas.unwrap_or_default(),
..Default::default()
Member

These .unwrap_or_default() calls have subtly different semantics from ..Default::default(): the former uses each field type's default, while the latter uses Config's own Default implementation. In practice they currently behave the same, because Config just derives Default rather than implementing it manually.

I'm OK with this, but we should be aware of the difference.
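
To make the difference above concrete, here is a minimal sketch. `DemoConfig` and both builder functions are hypothetical stand-ins, not part of `async_nats` or this PR; the struct implements `Default` by hand precisely so that the two patterns diverge.

```rust
/// Hypothetical stand-in for a config struct whose `Default` is hand-written.
#[derive(Debug, PartialEq)]
struct DemoConfig {
    max_batch: i64,
    max_bytes: i64,
}

impl Default for DemoConfig {
    fn default() -> Self {
        // A hand-written default that differs from `i64::default()` (which is 0).
        Self { max_batch: 100, max_bytes: 0 }
    }
}

/// Mirrors the pattern in the diff: an unset option becomes `i64::default()`,
/// bypassing whatever `DemoConfig::default()` would have chosen for the field.
fn build_with_unwrap(max_batch: Option<i64>) -> DemoConfig {
    DemoConfig {
        max_batch: max_batch.unwrap_or_default(),
        ..Default::default()
    }
}

/// Alternative: start from the struct's default and override only set options.
fn build_with_override(max_batch: Option<i64>) -> DemoConfig {
    let mut config = DemoConfig::default();
    if let Some(value) = max_batch {
        config.max_batch = value;
    }
    config
}

fn main() {
    // With the option unset, the two builders disagree on `max_batch`.
    println!("{:?}", build_with_unwrap(None));
    println!("{:?}", build_with_override(None));
}
```

With a derived `Default`, as `Config` has today per the comment above, both builders would produce the same value; they only diverge if the struct's default is later customized.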

};

@@ -671,9 +708,24 @@ impl NatsCommon {
},
NatsOffset::None => DeliverPolicy::All,
};

let ack_policy = match ack_policy.as_deref() {
Some("all") => AckPolicy::All,
Some("explicit") => AckPolicy::Explicit,
_ => AckPolicy::None,
};

let replay_policy = match replay_policy.as_deref() {
Some("instant") => ReplayPolicy::Instant,
Some("original") => ReplayPolicy::Original,
_ => ReplayPolicy::Instant,
};

let consumer = stream
.get_or_create_consumer(&name, {
config.deliver_policy = deliver_policy;
config.ack_policy = ack_policy;
config.replay_policy = replay_policy;
config
})
.await?;
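
The `match` blocks in this hunk map any unrecognized string to a default, so a typo such as "explict" silently becomes `AckPolicy::None`. As a possible alternative, the sketch below rejects unknown values. `DemoAckPolicy` is a local stand-in for `async_nats::jetstream::consumer::AckPolicy`, `resolve_ack_policy` is a hypothetical helper, and accepting the literal "none" is an assumption that the diff does not make.

```rust
use std::str::FromStr;

/// Local stand-in for `async_nats::jetstream::consumer::AckPolicy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DemoAckPolicy {
    None,
    All,
    Explicit,
}

impl FromStr for DemoAckPolicy {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            // Assumption: "none" is accepted as an explicit spelling of the default.
            "none" => Ok(Self::None),
            "all" => Ok(Self::All),
            "explicit" => Ok(Self::Explicit),
            other => Err(format!(
                "invalid ack_policy `{other}`, expected one of: none, all, explicit"
            )),
        }
    }
}

/// An unset option keeps the existing default; a set value must be valid.
fn resolve_ack_policy(raw: Option<&str>) -> Result<DemoAckPolicy, String> {
    match raw {
        None => Ok(DemoAckPolicy::None),
        Some(s) => s.parse(),
    }
}

fn main() {
    println!("{:?}", resolve_ack_policy(Some("explicit")));
    println!("{:?}", resolve_ack_policy(Some("explict")));
}
```

The same shape would apply to `replay_policy`, with `instant` as the value used when the option is unset.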
98 changes: 98 additions & 0 deletions src/connector/src/source/nats/mod.rs
@@ -45,6 +45,104 @@ pub struct NatsProperties {
#[serde(rename = "stream")]
pub stream: String,

/// Setting `durable_name` to `Some(...)` will cause this consumer
/// to be "durable". This may be a good choice for workloads that
/// benefit from the `JetStream` server or cluster remembering the
/// progress of consumers for fault tolerance purposes. If a consumer
/// crashes, the `JetStream` server or cluster will remember which
/// messages the consumer acknowledged. When the consumer recovers,
/// this information will allow the consumer to resume processing
/// where it left off. If you're unsure, set this to `Some(...)`.
///
/// Setting `durable_name` to `None` will cause this consumer to
/// be "ephemeral". This may be a good choice for workloads where
/// you don't need the `JetStream` server to remember the consumer's
/// progress in the case of a crash, such as certain "high churn"
/// workloads or workloads where a crashed instance is not required
/// to recover.
#[serde(rename = "consumer.durable_name")]
pub durable_name: Option<String>,

/// A short description of the purpose of this consumer.
#[serde(rename = "consumer.description")]
pub description: Option<String>,

/// How messages should be acknowledged: `all` or `explicit`. Any other
/// value, or leaving this unset, disables acknowledgments.
#[serde(rename = "consumer.ack_policy")]
pub ack_policy: Option<String>,

/// How long, in seconds, to allow messages to remain unacknowledged before attempting redelivery
#[serde(rename = "consumer.ack_wait")]
pub ack_wait: Option<String>,

/// Maximum number of times a specific message will be delivered. Use this to avoid poison pill messages that repeatedly crash your consumer processes forever.
#[serde(rename = "consumer.max_deliver")]
pub max_deliver: Option<String>,

/// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards.
#[serde(rename = "consumer.filter_subject")]
pub filter_subject: Option<String>,

/// Fulfills the same role as `filter_subject`, but allows filtering by many subjects, given as a comma-separated list.
#[serde(rename = "consumer.filter_subjects")]
pub filter_subjects: Option<String>,

/// Whether messages are sent as quickly as possible (`instant`) or at the rate of receipt (`original`). Defaults to `instant`.
#[serde(rename = "consumer.replay_policy")]
pub replay_policy: Option<String>,

/// The rate of message delivery in bits per second
#[serde(rename = "consumer.rate_limit")]
pub rate_limit: Option<String>,

/// What percentage of acknowledgments should be sampled for observability, 0-100
#[serde(rename = "consumer.sample_frequency")]
pub sample_frequency: Option<String>,

/// The maximum number of waiting consumers.
#[serde(rename = "consumer.max_waiting")]
pub max_waiting: Option<String>,

/// The maximum number of unacknowledged messages that may be
/// in-flight before pausing sending additional messages to
/// this consumer.
#[serde(rename = "consumer.max_ack_pending")]
pub max_ack_pending: Option<String>,

/// Idle heartbeat interval, in seconds. Currently parsed but not applied:
/// it is passed to `build_consumer` as `_idle_heartbeat` and the matching
/// config field is commented out.
#[serde(rename = "consumer.idle_heartbeat")]
pub idle_heartbeat: Option<String>,

/// Maximum size of a request batch
#[serde(rename = "consumer.max_batch")]
pub max_batch: Option<String>,

/// Maximum value of request max_bytes
#[serde(rename = "consumer.max_bytes")]
pub max_bytes: Option<String>,

/// Maximum value for request expiration, in seconds
#[serde(rename = "consumer.max_expires")]
pub max_expires: Option<String>,

/// Threshold for consumer inactivity, in seconds
#[serde(rename = "consumer.inactive_threshold")]
pub inactive_threshold: Option<String>,

/// Number of consumer replicas
#[serde(rename = "consumer.num.replicas", alias = "consumer.num_replicas")]
pub num_replicas: Option<String>,

/// Force consumer to use memory storage.
#[serde(rename = "consumer.memory_storage")]
pub memory_storage: Option<String>,

/// Custom backoff for missed acknowledgments, given as a comma-separated list of durations in seconds.
#[serde(rename = "consumer.backoff")]
pub backoff: Option<String>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
75 changes: 75 additions & 0 deletions src/connector/src/source/nats/source/reader.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use anyhow::Context as _;
use async_nats::jetstream::consumer;
use async_trait::async_trait;
@@ -85,8 +87,81 @@ impl SplitReader for NatsSplitReader {
properties.stream.clone(),
split_id.to_string(),
start_position.clone(),
properties.durable_name.clone(),
properties.description.clone(),
properties.ack_policy.clone(),
properties.ack_wait.clone().map(|s| {
Duration::from_secs(s.parse::<u64>().expect("failed to parse ack_wait to u64"))
}),
properties.max_deliver.clone().map(|s| {
s.parse::<i64>()
.expect("failed to parse max_deliver to i64")
}),
properties.filter_subject.clone(),
properties
.filter_subjects
.clone()
.map(|s| s.split(',').map(|s| s.to_string()).collect()),
Member

@xxchan Jul 10, 2024

These don't look good. We should use typed properties and parse the config when creating the properties struct, so that CREATE SOURCE rejects invalid config instead of failing at runtime.

For example, use #[serde_as(as = "Option<DisplayFromStr>")] or #[serde(deserialize_with = ...)], as other connectors do.

Contributor Author

Ah, indeed, that seems much better. Even without understanding the error related to config parsing, that parsing seemed a bit sketchy to me. I'll give it a try in the coming days. Thank you, @xxchan.

Contributor

I've moved the consumer-specific parameters into a separate struct and added some typed properties, based on @xxchan's suggestion.
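
The idea discussed in this thread, validating options when the properties struct is built rather than calling `.expect(...)` in the reader, can be sketched with the standard library alone. This is not the serde-based approach suggested above and not the code that was merged: `DemoConsumerProps`, `parse_opt`, and `try_from_options` are hypothetical names, and only three of the options are shown.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical typed view of a few `consumer.*` options.
#[derive(Debug, Default, PartialEq)]
struct DemoConsumerProps {
    ack_wait: Option<Duration>,
    max_deliver: Option<i64>,
    memory_storage: Option<bool>,
}

/// Parses an optional key, turning a bad value into an error instead of a panic.
fn parse_opt<T>(opts: &HashMap<String, String>, key: &str) -> Result<Option<T>, String>
where
    T: std::str::FromStr,
    T::Err: std::fmt::Display,
{
    opts.get(key)
        .map(|raw| {
            raw.parse::<T>()
                .map_err(|e| format!("invalid value `{raw}` for `{key}`: {e}"))
        })
        .transpose()
}

impl DemoConsumerProps {
    /// Validates every option up front, so a bad value is reported at creation time.
    fn try_from_options(opts: &HashMap<String, String>) -> Result<Self, String> {
        Ok(Self {
            // Interpreted as seconds, matching the `Duration::from_secs` calls in the diff.
            ack_wait: parse_opt::<u64>(opts, "consumer.ack_wait")?.map(Duration::from_secs),
            max_deliver: parse_opt::<i64>(opts, "consumer.max_deliver")?,
            memory_storage: parse_opt::<bool>(opts, "consumer.memory_storage")?,
        })
    }
}

fn main() {
    let mut opts = HashMap::new();
    opts.insert("consumer.ack_wait".to_string(), "30".to_string());
    opts.insert("consumer.max_deliver".to_string(), "five".to_string());
    match DemoConsumerProps::try_from_options(&opts) {
        Ok(props) => println!("accepted: {props:?}"),
        Err(e) => println!("rejected: {e}"),
    }
}
```

Returning a `Result` here is what lets the caller surface the problem as a normal error rather than a panic inside the split reader.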

properties.replay_policy.clone(),
properties
.rate_limit
.clone()
.map(|s| s.parse::<u64>().expect("failed to parse rate_limit to u64")),
properties.sample_frequency.clone().map(|s| {
s.parse::<u8>()
.expect("failed to parse sample_frequency to u8")
}),
properties.max_waiting.clone().map(|s| {
s.parse::<i64>()
.expect("failed to parse max_waiting to i64")
}),
properties.max_ack_pending.clone().map(|s| {
s.parse::<i64>()
.expect("failed to parse max_ack_pending to i64")
}),
properties.idle_heartbeat.clone().map(|s| {
Duration::from_secs(
s.parse::<u64>()
.expect("failed to parse idle_heartbeat to u64"),
)
}),
properties
.max_batch
.clone()
.map(|s| s.parse::<i64>().expect("failed to parse max_batch to i64")),
properties
.max_bytes
.clone()
.map(|s| s.parse::<i64>().expect("failed to parse max_bytes to i64")),
properties.max_expires.clone().map(|s| {
Duration::from_secs(s.parse::<u64>().expect("failed to parse max_expires to u64"))
}),
properties.inactive_threshold.clone().map(|s| {
Duration::from_secs(
s.parse::<u64>()
.expect("failed to parse inactive_threshold to u64"),
)
}),
properties.num_replicas.clone().map(|s| {
s.parse::<usize>()
.expect("failed to parse num_replicas to usize")
}),
properties.memory_storage.clone().map(|s| {
s.parse::<bool>()
.expect("failed to parse memory_storage to bool")
}),
properties.backoff.clone().map(|s| {
s.split(',')
.map(|s| {
Duration::from_secs(
s.parse::<u64>().expect("failed to parse backoff to u64"),
)
})
.collect()
}),
)
.await?;

Ok(Self {
consumer,
properties,
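
The comma-separated `backoff` parsing in this hunk panics on any malformed entry, including one with a stray space such as "1, 5". A more forgiving variant might look like the sketch below. `parse_backoff_secs` is a hypothetical helper, and trimming whitespace and skipping empty segments are assumptions about the desired behavior rather than something the PR specifies.

```rust
use std::time::Duration;

/// Parses a comma-separated list of whole seconds, e.g. "1, 5, 30".
/// Assumption: whitespace around entries is ignored and empty segments are skipped.
fn parse_backoff_secs(raw: &str) -> Result<Vec<Duration>, String> {
    raw.split(',')
        .map(str::trim)
        .filter(|part| !part.is_empty())
        .map(|part| {
            part.parse::<u64>()
                .map(Duration::from_secs)
                .map_err(|e| format!("invalid backoff entry `{part}`: {e}"))
        })
        // Collecting into `Result` stops at the first invalid entry.
        .collect()
}

fn main() {
    println!("{:?}", parse_backoff_secs("1, 5, 30"));
    println!("{:?}", parse_backoff_secs("1,abc"));
}
```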