Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes around service discovery, engine and consumer. #327

Merged
merged 9 commits into from
Jan 6, 2025
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
pulsar-version: [ 2.10.4, 2.11.2, 3.0.4, 3.1.3 ]
pulsar-version: [ 2.10.6, 2.11.4, 3.0.8, 3.2.4, 3.3.3, 4.0.1 ]
steps:
- name: Start Pulsar Standalone Container
run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true apachepulsar/pulsar:${{ matrix.pulsar-version }} bin/pulsar standalone
Expand Down
53 changes: 25 additions & 28 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,50 @@ keywords = ["pulsar", "api", "client"]

[dependencies]
async-channel = "^2.3.1"
async-trait = "^0.1.81"
async-std = { version = "^1.12.0", features = ["attributes", "unstable"], optional = true }
async-trait = "^0.1.83"
async-std = { version = "^1.13.0", features = ["attributes", "unstable"], optional = true }
async-native-tls = { version = "^0.5.0", optional = true }
asynchronous-codec = { version = "^0.7.0", optional = true }
bytes = "^1.6.1"
bit-vec = "^0.8.0"
chrono = { version = "^0.4.38", default-features = false, features = ["clock", "std"] }
bytes = "^1.9.0"
chrono = { version = "^0.4.39", default-features = false, features = ["clock", "std"] }
crc = "^3.2.1"
data-url = { version = "^0.3.1", optional = true }
flate2 = { version = "^1.0.30", optional = true }
futures = "^0.3.30"
futures-io = "^0.3.30"
futures-timer = "^3.0.3"
flate2 = { version = "^1.0.35", optional = true }
futures = "^0.3.31"
futures-rustls = { version = "^0.26.0", optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
log = "^0.4.22"
lz4 = { version = "^1.26.0", optional = true }
lz4 = { version = "^1.28.0", optional = true }
native-tls = { version = "^0.2.12", optional = true }
nom = { version = "^7.1.3", default-features = false, features = ["alloc"] }
openidconnect = { version = "^3.5.0", optional = true }
oauth2 = { version = "^4.4.1", optional = true }
oauth2 = { version = "^4.4.2", optional = true }
pem = "^3.0.4"
prost = "^0.13.1"
prost-derive = "^0.13.1"
prost = "^0.13.4"
prost-derive = "^0.13.4"
rand = "^0.8.5"
regex = "^1.10.5"
rustls = { version = "^0.23.12", optional = true }
regex = "^1.11.1"
rustls = { version = "^0.23.20", optional = true }
snap = { version = "^1.1.1", optional = true }
serde = { version = "^1.0.204", features = ["derive"], optional = true }
serde_json = { version = "^1.0.121", optional = true }
tokio = { version = "^1.39.2", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.11", features = ["codec"], optional = true }
tokio-rustls = { version = "^0.26.0", optional = true }
serde = { version = "^1.0.216", features = ["derive"], optional = true }
serde_json = { version = "^1.0.133", optional = true }
tokio = { version = "^1.42.0", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.13", features = ["codec"], optional = true }
tokio-rustls = { version = "^0.26.1", optional = true }
tokio-native-tls = { version = "^0.3.1", optional = true }
tracing = { version = "^0.1.40", optional = true }
url = "^2.5.2"
uuid = { version = "^1.10.0", features = ["v4", "fast-rng"] }
webpki-roots = { version = "^0.26.3", optional = true }
tracing = { version = "^0.1.41", optional = true }
url = "^2.5.4"
uuid = { version = "^1.11.0", features = ["v4", "fast-rng"] }
webpki-roots = { version = "^0.26.7", optional = true }
zstd = { version = "^0.13.2", optional = true }

[dev-dependencies]
env_logger = "^0.11.5"
serde = { version = "^1.0.204", features = ["derive"] }
serde_json = "^1.0.121"
tokio = { version = "^1.39.2", features = ["macros", "rt-multi-thread"] }
serde = { version = "^1.0.216", features = ["derive"] }
serde_json = "^1.0.133"
tokio = { version = "^1.42.0", features = ["macros", "rt-multi-thread"] }

[build-dependencies]
prost-build = "^0.13.1"
prost-build = "^0.13.4"
protobuf-src = { version = "^2.1.0", optional = true }

[features]
Expand Down
6 changes: 3 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl SerializeMessage for () {
}
}

impl<'a> SerializeMessage for &'a [u8] {
impl SerializeMessage for &[u8] {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
Ok(producer::Message {
Expand Down Expand Up @@ -102,7 +102,7 @@ impl SerializeMessage for String {
}
}

impl<'a> SerializeMessage for &'a String {
impl SerializeMessage for &String {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Expand All @@ -113,7 +113,7 @@ impl<'a> SerializeMessage for &'a String {
}
}

impl<'a> SerializeMessage for &'a str {
impl SerializeMessage for &str {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = input.as_bytes().to_vec();
Expand Down
13 changes: 12 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,18 +348,21 @@ impl<Exe: Executor> ConnectionSender<Exe> {
self.tx.send(messages::ping()).await?,
) {
(Ok(_), ()) => {
debug!("set timeout to {:?} for ping-pong", self.operation_timeout);
let delay_f = self.executor.delay(self.operation_timeout);
pin_mut!(response);
pin_mut!(delay_f);

match select(response, delay_f).await {
Either::Left((res, _)) => res
.map_err(|oneshot::Canceled| {
error!("connection-sender: send ping, we have been canceled");
self.error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
.map(move |_| trace!("received pong from {}", self.connection_id)),
Either::Right(_) => {
error!("connection-sender: send ping, we did not received pong inside the timed out");
self.error.set(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout when sending ping to the Pulsar server",
Expand Down Expand Up @@ -652,6 +655,10 @@ impl<Exe: Executor> ConnectionSender<Exe> {
response
.await
.map_err(|oneshot::Canceled| {
error!(
"response has been canceled (key = {:?}), we are disconnected",
k
);
error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
Expand All @@ -670,12 +677,16 @@ impl<Exe: Executor> ConnectionSender<Exe> {
let connection_id = self.connection_id;
let error = self.error.clone();
let delay_f = self.executor.delay(self.operation_timeout);
trace!(
"Create timeout futures with operation timeout at {:?}",
self.operation_timeout
);
let fut = async move {
pin_mut!(response);
pin_mut!(delay_f);
match select(response, delay_f).await {
Either::Left((res, _)) => {
// println!("recv msg: {:?}", res);
debug!("Received response: {:?}", res);
res
}
Either::Right(_) => {
Expand Down
6 changes: 4 additions & 2 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,13 +486,15 @@ impl<Exe: Executor> ConnectionManager<Exe> {
// in a mutex, and a case appears where the Arc is cloned
// somewhere at the same time, that just means the manager
// will create a new connection the next time it is asked
let strong_count = Arc::strong_count(conn);
trace!(
"checking connection {}, is valid? {}, strong_count {}",
conn.id(),
conn.is_valid(),
Arc::strong_count(conn)
strong_count
);
conn.is_valid() && Arc::strong_count(conn) > 1

conn.is_valid() && strong_count > 1
}
});
}
Expand Down
8 changes: 3 additions & 5 deletions src/consumer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
// Checks the builder for inconsistencies
// returns a config and a list of topics with associated brokers
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn validate<T: DeserializeMessage>(
self,
) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
async fn validate(self) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
let ConsumerBuilder {
pulsar,
topics,
Expand Down Expand Up @@ -272,7 +270,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn build<T: DeserializeMessage>(self) -> Result<Consumer<T, Exe>, Error> {
// would this clone() consume too much memory?
let (config, joined_topics) = self.clone().validate::<T>().await?;
let (config, joined_topics) = self.clone().validate().await?;

let consumers = try_join_all(joined_topics.into_iter().map(|(topic, addr)| {
TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone())
Expand Down Expand Up @@ -322,7 +320,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn into_reader<T: DeserializeMessage>(self) -> Result<Reader<T, Exe>, Error> {
// would this clone() consume too much memory?
let (mut config, mut joined_topics) = self.clone().validate::<T>().await?;
let (mut config, mut joined_topics) = self.clone().validate().await?;

// Internally, the reader interface is implemented as a consumer using an exclusive,
// non-durable subscription
Expand Down
78 changes: 64 additions & 14 deletions src/consumer/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ConsumerEngine<Exe: Executor> {
event_rx: mpsc::UnboundedReceiver<EngineEvent<Exe>>,
event_tx: UnboundedSender<EngineEvent<Exe>>,
batch_size: u32,
remaining_messages: u32,
remaining_messages: i64,
FlorentinDUBOIS marked this conversation as resolved.
Show resolved Hide resolved
unacked_message_redelivery_delay: Option<Duration>,
unacked_messages: HashMap<MessageIdData, Instant>,
dead_letter_policy: Option<DeadLetterPolicy>,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
event_rx,
event_tx,
batch_size,
remaining_messages: batch_size,
remaining_messages: batch_size as i64,
unacked_message_redelivery_delay,
unacked_messages: HashMap::new(),
dead_letter_policy,
Expand All @@ -108,10 +108,12 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
}
}
}

let send_end_res = event_tx.send(mapper(None)).await;
if let Err(err) = send_end_res {
log::error!("Error sending end event to channel - {err}");
}

log::warn!("rx terminated");
}))
}
Expand Down Expand Up @@ -146,37 +148,78 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
})?;
}

if self.remaining_messages < self.batch_size.div_ceil(2) {
// In the usual workflow, we use the `batch_size` as the maximum number of messages
// that we ask the broker to send us with a flow command message, which is why
// we have a subtraction below (`batch_size` - `remaining_messages`).
//
// In the special case of batched messages (whose size is defined by clients), the
// number of messages in a batch could be greater than the given batch size, and the
// remaining messages count then goes negative. This is why we use an `i64`: we
// want to keep track of how far below zero the counter went, as the next flow
// command will ask for `batch_size` - `remaining_messages`, which allows us to get
// back to the usual workflow of flow command messages.
//
// Here is an example of how it works for a `batch_size` of 1000, including the
// error case:
//
// ```
// Message (1) -> (batch_size = 1000, remaining_messages = 999, no flow message trigger)
// ... 499 messages later ...
// Message (1) -> (batch_size = 1000, remaining_messages = 499, flow message trigger, ask broker to send => batch_size - remaining_messages = 501)
// ... 200 messages later ...
// BatchMessage (1024) -> (batch_size = 1000, remaining_messages = 4294967072, no flow message trigger) [underflow on remaining messages - without the patch]
// BatchMessage (1024) -> (batch_size = 1000, remaining_messages = -224, flow message trigger, ask broker to send => batch_size - remaining_messages = 1224) [no underflow on remaining messages - with the patch]
// ```
if self.remaining_messages < (self.batch_size.div_ceil(2) as i64) {
match self
.connection
.sender()
.send_flow(self.id, self.batch_size - self.remaining_messages)
.send_flow(
self.id,
(self.batch_size as i64 - self.remaining_messages) as u32,
FlorentinDUBOIS marked this conversation as resolved.
Show resolved Hide resolved
)
.await
{
Ok(()) => {}
Err(ConnectionError::Disconnected) => {
self.reconnect().await?;
self.connection
.sender()
.send_flow(self.id, self.batch_size - self.remaining_messages)
.await?;
debug!("consumer engine: consumer connection disconnected, trying to reconnect");
continue;
}
// we don't need to handle the SlowDown error, since send_flow waits on the
// channel to be not full
Err(e) => return Err(e.into()),
Err(e) => {
error!("consumer engine: we got a unrecoverable connection error, {e}");
return Err(e.into());
}
}
self.remaining_messages = self.batch_size;

self.remaining_messages = self.batch_size as i64;
}

match Self::timeout(self.event_rx.next(), Duration::from_secs(1)).await {
Err(_timeout) => {}
Err(_) => {
// If you are reading this comment, you may have an issue where you have
// received a batched message that is greater than the batch
// size, which then breaks the way that we send flow command messages.
//
// In that case, you could increase your batch size or patch this driver by
// adding the following line, if you are sure that you have
// at least 1 incoming message per second.
//
// ```rust
// self.remaining_messages = 0;
// ```
debug!("consumer engine: timeout (1s)");
}
Ok(Some(EngineEvent::Message(msg))) => {
debug!("consumer engine: received message, {:?}", msg);
let out = self.handle_message_opt(msg).await;
if let Some(res) = out {
return res;
}
}
Ok(Some(EngineEvent::EngineMessage(msg))) => {
debug!("consumer engine: received engine message");
let continue_loop = self.handle_ack_opt(msg).await;
if !continue_loop {
return Ok(());
Expand All @@ -203,8 +246,14 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
self.remaining_messages -= message
.payload
.as_ref()
.and_then(|payload| payload.metadata.num_messages_in_batch)
.unwrap_or(1i32) as u32;
.and_then(|payload| {
debug!(
"Consumer: received message payload, num_messages_in_batch = {:?}",
payload.metadata.num_messages_in_batch
);
payload.metadata.num_messages_in_batch
})
.unwrap_or(1) as i64;

match self.process_message(message).await {
// Continue
Expand Down Expand Up @@ -571,6 +620,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
)
.await?;

self.remaining_messages = self.batch_size as i64;
self.messages_rx = Some(messages);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ mod tests {
msg: u32,
}

impl<'a> SerializeMessage for &'a TestData {
impl SerializeMessage for &TestData {
fn serialize_message(input: Self) -> Result<producer::Message, Error> {
let payload = serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?;
Ok(producer::Message {
Expand Down
8 changes: 5 additions & 3 deletions src/consumer/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,11 @@ impl<T: 'static + DeserializeMessage, Exe: Executor> Stream for MultiTopicConsum
}
}

if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
self.update_topics();
return self.poll_next(cx);
if self.topic_regex.is_some() {
if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
self.update_topics();
return self.poll_next(cx);
}
}

let mut topics_to_remove = Vec::new();
Expand Down
5 changes: 4 additions & 1 deletion src/consumer/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,10 @@ impl<T: DeserializeMessage, Exe: Executor> Stream for TopicConsumer<T, Exe> {
self.messages_received += 1;
Poll::Ready(Some(Ok(self.create_message(id, payload))))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Err(e))) => {
error!("we are using in the single-consumer and we got an error, {e}");
Poll::Ready(Some(Err(e)))
}
}
}
}
Loading
Loading