Update librdkafka dependency to 1.5.2.
This mitigates confluentinc/librdkafka#2933, which is fixed in 1.5.2.
That issue caused consumers to hang when they tried to commit offsets during a rebalance.

Signed-off-by: Matthias Wahl <mwahl@wayfair.com>
Matthias Wahl authored and mfelsche committed Sep 28, 2021
1 parent 074f664 commit 7f971e3
Showing 4 changed files with 56 additions and 78 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -103,11 +103,11 @@ postgres-protocol = "0.6"
 tokio-postgres = "0.7"
 
 # kafka. cmake is the encouraged way to build this and also the one that works on windows/with musl.
-rdkafka = { version = "0.24", features = [
+rdkafka = { version = "0.26", features = [
     "cmake-build",
     "libz-static",
 ], default-features = false }
-rdkafka-sys = { version = "2.0.0", features = [
+rdkafka-sys = { version = "4.0.0", features = [
     "cmake-build",
     "libz-static",
 ] } # tracking the version rdkafka depends on
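
Since the fix ships inside the statically built librdkafka that rdkafka-sys pulls in, it can be worth logging the linked version at startup to confirm the bump took effect. A minimal sketch using rdkafka's version helper (the log wording is illustrative):

```rust
use rdkafka::util::get_rdkafka_version;

fn log_librdkafka_version() {
    // Returns (hex version, human-readable string),
    // e.g. (0x010502ff, "1.5.2") once this bump is in place.
    let (version_n, version_s) = get_rdkafka_version();
    println!("linked against librdkafka {} (0x{:08x})", version_s, version_n);
}
```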
79 changes: 24 additions & 55 deletions src/sink/kafka.rs
@@ -27,9 +27,9 @@ use async_channel::{bounded, Receiver, Sender};
 use halfbrown::HashMap;
 use rdkafka::config::ClientConfig;
 use rdkafka::{
-    error::KafkaError,
+    error::{KafkaError, RDKafkaError},
     message::OwnedHeaders,
-    producer::{FutureProducer, FutureRecord},
+    producer::{FutureProducer, FutureRecord, Producer},
 };
 use std::{
     fmt,
@@ -94,8 +94,8 @@ pub struct Kafka {
     producer: FutureProducer,
     postprocessors: Postprocessors,
     reply_tx: Sender<sink::Reply>,
-    error_rx: Receiver<KafkaError>,
-    error_tx: Sender<KafkaError>,
+    error_rx: Receiver<RDKafkaError>,
+    error_tx: Sender<RDKafkaError>,
 }
 
 impl fmt::Debug for Kafka {
@@ -129,40 +129,6 @@ impl offramp::Impl for Kafka {
     }
 }
 
-fn is_fatal(e: &KafkaError) -> bool {
-    matches!(
-        e,
-        KafkaError::AdminOp(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::ConsumerCommit(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::Global(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::GroupListFetch(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::MessageConsumption(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::MessageProduction(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::MetadataFetch(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::OffsetFetch(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::SetPartitionOffset(rdkafka::error::RDKafkaError::Fatal)
-            | KafkaError::StoreOffset(rdkafka::error::RDKafkaError::Fatal)
-    )
-}
-
-unsafe fn get_fatal_error<C>(
-    client: &rdkafka::client::Client<C>,
-) -> Option<(rdkafka::types::RDKafkaRespErr, String)>
-where
-    C: rdkafka::ClientContext,
-{
-    const LEN: usize = 4096;
-    let mut buf: [i8; LEN] = std::mem::MaybeUninit::uninit().assume_init();
-    let client_ptr = client.native_ptr();
-
-    let code = rdkafka_sys::bindings::rd_kafka_fatal_error(client_ptr, buf.as_mut_ptr(), LEN);
-    if code == rdkafka::types::RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
-        None
-    } else {
-        Some((code, rdkafka::util::cstr_to_owned(buf.as_ptr())))
-    }
-}
-
 /// Waits for actual delivery to kafka cluster and sends ack or fail.
 /// Also sends fatal errors for handling in offramp task.
 #[allow(clippy::cast_possible_truncation)]
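
rdkafka 0.26 makes both removed helpers redundant: rich client errors now arrive as `RDKafkaError` values carrying `is_fatal()`, `code()`, and `string()`, so neither the exhaustive variant match nor the unsafe `rd_kafka_fatal_error` FFI call is needed. A minimal sketch of the replacement pattern (the classification function itself is illustrative):

```rust
use rdkafka::error::{KafkaError, RDKafkaError};

// With rdkafka 0.26, rich errors are wrapped in KafkaError::Transaction,
// and fatality is a method on the error instead of a variant match.
fn classify(e: &KafkaError) {
    if let KafkaError::Transaction(rd_error) = e {
        if rd_error.is_fatal() {
            // code() yields the error code, string() the librdkafka message.
            eprintln!("fatal: {:?}: {}", rd_error.code(), rd_error.string());
        } else {
            eprintln!("non-fatal: {}", rd_error.string());
        }
    }
}
```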
@@ -172,18 +138,20 @@ async fn wait_for_delivery(
     processing_start: Instant,
     maybe_event: Option<Event>,
     reply_tx: Sender<sink::Reply>,
-    error_tx: Sender<KafkaError>,
+    error_tx: Sender<RDKafkaError>,
 ) -> Result<()> {
     let cb = match futures::future::try_join_all(futures).await {
         Ok(results) => {
-            if let Some((kafka_error, _)) = results.into_iter().find_map(std::result::Result::err) {
+            if let Some((KafkaError::Transaction(rd_error), _)) =
+                results.into_iter().find_map(std::result::Result::err)
+            {
                 error!(
                     "[Sink::{}] Error delivering kafka record: {}",
-                    sink_url, &kafka_error
+                    sink_url, &rd_error
                 );
-                if is_fatal(&kafka_error) {
-                    let err_msg = format!("{}", &kafka_error);
-                    if error_tx.send(kafka_error).await.is_err() {
+                if rd_error.is_fatal() {
+                    let err_msg = format!("{}", &rd_error);
+                    if error_tx.send(rd_error).await.is_err() {
                         error!(
                             "[Sink::{}] Error notifying the system about kafka error: {}",
                             &sink_url, &err_msg
@@ -237,14 +205,13 @@ impl Kafka {
         Ok(())
     }
 
-    fn handle_fatal_error(&mut self, _fatal_error: &KafkaError) -> Result<()> {
-        let maybe_fatal_error = unsafe { get_fatal_error(self.producer.client()) };
-        if let Some(fatal_error) = maybe_fatal_error {
-            error!(
-                "[Sink::{}] Fatal Error({:?}): {}",
-                &self.sink_url, fatal_error.0, fatal_error.1
-            );
-        }
+    fn handle_fatal_error(&mut self, fatal_error: &RDKafkaError) -> Result<()> {
+        error!(
+            "[Sink::{}] Fatal Error({:?}): {}",
+            &self.sink_url,
+            fatal_error.code(),
+            fatal_error.string()
+        );
         error!("[Sink::{}] Reinitiating client...", &self.sink_url);
         self.producer = self.config.producer()?;
         error!("[Sink::{}] Client reinitiated.", &self.sink_url);
@@ -310,9 +277,11 @@ impl Sink for Kafka {
                     "[Sink::{}] failed to enqueue message: {}",
                     &self.sink_url, e
                 );
-                if is_fatal(&e) {
-                    // handle fatal errors right here, without enqueueing
-                    self.handle_fatal_error(&e)?;
+                if let KafkaError::Transaction(e) = e {
+                    if e.is_fatal() {
+                        // handle fatal errors right here, without enqueueing
+                        self.handle_fatal_error(&e)?;
+                    }
                 }
                 // bail out with a CB fail on enqueue error
                 if event.transactional {
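
For context on the delivery futures joined in `wait_for_delivery`: with rdkafka 0.26, `FutureProducer::send` is async and resolves to the delivery report directly. A minimal sketch of one enqueue-and-await cycle (topic, key, and payload are illustrative):

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

async fn send_one(producer: &FutureProducer) {
    let record = FutureRecord::to("tremor-out").key("event-1").payload("{}");
    // The returned future resolves once librdkafka reports delivery (or failure).
    match producer.send(record, Timeout::Never).await {
        Ok((partition, offset)) => {
            println!("delivered to partition {} at offset {}", partition, offset);
        }
        Err((e, _unsent_message)) => eprintln!("delivery failed: {}", e),
    }
}
```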
42 changes: 25 additions & 17 deletions src/source/kafka.rs
@@ -152,8 +152,8 @@ impl rentals::MessageStream {
     // ALLOW: https://github.com/tremor-rs/tremor-runtime/issues/1023
     #[allow(mutable_transmutes, clippy::mut_from_ref)]
     unsafe fn mut_suffix(
-        &self,
-    ) -> &mut stream_consumer::MessageStream<'static, LoggingConsumerContext, SmolRuntime> {
+        &mut self,
+    ) -> &mut stream_consumer::MessageStream<'_, LoggingConsumerContext, SmolRuntime> {
         // ALLOW: https://github.com/tremor-rs/tremor-runtime/issues/1023
         mem::transmute(&self.suffix().stream)
     }
@@ -168,7 +168,7 @@ impl rentals::MessageStream {
         &mut s.consumer
     }
     fn commit(&mut self, map: &StdMap<(String, i32), Offset>, mode: CommitMode) -> Result<()> {
-        let offsets = TopicPartitionList::from_topic_map(map);
+        let offsets = TopicPartitionList::from_topic_map(map)?;
 
         unsafe { self.consumer().commit(&offsets, mode)? };
 
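`TopicPartitionList::from_topic_map` is fallible in rdkafka 0.26 (it returns a `KafkaResult` instead of panicking on an invalid offset map), hence the added `?`. A minimal sketch of building and committing such a list (topic name and offset value are illustrative):

```rust
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::{Offset, TopicPartitionList};
use std::collections::HashMap;

fn commit_offsets(consumer: &StreamConsumer) -> KafkaResult<()> {
    let mut map = HashMap::new();
    map.insert(("tremor".to_string(), 0), Offset::Offset(42));
    // Validates the map and returns KafkaResult<TopicPartitionList>.
    let offsets = TopicPartitionList::from_topic_map(&map)?;
    consumer.commit(&offsets, CommitMode::Async)
}
```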
@@ -348,15 +348,15 @@ impl ConsumerContext for LoggingConsumerContext {
                 }
             }
             // this is actually not an error - we just didnt have any offset to commit
-            Err(KafkaError::ConsumerCommit(rdkafka_sys::RDKafkaError::NoOffset)) => {}
+            Err(KafkaError::ConsumerCommit(rdkafka_sys::RDKafkaErrorCode::NoOffset)) => {}
             Err(e) => warn!(
                 "[Source::{}] Error while committing offsets: {}",
                 self.onramp_id, e
             ),
         };
     }
 }
-pub type LoggingConsumer = StreamConsumer<LoggingConsumerContext>;
+pub type LoggingConsumer = StreamConsumer<LoggingConsumerContext, SmolRuntime>;
 
 /// ensure a zero poll timeout to have a non-blocking call
 
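The new second type parameter on `StreamConsumer` is where the async runtime is now selected; rdkafka drives its message stream through an `AsyncRuntime` implementation instead of taking the runtime at stream-creation time. A minimal sketch of what a smol-backed adapter like `SmolRuntime` can look like, assuming smol 1.x (tremor's actual implementation may differ):

```rust
use rdkafka::util::AsyncRuntime;
use std::{future::Future, pin::Pin, time::Duration};

pub struct SmolRuntime;

impl AsyncRuntime for SmolRuntime {
    type Delay = Pin<Box<dyn Future<Output = ()> + Send>>;

    // rdkafka uses this to run background tasks on the host runtime.
    fn spawn<T>(task: T)
    where
        T: Future<Output = ()> + Send + 'static,
    {
        smol::spawn(task).detach();
    }

    // rdkafka uses this to sleep between polls of the underlying client.
    fn delay_for(duration: Duration) -> Self::Delay {
        Box::pin(async move {
            smol::Timer::after(duration).await;
        })
    }
}
```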
@@ -369,21 +369,32 @@ impl Source for Int {
         &self.onramp_id
     }
     async fn pull_event(&mut self, id: u64) -> Result<SourceReply> {
-        if let Some(stream) = self.stream.as_mut() {
-            let s = unsafe { stream.mut_suffix() };
-            let r = match timeout(Duration::from_millis(100), s.next()).await {
+        if let Self {
+            stream: Some(stream),
+            onramp_id,
+            origin_uri,
+            auto_commit,
+            messages,
+            ..
+        } = &mut self
+        {
+            let r = {
+                let s = unsafe { stream.mut_suffix() };
+                timeout(Duration::from_millis(100), s.next()).await
+            };
+            let r = match r {
                 Ok(r) => r,
                 Err(_) => return Ok(SourceReply::Empty(0)),
             };
             if let Some(Ok(m)) = r {
                 debug!(
                     "[Source::{}] EventId: {} Offset: {}",
-                    self.onramp_id,
+                    onramp_id,
                     id,
                     m.offset()
                 );
                 if let Some(Ok(data)) = m.payload_view::<[u8]>() {
-                    let mut origin_uri = self.origin_uri.clone();
+                    let mut origin_uri = origin_uri.clone();
                     origin_uri.path = vec![
                         m.topic().to_string(),
                         m.partition().to_string(),
@@ -422,8 +433,8 @@
                     }
                     kafka_meta_data.insert("kafka", meta_data)?;
 
-                    if !self.auto_commit {
-                        self.messages.insert(id, MsgOffset::from(m));
+                    if !*auto_commit {
+                        messages.insert(id, MsgOffset::from(m));
                     }
                     Ok(SourceReply::Data {
                         origin_uri,
@@ -590,11 +601,8 @@ impl Source for Int {
                 return Err(e.into());
             }
         };
-        let stream = rentals::MessageStream::new(Box::new(consumer), |c| {
-            StreamAndMsgs::new(
-                c.start_with_runtime::<SmolRuntime>(Duration::from_millis(100), false),
-            )
-        });
+        let stream =
+            rentals::MessageStream::new(Box::new(consumer), |c| StreamAndMsgs::new(c.stream()));
         self.stream = Some(stream);
 
         Ok(SourceState::Connected)
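
`consumer.stream()` replaces `start_with_runtime`: the runtime is taken from the consumer's type (the `SmolRuntime` parameter above), and no poll interval is passed anymore. A minimal, self-contained consumption sketch with the default runtime (broker address, group id, and topic are illustrative):

```rust
use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaResult;

async fn consume_one() -> KafkaResult<()> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "example-group")
        .set("bootstrap.servers", "localhost:9092")
        .create()?;
    consumer.subscribe(&["example-topic"])?;
    // stream() borrows the consumer; each item is a KafkaResult<BorrowedMessage>.
    let mut stream = consumer.stream();
    if let Some(message) = stream.next().await {
        println!("received: {:?}", message?);
    }
    Ok(())
}
```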
