From dde3f0c09d707fd436a4154fe595573633fc5808 Mon Sep 17 00:00:00 2001
From: Matthias Wahl
Date: Thu, 12 Nov 2020 22:32:17 +0100
Subject: [PATCH 1/4] Fix slow kafka sink when queue.buffering.max.ms is set
 to > 0.

Also fix librdkafka not batching, regardless of the value
queue.buffering.max.ms had.

Signed-off-by: Matthias Wahl
---
 src/sink/elastic.rs          |   2 +-
 src/sink/kafka.rs            | 167 +++++++++++++++++++++++++++--------
 src/source/kafka.rs          |   9 +-
 tremor-pipeline/src/event.rs |  16 ++++
 4 files changed, 153 insertions(+), 41 deletions(-)

diff --git a/src/sink/elastic.rs b/src/sink/elastic.rs
index 002a2fa236..7da4c55305 100644
--- a/src/sink/elastic.rs
+++ b/src/sink/elastic.rs
@@ -91,7 +91,7 @@ impl offramp::Impl for Elastic {
 impl Elastic {
     async fn drain_insights(&mut self) -> ResultVec {
-        let mut v = Vec::with_capacity(self.tx.len() + 1);
+        let mut v = Vec::with_capacity(self.rx.len() + 1);
         while let Ok(e) = self.rx.try_recv() {
             v.push(e)
         }
diff --git a/src/sink/kafka.rs b/src/sink/kafka.rs
index 50823bc385..b0852d26ff 100644
--- a/src/sink/kafka.rs
+++ b/src/sink/kafka.rs
@@ -23,7 +23,7 @@
 //! See [Config](struct.Config.html) for details.
 
 use crate::sink::prelude::*;
-use crate::source::kafka::SmolRuntime;
+use async_channel::{bounded, Receiver, Sender};
 use halfbrown::HashMap;
 use rdkafka::config::ClientConfig;
 use rdkafka::{
@@ -31,7 +31,6 @@ use rdkafka::{
     producer::{FutureProducer, FutureRecord},
 };
 use std::fmt;
-use std::time::Duration;
 
 #[derive(Deserialize)]
 pub struct Config {
@@ -60,11 +59,15 @@ pub struct Config {
 impl Config {
     fn producer(&self) -> Result<FutureProducer> {
         let mut producer_config = ClientConfig::new();
+
+        // ENABLE LIBRDKAFKA DEBUGGING:
+        // - set librdkafka logger to debug in logger.yaml
+        // - configure: debug: "all" for this offramp
         let producer_config = producer_config
             .set("client.id", &format!("tremor-{}-{}", self.hostname, 0))
             .set("bootstrap.servers", &self.brokers.join(","))
             .set("message.timeout.ms", "5000")
-            .set("queue.buffering.max.ms", "0");
+            .set("queue.buffering.max.ms", "0"); // set to 0 to send each message out immediately, without client-internal batching: low latency, but a busy network
 
         Ok(self
             .rdkafka_options
@@ -82,14 +85,18 @@ fn d_host() -> String {
 
 /// Kafka offramp connectoz
 pub struct Kafka {
+    sink_url: TremorURL,
     config: Config,
     producer: FutureProducer,
     postprocessors: Postprocessors,
+    reply_tx: Sender<sink::Reply>,
+    error_rx: Receiver<KafkaError>,
+    error_tx: Sender<KafkaError>,
 }
 
 impl fmt::Debug for Kafka {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Kafka: {}", self.config.topic)
+        write!(f, "[Sink::{}] Kafka: {}", &self.sink_url, self.config.topic)
     }
 }
 
@@ -99,11 +106,18 @@ impl offramp::Impl for Kafka {
             let config: Config = Config::new(config)?;
             let producer = config.producer()?;
             // Create the thread pool where the expensive computation will be performed.
+            let (dummy_tx, _) = bounded(1);
+            // TODO: does this need to be unbounded?
+            let (error_tx, error_rx) = bounded(crate::QSIZE);
             Ok(SinkManager::new_box(Self {
+                sink_url: TremorURL::from_offramp_id("kafka")?, // dummy
                 config,
                 producer,
                 postprocessors: vec![],
+                reply_tx: dummy_tx,
+                error_rx,
+                error_tx,
             }))
         } else {
             Err("Kafka offramp requires a config".into())
@@ -145,6 +159,74 @@ where
     }
 }
 
+/// Waits for actual delivery to the kafka cluster and sends an ack or fail.
+/// Also sends fatal errors to the offramp task for handling.
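+/// A single failed or cancelled delivery future fails the whole event;
+/// an ack is only sent once every delivery future resolved successfully.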
+async fn wait_for_delivery(
+    sink_url: String,
+    futures: Vec<rdkafka::producer::DeliveryFuture>,
+    mut insight_event: Event,
+    reply_tx: Sender<sink::Reply>,
+    error_tx: Sender<KafkaError>,
+) -> Result<()> {
+    let cb = match futures::future::try_join_all(futures).await {
+        Ok(results) => {
+            if let Some((kafka_error, _)) = results.into_iter().find_map(|res| res.err()) {
+                error!(
+                    "[Sink::{}] Error delivering kafka record: {}",
+                    sink_url, &kafka_error
+                );
+                if is_fatal(&kafka_error) {
+                    error_tx.send(kafka_error).await?;
+                }
+                CBAction::Fail
+            } else {
+                // all good. send ack
+                CBAction::Ack
+            }
+        }
+        Err(e) => {
+            error!(
+                "[Sink::{}] DeliveryFuture cancelled. Message delivery status unclear, considering it failed: {}",
+                sink_url, e
+            );
+            // oh noes, send fail
+            CBAction::Fail
+        }
+    };
+    insight_event.cb = cb;
+    reply_tx.send(sink::Reply::Insight(insight_event)).await?;
+    Ok(())
+}
+
+impl Kafka {
+    fn drain_fatal_errors(&mut self) -> Result<()> {
+        let mut handled = false;
+        while let Ok(e) = self.error_rx.try_recv() {
+            if !handled {
+                // only handle on first fatal error
+                self.handle_fatal_error(e)?;
+                handled = true;
+            }
+        }
+        Ok(())
+    }
+
+    fn handle_fatal_error(&mut self, _fatal_error: KafkaError) -> Result<()> {
+        let maybe_fatal_error = unsafe { get_fatal_error(self.producer.client()) };
+        if let Some(fatal_error) = maybe_fatal_error {
+            error!(
+                "[Sink::{}] Fatal Error({:?}): {}",
+                &self.sink_url, fatal_error.0, fatal_error.1
+            );
+        }
+        error!("[Sink::{}] Reinitiating client...", &self.sink_url);
+        self.producer = self.config.producer()?;
+        error!("[Sink::{}] Client reinitiated.", &self.sink_url);
+
+        Ok(())
+    }
+}
+
 #[async_trait::async_trait]
 impl Sink for Kafka {
     async fn on_event(
@@ -152,48 +234,56 @@ impl Sink for Kafka {
         &mut self,
         _input: &str,
         codec: &dyn Codec,
         _codec_map: &HashMap<String, Box<dyn Codec>>,
-        event: Event,
+        mut event: Event,
     ) -> ResultVec {
-        let mut success = true;
+        // ensure we handle any fatal errors that occurred during the last on_event invocation
+        self.drain_fatal_errors()?;
         let ingest_ns = event.ingest_ns;
+        let mut delivery_futures = Vec::with_capacity(event.len()); // might not be enough
+        let mut insight_event = event.insight_ack(); // we will change the success status later, if need be
         for (value, meta) in event.value_meta_iter() {
             let encoded = codec.encode(value)?;
             let processed = postprocess(self.postprocessors.as_mut_slice(), ingest_ns, encoded)?;
-            for raw in processed {
-                let mut record = FutureRecord::to(&self.config.topic);
-                record = record.payload(&raw);
-
-                let record = if let Some(k) = meta.get("kafka_key").and_then(Value::as_str) {
-                    record.key(k)
-                } else if let Some(ref k) = self.config.key {
-                    record.key(k.as_str())
-                } else {
-                    record
-                };
-                match self
-                    .producer
-                    .send_with_runtime::<SmolRuntime, _, _, _>(record, Duration::from_secs(0))
-                    .await
-                {
-                    Ok(_) => {}
-                    Err((e, _r)) => {
-                        error!("[Kafka Offramp] failed to enque message: {}", e);
+            let meta_kafka_key = meta.get("kafka_key").and_then(Value::as_str);
+            for payload in processed {
+                // TODO: allow defining partition and timestamp in meta
+                let mut record = FutureRecord::to(self.config.topic.as_str());
+                record = record.payload(&payload);
+                if let Some(kafka_key) = meta_kafka_key {
+                    record = record.key(kafka_key);
+                } else if let Some(kafka_key) = &self.config.key {
+                    record = record.key(kafka_key.as_str());
+                }
+
+                // send out without blocking on delivery
+                match self.producer.send_result(record) {
+                    Ok(delivery_future) => {
+                        delivery_futures.push(delivery_future);
+                    }
+                    Err((e, _)) => {
+                        error!("[Sink::{}] failed to enqueue message: {}", &self.sink_url, e);
                         if is_fatal(&e) {
-                            if let Some((code, fatal)) =
-                                unsafe { get_fatal_error(self.producer.client()) }
-                            {
-                                error!("[Kafka Offramp] Fatal Error({:?}): {}", code, fatal);
-                            }
-                            self.producer = self.config.producer()?;
-                            error!("[Kafka Offramp] reinitiating client");
+                            // handle fatal errors right here, without enqueueing
+                            self.handle_fatal_error(e)?;
                         }
-                        success = false;
-                        break;
+                        // bail out with a CB fail on enqueue error
+                        insight_event.cb = CBAction::Fail;
+                        return Ok(Some(vec![sink::Reply::Insight(insight_event)]));
                     }
                 }
             }
         }
-        Ok(Some(vec![sink::Reply::Insight(event.insight(success))]))
+        // successfully enqueued all messages
+        // spawn the task waiting for delivery, then send acks/fails
+        task::spawn(wait_for_delivery(
+            self.sink_url.to_string(),
+            delivery_futures,
+            insight_event,
+            self.reply_tx.clone(),
+            self.error_tx.clone(),
+        ));
+        Ok(None)
     }
     fn default_codec(&self) -> &str {
         "json"
@@ -202,17 +292,20 @@ impl Sink for Kafka {
     async fn init(
         &mut self,
         _sink_uid: u64,
-        _sink_url: &TremorURL,
+        sink_url: &TremorURL,
         _codec: &dyn Codec,
         _codec_map: &HashMap<String, Box<dyn Codec>>,
         processors: Processors<'_>,
         _is_linked: bool,
-        _reply_channel: Sender<sink::Reply>,
+        reply_channel: Sender<sink::Reply>,
     ) -> Result<()> {
         self.postprocessors = make_postprocessors(processors.post)?;
+        self.reply_tx = reply_channel.clone();
+        self.sink_url = sink_url.clone();
         Ok(())
     }
     async fn on_signal(&mut self, _signal: Event) -> ResultVec {
+        self.drain_fatal_errors()?;
         Ok(None)
     }
     fn is_active(&self) -> bool {
diff --git a/src/source/kafka.rs b/src/source/kafka.rs
index e1e8598015..cb363389b9 100644
--- a/src/source/kafka.rs
+++ b/src/source/kafka.rs
@@ -37,7 +37,7 @@ pub struct SmolRuntime;
 
 impl AsyncRuntime for SmolRuntime {
-    type Delay = future::Map;
+    type Delay = future::Map;
 
     fn spawn<T>(task: T)
     where
@@ -331,6 +331,10 @@ impl Source for Int {
         info!("Starting kafka onramp {}", self.onramp_id);
         // Setting up the configuration with default and then overwriting
         // them with custom settings.
+        //
+        // ENABLE LIBRDKAFKA DEBUGGING:
+        // - set librdkafka logger to debug in logger.yaml
+        // - configure: debug: "all" for this onramp
         let client_config = client_config
             .set("group.id", &self.config.group_id)
             .set(
@@ -344,8 +348,7 @@ impl Source for Int {
             .set("enable.auto.commit", "true")
             .set("auto.commit.interval.ms", "5000")
             // but only commit the offsets explicitly stored via `consumer.store_offset`.
- .set("enable.auto.offset.store", "true") - .set_log_level(RDKafkaLogLevel::Debug); + .set("enable.auto.offset.store", "true"); let client_config = if let Some(options) = self.config.rdkafka_options.as_ref() { options diff --git a/tremor-pipeline/src/event.rs b/tremor-pipeline/src/event.rs index 55dc130ca4..af386041bc 100644 --- a/tremor-pipeline/src/event.rs +++ b/tremor-pipeline/src/event.rs @@ -168,6 +168,22 @@ impl Event { ..Event::default() } } + + #[must_use] + /// return the number of events contained within this event + /// normally 1, but for batched events possibly > 1 + pub fn len(&self) -> usize { + if self.is_batch { + self.data + .suffix() + .value() + .as_array() + .map(|a| a.len()) + .unwrap_or_default() + } else { + 1 + } + } } /// Iterator over the event value and metadata From b321391044a6ef93e9d6eab0e4f88935bd808b07 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Thu, 12 Nov 2020 23:11:50 +0100 Subject: [PATCH 2/4] clippy, clippy, joy, joy Signed-off-by: Matthias Wahl --- src/source/kafka.rs | 2 +- tremor-pipeline/src/event.rs | 78 +++++++++++++++++++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/src/source/kafka.rs b/src/source/kafka.rs index cb363389b9..5745b100dc 100644 --- a/src/source/kafka.rs +++ b/src/source/kafka.rs @@ -21,7 +21,7 @@ use futures::future::{self, FutureExt}; use futures::StreamExt; use halfbrown::HashMap; use rdkafka::client::ClientContext; -use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; +use rdkafka::config::ClientConfig; use rdkafka::consumer::stream_consumer::{self, StreamConsumer}; use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext}; use rdkafka::error::KafkaResult; diff --git a/tremor-pipeline/src/event.rs b/tremor-pipeline/src/event.rs index af386041bc..034fad399a 100644 --- a/tremor-pipeline/src/event.rs +++ b/tremor-pipeline/src/event.rs @@ -178,12 +178,25 @@ impl Event { .suffix() .value() .as_array() - .map(|a| a.len()) + .map(Vec::len) .unwrap_or_default() } else { 1 } } + + #[must_use] + /// returns true if this event is batched but has no wrapped events + pub fn is_empty(&self) -> bool { + self.is_batch + && self + .data + .suffix() + .value() + .as_array() + .map(Vec::is_empty) + .unwrap_or(true) + } } /// Iterator over the event value and metadata @@ -194,6 +207,7 @@ pub struct ValueMetaIter<'value> { idx: usize, } +// TODO: descend recursively into batched events in batched events ... 
 impl<'value> Iterator for ValueMetaIter<'value> {
     type Item = (&'value BorrowedValue<'value>, &'value BorrowedValue<'value>);
     fn next(&mut self) -> Option<Self::Item> {
@@ -264,7 +278,8 @@ impl<'value> Iterator for ValueIter<'value> {
 #[cfg(test)]
 mod test {
     use super::*;
-    use simd_json::value::borrowed::Object;
+    use crate::errors::Result;
+    use simd_json::{value::borrowed::Object, StaticNode};
     use tremor_script::ValueAndMeta;
 
     #[test]
@@ -356,4 +371,63 @@
         assert_eq!(Event::cb_trigger(0).cb, CBAction::Close);
         assert_eq!(e.insight_trigger().cb, CBAction::Close);
     }
+
+    #[test]
+    fn len() -> Result<()> {
+        // default non-batched event
+        let mut e = Event::default();
+        assert_eq!(1, e.len());
+        // batched event with 2 elements
+        e.is_batch = true;
+        let mut value = BorrowedValue::array_with_capacity(2);
+        value.push(BorrowedValue::Static(StaticNode::Bool(true)))?; // dummy events
+        value.push(BorrowedValue::Static(StaticNode::Bool(false)))?;
+        e.data = (value, BorrowedValue::object_with_capacity(0)).into();
+        assert_eq!(2, e.len());
+
+        // batched event with non-array value
+        e.data = (
+            BorrowedValue::null(),
+            BorrowedValue::object_with_capacity(0),
+        )
+            .into();
+        assert_eq!(0, e.len());
+        // batched event with empty array value
+        e.data = (
+            BorrowedValue::array_with_capacity(0),
+            BorrowedValue::object_with_capacity(0),
+        )
+            .into();
+        assert_eq!(0, e.len());
+
+        Ok(())
+    }
+
+    #[test]
+    fn is_empty() -> Result<()> {
+        let mut e = Event::default();
+        assert_eq!(false, e.is_empty());
+
+        e.is_batch = true;
+        e.data = (
+            BorrowedValue::null(),
+            BorrowedValue::object_with_capacity(0),
+        )
+            .into();
+        assert_eq!(true, e.is_empty());
+
+        e.data = (
+            BorrowedValue::array_with_capacity(0),
+            BorrowedValue::object_with_capacity(0),
+        )
+            .into();
+        assert_eq!(true, e.is_empty());
+
+        let mut value = BorrowedValue::array_with_capacity(2);
+        value.push(BorrowedValue::Static(StaticNode::Bool(true)))?; // dummy events
+        value.push(BorrowedValue::Static(StaticNode::Bool(false)))?;
+        e.data = (value, BorrowedValue::object_with_capacity(0)).into();
+        assert_eq!(false, e.is_empty());
+        Ok(())
+    }
 }

From 832ab0bedad91106e6196971c5d2e887bc3ab8c1 Mon Sep 17 00:00:00 2001
From: Matthias Wahl
Date: Thu, 12 Nov 2020 23:20:46 +0100
Subject: [PATCH 3/4] Flush messages for 1 sec before terminating kafka sink
 if we have in-flight messages.

Signed-off-by: Matthias Wahl
---
 src/sink/kafka.rs | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/sink/kafka.rs b/src/sink/kafka.rs
index b0852d26ff..c898d4ca47 100644
--- a/src/sink/kafka.rs
+++ b/src/sink/kafka.rs
@@ -30,7 +30,7 @@ use rdkafka::{
     error::KafkaError,
     producer::{FutureProducer, FutureRecord},
 };
-use std::fmt;
+use std::{fmt, time::Duration};
 
 #[derive(Deserialize)]
 pub struct Config {
@@ -314,4 +314,16 @@ impl Sink for Kafka {
     fn auto_ack(&self) -> bool {
         false
     }
+    async fn terminate(&mut self) {
+        if self.producer.in_flight_count() > 0 {
+            // wait a second in order to flush messages.
+            let wait_secs = 1;
+            info!(
+                "[Sink::{}] Flushing messages. Waiting for {} seconds.",
+                &self.sink_url, wait_secs
+            );
+            self.producer.flush(Duration::from_secs(1));
+            info!("[Sink::{}] Terminating.", &self.sink_url);
+        }
+    }
 }

From 0c47a39bb3e97d917339a2b395070ed8576d0c77 Mon Sep 17 00:00:00 2001
From: Matthias Wahl
Date: Thu, 12 Nov 2020 23:35:44 +0100
Subject: [PATCH 4/4] oh you, clippy!
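
Use function references instead of closures, take the fatal KafkaError by
reference, and move the reply channel into the sink instead of cloning it.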
Signed-off-by: Matthias Wahl
---
 src/sink/kafka.rs            | 10 +++++-----
 tremor-pipeline/src/event.rs | 10 ++--------
 2 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/src/sink/kafka.rs b/src/sink/kafka.rs
index c898d4ca47..cebe3ed746 100644
--- a/src/sink/kafka.rs
+++ b/src/sink/kafka.rs
@@ -170,7 +170,7 @@ async fn wait_for_delivery(
 ) -> Result<()> {
     let cb = match futures::future::try_join_all(futures).await {
         Ok(results) => {
-            if let Some((kafka_error, _)) = results.into_iter().find_map(|res| res.err()) {
+            if let Some((kafka_error, _)) = results.into_iter().find_map(std::result::Result::err) {
                 error!(
                     "[Sink::{}] Error delivering kafka record: {}",
                     sink_url, &kafka_error
@@ -204,14 +204,14 @@ impl Kafka {
         while let Ok(e) = self.error_rx.try_recv() {
             if !handled {
                 // only handle on first fatal error
-                self.handle_fatal_error(e)?;
+                self.handle_fatal_error(&e)?;
                 handled = true;
             }
         }
         Ok(())
     }
 
-    fn handle_fatal_error(&mut self, _fatal_error: KafkaError) -> Result<()> {
+    fn handle_fatal_error(&mut self, _fatal_error: &KafkaError) -> Result<()> {
         let maybe_fatal_error = unsafe { get_fatal_error(self.producer.client()) };
         if let Some(fatal_error) = maybe_fatal_error {
             error!(
@@ -265,7 +265,7 @@ impl Sink for Kafka {
                         error!("[Sink::{}] failed to enqueue message: {}", &self.sink_url, e);
                         if is_fatal(&e) {
                             // handle fatal errors right here, without enqueueing
-                            self.handle_fatal_error(e)?;
+                            self.handle_fatal_error(&e)?;
                         }
                         // bail out with a CB fail on enqueue error
                         insight_event.cb = CBAction::Fail;
@@ -300,7 +300,7 @@ impl Sink for Kafka {
         reply_channel: Sender<sink::Reply>,
     ) -> Result<()> {
         self.postprocessors = make_postprocessors(processors.post)?;
-        self.reply_tx = reply_channel.clone();
+        self.reply_tx = reply_channel;
         self.sink_url = sink_url.clone();
         Ok(())
     }
diff --git a/tremor-pipeline/src/event.rs b/tremor-pipeline/src/event.rs
index 034fad399a..d96fb3b834 100644
--- a/tremor-pipeline/src/event.rs
+++ b/tremor-pipeline/src/event.rs
@@ -174,12 +174,7 @@ impl Event {
     /// normally 1, but for batched events possibly > 1
     pub fn len(&self) -> usize {
         if self.is_batch {
-            self.data
-                .suffix()
-                .value()
-                .as_array()
-                .map(Vec::len)
-                .unwrap_or_default()
+            self.data.suffix().value().as_array().map_or(0, Vec::len)
         } else {
             1
         }
@@ -194,8 +189,7 @@
             .suffix()
             .value()
             .as_array()
-            .map(Vec::is_empty)
-            .unwrap_or(true)
+            .map_or(true, Vec::is_empty)
     }
 }