From ca933529dca42bcc5203a98268365fd778a05676 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 28 Feb 2024 18:03:37 +0800 Subject: [PATCH] wrap `fetch_watermarks` in `spawn_blocking` (#196) Signed-off-by: Runji Wang --- CHANGELOG.md | 6 +++++ madsim-rdkafka/Cargo.toml | 2 +- madsim-rdkafka/README.md | 6 +++-- madsim-rdkafka/src/std/client.rs | 27 ++++++++++--------- .../src/std/consumer/base_consumer.rs | 2 +- madsim-rdkafka/src/std/consumer/mod.rs | 2 +- .../src/std/consumer/stream_consumer.rs | 2 +- 7 files changed, 29 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fab0cfd0..84300eb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## rdkafka [0.3.3] - 2024-02-28 + +### Changed + +- Wrap `fetch_watermarks` in `tokio::task::spawn_blocking`. + ## rdkafka [0.3.2] - 2024-02-28 ### Changed diff --git a/madsim-rdkafka/Cargo.toml b/madsim-rdkafka/Cargo.toml index 9d714a13..fc9af207 100644 --- a/madsim-rdkafka/Cargo.toml +++ b/madsim-rdkafka/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "madsim-rdkafka" -version = "0.3.2+0.34.0" +version = "0.3.3+0.34.0" edition = "2021" authors = ["Runji Wang "] description = "The rdkafka simulator on madsim." diff --git a/madsim-rdkafka/README.md b/madsim-rdkafka/README.md index 1ea46671..a73272a0 100644 --- a/madsim-rdkafka/README.md +++ b/madsim-rdkafka/README.md @@ -25,7 +25,7 @@ The following functions are modified to be `async`: - `ClientConfig::create` - `ClientConfig::create_with_context` - `Client::fetch_metadata` -- `Client::fetch_watermarks` +- `Client::fetch_watermarks`[^1] - `Client::fetch_group_list` - `Consumer::seek` - `Consumer::seek_partitions` @@ -37,7 +37,7 @@ The following functions are modified to be `async`: - `Consumer::offsets_for_timestamp` - `Consumer::offsets_for_times` - `Consumer::fetch_metadata` -- `Consumer::fetch_watermarks` +- `Consumer::fetch_watermarks`[^1] - `Consumer::fetch_group_list` - `Producer::flush` - `Producer::init_transactions` @@ -45,6 +45,8 @@ The following functions are modified to be `async`: - `Producer::commit_transaction` - `Producer::abort_transaction` +[^1]: wrapped in `tokio::task::spawn_blocking` + ## DNS Resolution This crate has cherry-picked [a commit] from Materialize to support rewriting broker addresses. diff --git a/madsim-rdkafka/src/std/client.rs b/madsim-rdkafka/src/std/client.rs index b5555e13..9ad5ce35 100644 --- a/madsim-rdkafka/src/std/client.rs +++ b/madsim-rdkafka/src/std/client.rs @@ -354,29 +354,32 @@ impl Client { } /// Returns high and low watermark for the specified topic and partition. - pub async fn fetch_watermarks>( + pub async fn fetch_watermarks + Send + 'static>( &self, topic: &str, partition: i32, timeout: T, ) -> KafkaResult<(i64, i64)> { - let mut low = -1; - let mut high = -1; let topic_c = CString::new(topic.to_string())?; - let ret = unsafe { - rdsys::rd_kafka_query_watermark_offsets( - self.native_ptr(), + let native_client = unsafe { NativeClient::from_ptr(self.native_ptr()) }; + tokio::task::spawn_blocking(move || unsafe { + let mut low = -1; + let mut high = -1; + let ret = rdsys::rd_kafka_query_watermark_offsets( + native_client.ptr(), topic_c.as_ptr(), partition, &mut low as *mut i64, &mut high as *mut i64, timeout.into().as_millis(), - ) - }; - if ret.is_error() { - return Err(KafkaError::MetadataFetch(ret.into())); - } - Ok((low, high)) + ); + if ret.is_error() { + return Err(KafkaError::MetadataFetch(ret.into())); + } + Ok((low, high)) + }) + .await + .unwrap() } /// Returns the cluster identifier option or None if the cluster identifier is null diff --git a/madsim-rdkafka/src/std/consumer/base_consumer.rs b/madsim-rdkafka/src/std/consumer/base_consumer.rs index 9a794c3d..3f5790e0 100644 --- a/madsim-rdkafka/src/std/consumer/base_consumer.rs +++ b/madsim-rdkafka/src/std/consumer/base_consumer.rs @@ -578,7 +578,7 @@ where self.client.fetch_metadata(topic, timeout).await } - async fn fetch_watermarks + Send>( + async fn fetch_watermarks + Send + 'static>( &self, topic: &str, partition: i32, diff --git a/madsim-rdkafka/src/std/consumer/mod.rs b/madsim-rdkafka/src/std/consumer/mod.rs index 548a725e..72014163 100644 --- a/madsim-rdkafka/src/std/consumer/mod.rs +++ b/madsim-rdkafka/src/std/consumer/mod.rs @@ -391,7 +391,7 @@ where timeout: T, ) -> KafkaResult<(i64, i64)> where - T: Into + Send, + T: Into + Send + 'static, Self: Sized; /// Returns the group membership information for the given group. If no group is diff --git a/madsim-rdkafka/src/std/consumer/stream_consumer.rs b/madsim-rdkafka/src/std/consumer/stream_consumer.rs index 905e2f3e..30e5977b 100644 --- a/madsim-rdkafka/src/std/consumer/stream_consumer.rs +++ b/madsim-rdkafka/src/std/consumer/stream_consumer.rs @@ -526,7 +526,7 @@ where timeout: T, ) -> KafkaResult<(i64, i64)> where - T: Into + Send, + T: Into + Send + 'static, Self: Sized, { self.base.fetch_watermarks(topic, partition, timeout).await