Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rdkafka): fix unintended drop of client in fetch_watermarks #200

Merged
merged 3 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## rdkafka [0.3.4] - 2024-03-22

### Fixed

- Fix unintended drop of client in `fetch_watermarks`.

## madsim [0.2.26] - 2024-03-18

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion madsim-rdkafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "madsim-rdkafka"
version = "0.3.3+0.34.0"
version = "0.3.4+0.34.0"
edition = "2021"
authors = ["Runji Wang <wangrunji0408@163.com>"]
description = "The rdkafka simulator on madsim."
Expand Down
14 changes: 10 additions & 4 deletions madsim-rdkafka/src/std/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,17 +360,23 @@ impl<C: ClientContext> Client<C> {
partition: i32,
timeout: T,
) -> KafkaResult<(i64, i64)> {
// XXX: to move the raw pointer into spawn_blocking
struct NativePtr(*mut RDKafka);
unsafe impl Send for NativePtr {}

let topic_c = CString::new(topic.to_string())?;
let native_client = unsafe { NativeClient::from_ptr(self.native_ptr()) };
let native_ptr = NativePtr(self.native_ptr());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not seems to be safe if we cancel the future for spawn_blocking. 😕 We may require an Arc<Self> here considering that StreamConsumer holds an Arc over BaseConsumer.


tokio::task::spawn_blocking(move || unsafe {
let mut low = -1;
let mut high = -1;
let native_ptr = native_ptr;
let ret = rdsys::rd_kafka_query_watermark_offsets(
native_client.ptr(),
native_ptr.0,
topic_c.as_ptr(),
partition,
&mut low as *mut i64,
&mut high as *mut i64,
&mut low,
&mut high,
timeout.into().as_millis(),
);
if ret.is_error() {
Expand Down
1 change: 1 addition & 0 deletions madsim-rdkafka/src/std/mocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::ClientContext;
///
/// In this case, we **must neither** destroy the mock cluster in `MockCluster`'s `drop()`,
/// **nor** outlive the `Client` from which the reference is obtained, hence the lifetime.
#[allow(dead_code)]
enum MockClusterClient<'c, C: ClientContext> {
Owned(Client<C>),
Borrowed(&'c Client<C>),
Expand Down
1 change: 1 addition & 0 deletions madsim-tokio/src/sim/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct Runtime {
abort_handles: Mutex<Vec<AbortHandle>>,
}

#[allow(dead_code)]
pub struct EnterGuard<'a>(&'a Runtime);

impl Runtime {
Expand Down
2 changes: 1 addition & 1 deletion madsim/src/sim/rand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ fn init_std_random_state(seed: u64) -> bool {
}

thread_local! {
static SEED: Cell<Option<u64>> = Cell::new(None);
static SEED: Cell<Option<u64>> = const { Cell::new(None) };
}

/// Obtain a series of random bytes.
Expand Down
4 changes: 2 additions & 2 deletions madsim/src/sim/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
use std::{cell::RefCell, sync::Arc};

thread_local! {
static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None);
static TASK: RefCell<Option<Arc<TaskInfo>>> = RefCell::new(None);
static CONTEXT: RefCell<Option<Handle>> = const { RefCell::new(None) };
static TASK: RefCell<Option<Arc<TaskInfo>>> = const { RefCell::new(None) };
}

pub(crate) fn current<T>(map: impl FnOnce(&Handle) -> T) -> T {
Expand Down
Loading