From e5a6f27bf87cf77a5eba296329fc4614064260f2 Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Fri, 20 Sep 2024 16:37:28 +0200
Subject: [PATCH 1/9] chore: update dependencies

Signed-off-by: Florentin Dubois
---
 Cargo.toml | 30 +++++++++++++++---------------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d6dc16c..49e074a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,43 +17,43 @@ keywords = ["pulsar", "api", "client"]
 
 [dependencies]
 async-channel = "^2.3.1"
-async-trait = "^0.1.81"
-async-std = { version = "^1.12.0", features = ["attributes", "unstable"], optional = true }
+async-trait = "^0.1.82"
+async-std = { version = "^1.13.0", features = ["attributes", "unstable"], optional = true }
 async-native-tls = { version = "^0.5.0", optional = true }
 asynchronous-codec = { version = "^0.7.0", optional = true }
-bytes = "^1.6.1"
+bytes = "^1.7.2"
 bit-vec = "^0.8.0"
 chrono = { version = "^0.4.38", default-features = false, features = ["clock", "std"] }
 crc = "^3.2.1"
 data-url = { version = "^0.3.1", optional = true }
-flate2 = { version = "^1.0.30", optional = true }
+flate2 = { version = "^1.0.33", optional = true }
 futures = "^0.3.30"
 futures-io = "^0.3.30"
 futures-timer = "^3.0.3"
 futures-rustls = { version = "^0.26.0", optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
 log = "^0.4.22"
-lz4 = { version = "^1.26.0", optional = true }
+lz4 = { version = "^1.27.0", optional = true }
 native-tls = { version = "^0.2.12", optional = true }
 nom = { version = "^7.1.3", default-features = false, features = ["alloc"] }
 openidconnect = { version = "^3.5.0", optional = true }
-oauth2 = { version = "^4.4.1", optional = true }
+oauth2 = { version = "^4.4.2", optional = true }
 pem = "^3.0.4"
-prost = "^0.13.1"
-prost-derive = "^0.13.1"
+prost = "^0.13.2"
+prost-derive = "^0.13.2"
 rand = "^0.8.5"
-regex = "^1.10.5"
-rustls = { version = "^0.23.12", optional = true }
+regex = "^1.10.6"
+rustls = { version = "^0.23.13", optional = true }
 snap = { version = "^1.1.1", optional = true }
-serde = { version = "^1.0.204", features = ["derive"], optional = true }
-serde_json = { version = "^1.0.121", optional = true }
-tokio = { version = "^1.39.2", features = ["rt", "net", "time"], optional = true }
-tokio-util = { version = "^0.7.11", features = ["codec"], optional = true }
+serde = { version = "^1.0.210", features = ["derive"], optional = true }
+serde_json = { version = "^1.0.128", optional = true }
+tokio = { version = "^1.40.0", features = ["rt", "net", "time"], optional = true }
+tokio-util = { version = "^0.7.12", features = ["codec"], optional = true }
 tokio-rustls = { version = "^0.26.0", optional = true }
 tokio-native-tls = { version = "^0.3.1", optional = true }
 tracing = { version = "^0.1.40", optional = true }
 url = "^2.5.2"
 uuid = { version = "^1.10.0", features = ["v4", "fast-rng"] }
-webpki-roots = { version = "^0.26.3", optional = true }
+webpki-roots = { version = "^0.26.6", optional = true }
 zstd = { version = "^0.13.2", optional = true }
 
 [dev-dependencies]

From a4f394d61174edb5fd84877cd326254830f7fb8b Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Tue, 17 Dec 2024 20:40:17 +0100
Subject: [PATCH 2/9] fix(service-discovery): implements fast path on lookup
 for partitioned topic

Co-authored-by: @Wonshtrum
Co-authored-by: @KannarFr
Signed-off-by: Florentin Dubois
---
 src/service_discovery.rs | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/service_discovery.rs b/src/service_discovery.rs
index 1393f6c..8aaa512 100644
--- a/src/service_discovery.rs
+++ b/src/service_discovery.rs
@@ -176,9 +176,22 @@ impl ServiceDiscovery {
         &self,
         topic: S,
     ) -> Result {
-        let mut connection = self.manager.get_base_connection().await?;
         let topic = topic.into();
 
+        // This is a fast path to reduce the number of lookup requests for topic partition metadata and, as a side
+        // effect, reduce amplification on zookeeper.
+        // For example, for a topic with 12 partitions, before this patch we did 13 lookup requests; now we only do
+        // one.
+        // There are some cases where we ask whether a partition of a topic is itself partitioned, which is never the case.
+        // We are able to detect those requests because the topic name ends with '...-partition-'.
+        // So, to be efficient and avoid a regex here, we use the `contains` method to detect the pattern '-partition-'.
+        // If it matches the pattern, since a partition of a partitioned topic is never itself partitioned, we can safely return that the
+        // partition number is 0, which implicitly says that there is only 1 topic and the index is 0.
+        if topic.contains("-partition-") {
+            return Ok(0);
+        }
+
+        let mut connection = self.manager.get_base_connection().await?;
         let mut current_retries = 0u32;
         let start = std::time::Instant::now();
         let operation_retry_options = self.manager.operation_retry_options.clone();
@@ -257,6 +270,7 @@ impl ServiceDiscovery {
     ) -> Result, ServiceDiscoveryError> {
         let topic = topic.into();
         let partitions = self.lookup_partitioned_topic_number(&topic).await?;
+        trace!("Partitions for topic {}: {}", &topic, &partitions);
 
         let topics = match partitions {
             0 => vec![topic],

From db6d5f77e60179700ca390b102b356ce3fe357ab Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Wed, 18 Dec 2024 10:18:55 +0100
Subject: [PATCH 3/9] fix(consumer): do not poll refresh topics if consumer
 isn't using a regex for topic discovery

Co-authored-by: @Wonshtrum
Co-authored-by: @KannarFr
Signed-off-by: Florentin Dubois
---
 src/consumer/multi.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/consumer/multi.rs b/src/consumer/multi.rs
index cd8e0bd..d0fae2e 100644
--- a/src/consumer/multi.rs
+++ b/src/consumer/multi.rs
@@ -352,9 +352,11 @@ impl Stream for MultiTopicConsum
             }
         }
 
-        if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
-            self.update_topics();
-            return self.poll_next(cx);
+        if self.topic_regex.is_some() {
+            if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
+                self.update_topics();
+                return self.poll_next(cx);
+            }
         }
 
         let mut topics_to_remove = Vec::new();

From 1717bed93a6adf8c8fc3899a83ef5e7f9ad3ca3c Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Wed, 18 Dec 2024 18:11:32 +0100
Subject: [PATCH 4/9] fix(engine): handle underflow on batch size when
 receiving a batch message

Co-authored-by: @Wonshtrum
Co-authored-by: @KannarFr
Signed-off-by: Florentin Dubois
---
 src/connection.rs         |  7 ++++-
 src/connection_manager.rs |  6 ++--
 src/consumer/engine.rs    | 66 ++++++++++++++++++++++++++++++---------
 src/consumer/topic.rs     | 13 ++++++--
 4 files changed, 72 insertions(+), 20 deletions(-)

diff --git a/src/connection.rs b/src/connection.rs
index fad471b..b86f3a1 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -348,6 +348,7 @@ impl ConnectionSender {
             self.tx.send(messages::ping()).await?,
         ) {
             (Ok(_), ()) => {
+                debug!("set timeout to {:?} for ping-pong", self.operation_timeout);
                 let delay_f = self.executor.delay(self.operation_timeout);
                 pin_mut!(response);
                 pin_mut!(delay_f);
@@ -355,11 +356,13 @@ impl ConnectionSender {
                 match select(response, delay_f).await {
                     Either::Left((res, _)) => res
                         .map_err(|oneshot::Canceled| {
+                            error!("connection-sender: send ping, we have been canceled");
                            self.error.set(ConnectionError::Disconnected);
                            ConnectionError::Disconnected
                        })
                        .map(move |_| trace!("received pong from {}", self.connection_id)),
                    Either::Right(_) => {
+                        error!("connection-sender: send ping, we did not receive a pong within the timeout");
                        self.error.set(ConnectionError::Io(std::io::Error::new(
                            std::io::ErrorKind::TimedOut,
                            "timeout when sending ping to the Pulsar server",
@@ -652,6 +655,7 @@ impl ConnectionSender {
            response
                .await
                .map_err(|oneshot::Canceled| {
+                    error!("response has been canceled (key = {:?}), we are disconnected", k);
                    error.set(ConnectionError::Disconnected);
                    ConnectionError::Disconnected
                })
@@ -670,12 +674,13 @@ impl ConnectionSender {
         let connection_id = self.connection_id;
         let error = self.error.clone();
         let delay_f = self.executor.delay(self.operation_timeout);
+        trace!("Create timeout futures with operation timeout at {:?}", self.operation_timeout);
         let fut = async move {
             pin_mut!(response);
             pin_mut!(delay_f);
             match select(response, delay_f).await {
                 Either::Left((res, _)) => {
-                    // println!("recv msg: {:?}", res);
+                    debug!("Received response: {:?}", res);
                     res
                 }
                 Either::Right(_) => {
diff --git a/src/connection_manager.rs b/src/connection_manager.rs
index 6dcb1a9..a7cbdeb 100644
--- a/src/connection_manager.rs
+++ b/src/connection_manager.rs
@@ -486,13 +486,15 @@ impl ConnectionManager {
                     // in a mutex, and a case appears where the Arc is cloned
                     // somewhere at the same time, that just means the manager
                     // will create a new connection the next time it is asked
+                    let strong_count = Arc::strong_count(conn);
                     trace!(
                         "checking connection {}, is valid? {}, strong_count {}",
                         conn.id(),
                         conn.is_valid(),
-                        Arc::strong_count(conn)
+                        strong_count
                     );
-                    conn.is_valid() && Arc::strong_count(conn) > 1
+
+                    conn.is_valid() && strong_count > 1
                 }
             });
         }
diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs
index a233101..6f186fc 100644
--- a/src/consumer/engine.rs
+++ b/src/consumer/engine.rs
@@ -43,7 +43,7 @@ pub struct ConsumerEngine {
     event_rx: mpsc::UnboundedReceiver>,
     event_tx: UnboundedSender>,
     batch_size: u32,
-    remaining_messages: u32,
+    remaining_messages: i64,
     unacked_message_redelivery_delay: Option,
     unacked_messages: HashMap,
     dead_letter_policy: Option,
@@ -83,7 +83,7 @@ impl ConsumerEngine {
             event_rx,
             event_tx,
             batch_size,
-            remaining_messages: batch_size,
+            remaining_messages: batch_size as i64,
             unacked_message_redelivery_delay,
             unacked_messages: HashMap::new(),
             dead_letter_policy,
@@ -108,10 +108,12 @@ impl ConsumerEngine {
                 }
             }
 
+
             let send_end_res = event_tx.send(mapper(None)).await;
             if let Err(err) = send_end_res {
                 log::error!("Error sending end event to channel - {err}");
             }
+            log::warn!("rx terminated");
         }))
     }
@@ -146,37 +148,69 @@ impl ConsumerEngine {
                 })?;
         }
 
-        if self.remaining_messages < self.batch_size.div_ceil(2) {
+        // In the usual workflow, we use `batch_size` as the maximum number of messages that we request from the
+        // broker with a flow command message, which is why we have a subtraction below (`batch_size` - `remaining_messages`).
+        //
+        // In the special case of batch messages (whose size is defined by clients), the number of messages can be
+        // greater than the given batch size and the remaining messages go negative, which is why we use an
+        // `i64`: we want to keep track of the negative count, as the next flow command will request
+        // `batch_size` minus the remaining messages, which allows us to get back to the usual workflow of flow command messages.
+        //
+        // Here is an example of how it works for a batch_size of 1000, including the error case:
+        //
+        // ```
+        // Message (1) -> (batch_size = 1000, remaining_messages = 999, no flow message trigger)
+        // ... 499 messages later ...
+        // Message (1) -> (batch_size = 1000, remaining_messages = 499, flow message trigger, ask broker to send => batch_size - remaining_messages = 501)
+        // ... 200 messages later ...
+        // BatchMessage (1024) -> (batch_size = 1000, remaining_messages = 4294967072, no flow message trigger) [underflow on remaining messages - without the patch]
+        // BatchMessage (1024) -> (batch_size = 1000, remaining_messages = -224, flow message trigger, ask broker to send => batch_size - remaining_messages = 1224) [no underflow on remaining messages - with the patch]
+        // ```
+        if self.remaining_messages < (self.batch_size.div_ceil(2) as i64) {
             match self
                 .connection
                 .sender()
-                .send_flow(self.id, self.batch_size - self.remaining_messages)
+                .send_flow(self.id, (self.batch_size as i64 - self.remaining_messages) as u32)
                 .await
             {
                 Ok(()) => {}
                 Err(ConnectionError::Disconnected) => {
-                    self.reconnect().await?;
-                    self.connection
-                        .sender()
-                        .send_flow(self.id, self.batch_size - self.remaining_messages)
-                        .await?;
+                    debug!("consumer engine: consumer connection disconnected, trying to reconnect");
+                    continue;
                 }
                 // we don't need to handle the SlowDown error, since send_flow waits on the
                 // channel to be not full
-                Err(e) => return Err(e.into()),
+                Err(e) => {
+                    error!("consumer engine: we got an unrecoverable connection error, {e}");
+                    return Err(e.into())
+                },
             }
-            self.remaining_messages = self.batch_size;
+
+            self.remaining_messages = self.batch_size as i64;
         }
 
         match Self::timeout(self.event_rx.next(), Duration::from_secs(1)).await {
-            Err(_timeout) => {}
+            Err(_) => {
+                // If you are reading this comment, you may have an issue where you have received a batched message
+                // that is greater than the batch size, which breaks the way that we send flow command messages.
+                //
+                // In that case, you could increase your batch size or patch this driver by adding the following line,
+                // if you are sure that you have at least 1 incoming message per second.
+                //
+                // ```rust
+                // self.remaining_messages = 0;
+                // ```
+                debug!("consumer engine: timeout (1s)");
+            }
             Ok(Some(EngineEvent::Message(msg))) => {
+                debug!("consumer engine: received message, {:?}", msg);
                 let out = self.handle_message_opt(msg).await;
                 if let Some(res) = out {
                     return res;
                 }
             }
             Ok(Some(EngineEvent::EngineMessage(msg))) => {
+                debug!("consumer engine: received engine message");
                 let continue_loop = self.handle_ack_opt(msg).await;
                 if !continue_loop {
                     return Ok(());
@@ -203,8 +237,11 @@
         self.remaining_messages -= message
             .payload
             .as_ref()
-            .and_then(|payload| payload.metadata.num_messages_in_batch)
-            .unwrap_or(1i32) as u32;
+            .and_then(|payload| {
+                debug!("Consumer: received message payload, num_messages_in_batch = {:?}", payload.metadata.num_messages_in_batch);
+                payload.metadata.num_messages_in_batch
+            })
+            .unwrap_or(1) as i64;
 
         match self.process_message(message).await {
             // Continue
@@ -571,6 +608,7 @@
             )
             .await?;
 
+        self.remaining_messages = self.batch_size as i64;
         self.messages_rx = Some(messages);
 
         Ok(())
     }
diff --git a/src/consumer/topic.rs b/src/consumer/topic.rs
index 1b4e87b..c20cf69 100644
--- a/src/consumer/topic.rs
+++ b/src/consumer/topic.rs
@@ -323,14 +323,21 @@ impl Stream for TopicConsumer {
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
         match self.messages.as_mut().poll_next(cx) {
-            Poll::Pending => Poll::Pending,
-            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => {
+                Poll::Pending
+            },
+            Poll::Ready(None) => {
+                Poll::Ready(None)
+            },
             Poll::Ready(Some(Ok((id, payload)))) => {
                 self.last_message_received = Some(Utc::now());
                 self.messages_received += 1;
                 Poll::Ready(Some(Ok(self.create_message(id, payload))))
             }
-            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+            Poll::Ready(Some(Err(e))) => {
+                error!("we got an error in the single-topic consumer, {e}");
+                Poll::Ready(Some(Err(e)))
+            },
         }
     }
 }

From f20c5f9fcd2d6a4047e360a98432ce14787d29a5 Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Wed, 18 Dec 2024 19:00:59 +0100
Subject: [PATCH 5/9] chore: apply clippy suggestions

Signed-off-by: Florentin Dubois
---
 src/client.rs           | 6 +++---
 src/consumer/builder.rs | 6 +++---
 src/producer.rs         | 2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index 0fdfc3c..873b571 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -71,7 +71,7 @@ impl SerializeMessage for () {
     }
 }
 
-impl<'a> SerializeMessage for &'a [u8] {
+impl SerializeMessage for &[u8] {
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
     fn serialize_message(input: Self) -> Result {
         Ok(producer::Message {
@@ -102,7 +102,7 @@ impl SerializeMessage for String {
     }
 }
 
-impl<'a> SerializeMessage for &'a String {
+impl SerializeMessage for &String {
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
     fn serialize_message(input: Self) -> Result {
         let payload = input.as_bytes().to_vec();
@@ -113,7 +113,7 @@
     }
 }
 
-impl<'a> SerializeMessage for &'a str {
+impl SerializeMessage for &str {
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
     fn serialize_message(input: Self) -> Result {
         let payload = input.as_bytes().to_vec();
diff --git a/src/consumer/builder.rs b/src/consumer/builder.rs
index 3f9607d..fc17df9 100644
--- a/src/consumer/builder.rs
+++ b/src/consumer/builder.rs
@@ -177,7 +177,7 @@ impl ConsumerBuilder {
     // Checks the builder for inconsistencies
     // returns a config and a list of topics with associated brokers
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
-    async fn validate(
+    async fn validate(
         self,
     ) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
         let ConsumerBuilder {
@@ -272,7 +272,7 @@ impl ConsumerBuilder {
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
     pub async fn build(self) -> Result, Error> {
         // would this clone() consume too much memory?
-        let (config, joined_topics) = self.clone().validate::().await?;
+        let (config, joined_topics) = self.clone().validate().await?;
 
         let consumers = try_join_all(joined_topics.into_iter().map(|(topic, addr)| {
             TopicConsumer::new(self.pulsar.clone(), topic, addr, config.clone())
@@ -322,7 +322,7 @@ impl ConsumerBuilder {
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
     pub async fn into_reader(self) -> Result, Error> {
         // would this clone() consume too much memory?
-        let (mut config, mut joined_topics) = self.clone().validate::().await?;
+        let (mut config, mut joined_topics) = self.clone().validate().await?;
 
         // Internally, the reader interface is implemented as a consumer using an exclusive,
         // non-durable subscription
diff --git a/src/producer.rs b/src/producer.rs
index d73cf9a..bc003ce 100644
--- a/src/producer.rs
+++ b/src/producer.rs
@@ -360,7 +360,7 @@ impl Producer {
     /// this function was called, for various reasons:
     /// - the message was sent successfully but Pulsar did not send the receipt yet
     /// - the producer is batching messages, so this function must return immediately,
-    ///   and the receipt will come when the batched messages are actually sent
+    ///     and the receipt will come when the batched messages are actually sent
     ///
     /// Usage:
     ///

From a0f3118796e1dc1d6b2b9c0cc768dcea51fed1a0 Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Wed, 18 Dec 2024 19:01:21 +0100
Subject: [PATCH 6/9] ci: enable tests to run on newer pulsar version

Signed-off-by: Florentin Dubois
---
 .github/workflows/rust.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index b5a13c9..8c75bdf 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -27,7 +27,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        pulsar-version: [ 2.10.4, 2.11.2, 3.0.4, 3.1.3 ]
+        pulsar-version: [ 2.10.6, 2.11.4, 3.0.8, 3.2.4, 3.3.3, 4.0.1 ]
     steps:
     - name: Start Pulsar Standalone Container
       run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true apachepulsar/pulsar:${{ matrix.pulsar-version }} bin/pulsar standalone

From b71303fd649981905aa2f665cc837434919eaf1f Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Wed, 18 Dec 2024 19:05:58 +0100
Subject: [PATCH 7/9] chore: apply formatter

Signed-off-by: Florentin Dubois
---
 src/connection.rs       | 10 ++++++++--
 src/consumer/builder.rs |  4 +---
 src/consumer/engine.rs  | 24 ++++++++++++++++--------
 src/consumer/mod.rs     |  2 +-
 src/consumer/topic.rs   | 10 +++-------
 src/lib.rs              |  2 +-
 src/producer.rs         |  4 ++--
 7 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/src/connection.rs b/src/connection.rs
index b86f3a1..1b23e26 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -655,7 +655,10 @@ impl ConnectionSender {
            response
                .await
                .map_err(|oneshot::Canceled| {
-                    error!("response has been canceled (key = {:?}), we are disconnected", k);
+                    error!(
+                        "response has been canceled (key = {:?}), we are disconnected",
+                        k
+                    );
                    error.set(ConnectionError::Disconnected);
                    ConnectionError::Disconnected
                })
@@ -674,7 +677,10 @@ impl ConnectionSender {
         let connection_id = self.connection_id;
         let error = self.error.clone();
         let delay_f = self.executor.delay(self.operation_timeout);
-        trace!("Create timeout futures with operation timeout at {:?}", self.operation_timeout);
+        trace!(
+            "Create timeout futures with operation timeout at {:?}",
+            self.operation_timeout
+        );
         let fut = async move {
             pin_mut!(response);
             pin_mut!(delay_f);
diff --git a/src/consumer/builder.rs b/src/consumer/builder.rs
index fc17df9..837f556 100644
--- a/src/consumer/builder.rs
+++ b/src/consumer/builder.rs
@@ -177,9 +177,7 @@ impl ConsumerBuilder {
     // Checks the builder for inconsistencies
     // returns a config and a list of topics with associated brokers
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
-    async fn validate(
-        self,
-    ) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
+    async fn validate(self) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
         let ConsumerBuilder {
             pulsar,
             topics,
diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs
index 6f186fc..8b6c1dd 100644
--- a/src/consumer/engine.rs
+++ b/src/consumer/engine.rs
@@ -170,7 +170,10 @@ impl ConsumerEngine {
             match self
                 .connection
                 .sender()
-                .send_flow(self.id, (self.batch_size as i64 - self.remaining_messages) as u32)
+                .send_flow(
+                    self.id,
+                    (self.batch_size as i64 - self.remaining_messages) as u32,
+                )
                 .await
             {
                 Ok(()) => {}
                 Err(ConnectionError::Disconnected) => {
@@ -182,8 +185,8 @@ impl ConsumerEngine {
                 // we don't need to handle the SlowDown error, since send_flow waits on the
                 // channel to be not full
                 Err(e) => {
                     error!("consumer engine: we got an unrecoverable connection error, {e}");
-                    return Err(e.into())
-                },
+                    return Err(e.into());
+                }
             }
 
             self.remaining_messages = self.batch_size as i64;
@@ -191,11 +194,13 @@ impl ConsumerEngine {
 
         match Self::timeout(self.event_rx.next(), Duration::from_secs(1)).await {
             Err(_) => {
-                // If you are reading this comment, you may have an issue where you have received a batched message
-                // that is greater than the batch size, which breaks the way that we send flow command messages.
+                // If you are reading this comment, you may have an issue where you have
+                // received a batched message that is greater than the batch
+                // size, which breaks the way that we send flow command messages.
                 //
-                // In that case, you could increase your batch size or patch this driver by adding the following line,
-                // if you are sure that you have at least 1 incoming message per second.
+                // In that case, you could increase your batch size or patch this driver by
+                // adding the following line, if you are sure that you have
+                // at least 1 incoming message per second.
                //
                // ```rust
                // self.remaining_messages = 0;
                // ```
@@ -238,7 +243,10 @@
             .payload
             .as_ref()
             .and_then(|payload| {
-                debug!("Consumer: received message payload, num_messages_in_batch = {:?}", payload.metadata.num_messages_in_batch);
+                debug!(
+                    "Consumer: received message payload, num_messages_in_batch = {:?}",
+                    payload.metadata.num_messages_in_batch
+                );
                 payload.metadata.num_messages_in_batch
             })
             .unwrap_or(1) as i64;
diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index d04ec83..2a48298 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -454,7 +454,7 @@ mod tests {
         msg: u32,
     }
 
-    impl<'a> SerializeMessage for &'a TestData {
+    impl SerializeMessage for &TestData {
         fn serialize_message(input: Self) -> Result {
             let payload = serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?;
             Ok(producer::Message {
diff --git a/src/consumer/topic.rs b/src/consumer/topic.rs
index c20cf69..6837512 100644
--- a/src/consumer/topic.rs
+++ b/src/consumer/topic.rs
@@ -323,12 +323,8 @@ impl Stream for TopicConsumer {
     #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
         match self.messages.as_mut().poll_next(cx) {
-            Poll::Pending => {
-                Poll::Pending
-            },
-            Poll::Ready(None) => {
-                Poll::Ready(None)
-            },
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => Poll::Ready(None),
             Poll::Ready(Some(Ok((id, payload)))) => {
                 self.last_message_received = Some(Utc::now());
                 self.messages_received += 1;
@@ -337,7 +333,7 @@
             Poll::Ready(Some(Err(e))) => {
                 error!("we got an error in the single-topic consumer, {e}");
                 Poll::Ready(Some(Err(e)))
-            },
+            }
         }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 53d473c..166c424 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -233,7 +233,7 @@ mod tests {
         pub data: String,
     }
 
-    impl<'a> SerializeMessage for &'a TestData {
+    impl SerializeMessage for &TestData {
         fn serialize_message(input: Self) -> Result {
             let payload =
                 serde_json::to_vec(input).map_err(|e| PulsarError::Custom(e.to_string()))?;
diff --git a/src/producer.rs b/src/producer.rs
index bc003ce..2ba383d 100644
--- a/src/producer.rs
+++ b/src/producer.rs
@@ -359,8 +359,8 @@ impl Producer {
     /// this function returns a `SendFuture` because the receipt can come long after
     /// this function was called, for various reasons:
     /// - the message was sent successfully but Pulsar did not send the receipt yet
-    /// - the producer is batching messages, so this function must return immediately,
-    ///     and the receipt will come when the batched messages are actually sent
+    /// - the producer is batching messages, so this function must return immediately, and the
+    ///   receipt will come when the batched messages are actually sent
     ///
     /// Usage:
     ///

From ef7c06337a95919bb29fcd399d0fecbc613832be Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Thu, 19 Dec 2024 11:29:31 +0100
Subject: [PATCH 8/9] chore: update dependencies

Signed-off-by: Florentin Dubois
---
 Cargo.toml | 49 +++++++++++++++++++++++--------------------------
 1 file changed, 23 insertions(+), 26 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 49e074a..221756e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,53 +17,50 @@ keywords = ["pulsar", "api", "client"]
 
 [dependencies]
 async-channel = "^2.3.1"
-async-trait = "^0.1.82"
+async-trait = "^0.1.83"
 async-std = { version = "^1.13.0", features = ["attributes", "unstable"], optional = true }
 async-native-tls = { version = "^0.5.0", optional = true }
 asynchronous-codec = { version = "^0.7.0", optional = true }
-bytes = "^1.7.2"
-bit-vec = "^0.8.0"
-chrono = { version = "^0.4.38", default-features = false, features = ["clock", "std"] }
+bytes = "^1.9.0"
+chrono = { version = "^0.4.39", default-features = false, features = ["clock", "std"] }
 crc = "^3.2.1"
 data-url = { version = "^0.3.1", optional = true }
-flate2 = { version = "^1.0.33", optional = true }
-futures = "^0.3.30"
-futures-io = "^0.3.30"
-futures-timer = "^3.0.3"
+flate2 = { version = "^1.0.35", optional = true }
+futures = "^0.3.31"
 futures-rustls = { version = "^0.26.0", optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
 log = "^0.4.22"
-lz4 = { version = "^1.27.0", optional = true }
+lz4 = { version = "^1.28.0", optional = true }
 native-tls = { version = "^0.2.12", optional = true }
 nom = { version = "^7.1.3", default-features = false, features = ["alloc"] }
 openidconnect = { version = "^3.5.0", optional = true }
 oauth2 = { version = "^4.4.2", optional = true }
 pem = "^3.0.4"
-prost = "^0.13.2"
-prost-derive = "^0.13.2"
+prost = "^0.13.4"
+prost-derive = "^0.13.4"
 rand = "^0.8.5"
-regex = "^1.10.6"
-rustls = { version = "^0.23.13", optional = true }
+regex = "^1.11.1"
+rustls = { version = "^0.23.20", optional = true }
 snap = { version = "^1.1.1", optional = true }
-serde = { version = "^1.0.210", features = ["derive"], optional = true }
-serde_json = { version = "^1.0.128", optional = true }
-tokio = { version = "^1.40.0", features = ["rt", "net", "time"], optional = true }
-tokio-util = { version = "^0.7.12", features = ["codec"], optional = true }
-tokio-rustls = { version = "^0.26.0", optional = true }
+serde = { version = "^1.0.216", features = ["derive"], optional = true }
+serde_json = { version = "^1.0.133", optional = true }
+tokio = { version = "^1.42.0", features = ["rt", "net", "time"], optional = true }
+tokio-util = { version = "^0.7.13", features = ["codec"], optional = true }
+tokio-rustls = { version = "^0.26.1", optional = true }
 tokio-native-tls = { version = "^0.3.1", optional = true }
-tracing = { version = "^0.1.40", optional = true }
-url = "^2.5.2"
-uuid = { version = "^1.10.0", features = ["v4", "fast-rng"] }
-webpki-roots = { version = "^0.26.6", optional = true }
+tracing = { version = "^0.1.41", optional = true }
+url = "^2.5.4"
+uuid = { version = "^1.11.0", features = ["v4", "fast-rng"] }
+webpki-roots = { version = "^0.26.7", optional = true }
 zstd = { version = "^0.13.2", optional = true }
 
 [dev-dependencies]
 env_logger = "^0.11.5"
-serde = { version = "^1.0.204", features = ["derive"] }
-serde_json = "^1.0.121"
-tokio = { version = "^1.39.2", features = ["macros", "rt-multi-thread"] }
+serde = { version = "^1.0.216", features = ["derive"] }
+serde_json = "^1.0.133"
+tokio = { version = "^1.42.0", features = ["macros", "rt-multi-thread"] }
 
 [build-dependencies]
-prost-build = "^0.13.1"
+prost-build = "^0.13.4"
 protobuf-src = { version = "^2.1.0", optional = true }
 
 [features]

From 1bc4ec367ede2fc2e99b35871906b2409ba6ec22 Mon Sep 17 00:00:00 2001
From: Florentin Dubois
Date: Thu, 19 Dec 2024 13:36:21 +0100
Subject: [PATCH 9/9] style: apply formatter

Signed-off-by: Florentin Dubois
---
 src/consumer/engine.rs   | 16 ++++++++++------
 src/service_discovery.rs | 17 +++++++++--------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs
index 8b6c1dd..0d7b6f6 100644
--- a/src/consumer/engine.rs
+++ b/src/consumer/engine.rs
@@ -148,13 +148,17 @@ impl ConsumerEngine {
                 })?;
         }
 
-        // In the usual workflow, we use `batch_size` as the maximum number of messages that we request from the
-        // broker with a flow command message, which is why we have a subtraction below (`batch_size` - `remaining_messages`).
+        // In the usual workflow, we use `batch_size` as the maximum number of messages that we
+        // request from the broker with a flow command message, which is why we have a
+        // subtraction below (`batch_size` -
+        // `remaining_messages`).
         //
-        // In the special case of batch messages (whose size is defined by clients), the number of messages can be
-        // greater than the given batch size and the remaining messages go negative, which is why we use an
-        // `i64`: we want to keep track of the negative count, as the next flow command will request
-        // `batch_size` minus the remaining messages, which allows us to get back to the usual workflow of flow command messages.
+        // In the special case of batch messages (whose size is defined by clients), the number of
+        // messages can be greater than the given batch size and the remaining messages go
+        // negative, which is why we use an `i64`: we want to keep track of the negative
+        // count, as the next flow command will request `batch_size` minus the remaining
+        // messages, which allows us to get back to the usual workflow of flow command
+        // messages.
         //
         // Here is an example of how it works for a batch_size of 1000, including the error case:
         //
diff --git a/src/service_discovery.rs b/src/service_discovery.rs
index 8aaa512..16e7398 100644
--- a/src/service_discovery.rs
+++ b/src/service_discovery.rs
@@ -178,14 +178,15 @@ impl ServiceDiscovery {
     ) -> Result {
         let topic = topic.into();
 
-        // This is a fast path to reduce the number of lookup requests for topic partition metadata and, as a side
-        // effect, reduce amplification on zookeeper.
-        // For example, for a topic with 12 partitions, before this patch we did 13 lookup requests; now we only do
-        // one.
-        // There are some cases where we ask whether a partition of a topic is itself partitioned, which is never the case.
-        // We are able to detect those requests because the topic name ends with '...-partition-'.
-        // So, to be efficient and avoid a regex here, we use the `contains` method to detect the pattern '-partition-'.
-        // If it matches the pattern, since a partition of a partitioned topic is never itself partitioned, we can safely return that the
+        // This is a fast path to reduce the number of lookup requests for topic partition metadata
+        // and, as a side effect, reduce amplification on zookeeper.
+        // For example, for a topic with 12 partitions, before this patch we did 13 lookup requests;
+        // now we only do one.
+        // There are some cases where we ask whether a partition of a topic is itself partitioned,
+        // which is never the case. We are able to detect those requests because the topic name
+        // ends with '...-partition-'. So, to be efficient and avoid a regex here, we use the
+        // `contains` method to detect the pattern '-partition-'. If it matches the pattern, since
+        // a partition of a partitioned topic is never itself partitioned, we can safely return that the
         // partition number is 0, which implicitly says that there is only 1 topic and the index is 0.
         if topic.contains("-partition-") {
             return Ok(0);