Skip to content

Commit

Permalink
wrap fetch_watermarks in spawn_blocking (#196)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
wangrunji0408 authored Feb 28, 2024
1 parent 18bf59c commit ca93352
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 18 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## rdkafka [0.3.3] - 2024-02-28

### Changed

- Wrap `fetch_watermarks` in `tokio::task::spawn_blocking`.

## rdkafka [0.3.2] - 2024-02-28

### Changed
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "madsim-rdkafka"
version = "0.3.2+0.34.0"
version = "0.3.3+0.34.0"
edition = "2021"
authors = ["Runji Wang <wangrunji0408@163.com>"]
description = "The rdkafka simulator on madsim."
Expand Down
6 changes: 4 additions & 2 deletions madsim-rdkafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ The following functions are modified to be `async`:
- `ClientConfig::create`
- `ClientConfig::create_with_context`
- `Client::fetch_metadata`
- `Client::fetch_watermarks`
- `Client::fetch_watermarks`[^1]
- `Client::fetch_group_list`
- `Consumer::seek`
- `Consumer::seek_partitions`
Expand All @@ -37,14 +37,16 @@ The following functions are modified to be `async`:
- `Consumer::offsets_for_timestamp`
- `Consumer::offsets_for_times`
- `Consumer::fetch_metadata`
- `Consumer::fetch_watermarks`
- `Consumer::fetch_watermarks`[^1]
- `Consumer::fetch_group_list`
- `Producer::flush`
- `Producer::init_transactions`
- `Producer::send_offsets_to_transaction`
- `Producer::commit_transaction`
- `Producer::abort_transaction`

[^1]: wrapped in `tokio::task::spawn_blocking`

## DNS Resolution

This crate has cherry-picked [a commit] from Materialize to support rewriting broker addresses.
Expand Down
27 changes: 15 additions & 12 deletions madsim-rdkafka/src/std/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,29 +354,32 @@ impl<C: ClientContext> Client<C> {
}

/// Returns high and low watermark for the specified topic and partition.
pub async fn fetch_watermarks<T: Into<Timeout>>(
pub async fn fetch_watermarks<T: Into<Timeout> + Send + 'static>(
&self,
topic: &str,
partition: i32,
timeout: T,
) -> KafkaResult<(i64, i64)> {
let mut low = -1;
let mut high = -1;
let topic_c = CString::new(topic.to_string())?;
let ret = unsafe {
rdsys::rd_kafka_query_watermark_offsets(
self.native_ptr(),
let native_client = unsafe { NativeClient::from_ptr(self.native_ptr()) };
tokio::task::spawn_blocking(move || unsafe {
let mut low = -1;
let mut high = -1;
let ret = rdsys::rd_kafka_query_watermark_offsets(
native_client.ptr(),
topic_c.as_ptr(),
partition,
&mut low as *mut i64,
&mut high as *mut i64,
timeout.into().as_millis(),
)
};
if ret.is_error() {
return Err(KafkaError::MetadataFetch(ret.into()));
}
Ok((low, high))
);
if ret.is_error() {
return Err(KafkaError::MetadataFetch(ret.into()));
}
Ok((low, high))
})
.await
.unwrap()
}

/// Returns the cluster identifier option or None if the cluster identifier is null
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/src/std/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ where
self.client.fetch_metadata(topic, timeout).await
}

async fn fetch_watermarks<T: Into<Timeout> + Send>(
async fn fetch_watermarks<T: Into<Timeout> + Send + 'static>(
&self,
topic: &str,
partition: i32,
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/src/std/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ where
timeout: T,
) -> KafkaResult<(i64, i64)>
where
T: Into<Timeout> + Send,
T: Into<Timeout> + Send + 'static,
Self: Sized;

/// Returns the group membership information for the given group. If no group is
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/src/std/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ where
timeout: T,
) -> KafkaResult<(i64, i64)>
where
T: Into<Timeout> + Send,
T: Into<Timeout> + Send + 'static,
Self: Sized,
{
self.base.fetch_watermarks(topic, partition, timeout).await
Expand Down

0 comments on commit ca93352

Please sign in to comment.