Fix slow kafka sink when queue.buffering.max.ms is set to > 0. #585

Merged: 4 commits, Nov 12, 2020
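Summary of the fix: the old sink awaited each produce call inline (send_with_runtime with a zero timeout), so any queue.buffering.max.ms greater than 0 stalled every event for the full client-side buffering window. The new code enqueues records with send_result and resolves the delivery futures in a spawned task, acking or failing the event only once kafka confirms delivery. A minimal sketch of that pattern, assuming a bare rdkafka FutureProducer and smol for spawning; the function name, topic, and logging are illustrative, not tremor code:

use rdkafka::producer::{FutureProducer, FutureRecord};

async fn send_batch(producer: &FutureProducer, payloads: Vec<Vec<u8>>) {
    let mut futures = Vec::with_capacity(payloads.len());
    for payload in &payloads {
        let record = FutureRecord::<(), _>::to("my-topic").payload(payload);
        match producer.send_result(record) {
            // send_result only enqueues; the future resolves on actual delivery
            Ok(delivery_future) => futures.push(delivery_future),
            Err((e, _record)) => {
                eprintln!("enqueue failed: {}", e);
                return;
            }
        }
    }
    // await delivery off the hot path, so the caller never waits out
    // queue.buffering.max.ms
    smol::spawn(async move {
        for f in futures {
            match f.await {
                Ok(Ok((partition, offset))) => {
                    println!("delivered to partition {} at offset {}", partition, offset)
                }
                Ok(Err((e, _msg))) => eprintln!("delivery failed: {}", e),
                Err(_cancelled) => eprintln!("delivery future cancelled"),
            }
        }
    })
    .detach();
}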
2 changes: 1 addition & 1 deletion src/sink/elastic.rs
@@ -91,7 +91,7 @@ impl offramp::Impl for Elastic {

 impl Elastic {
     async fn drain_insights(&mut self) -> ResultVec {
-        let mut v = Vec::with_capacity(self.tx.len() + 1);
+        let mut v = Vec::with_capacity(self.rx.len() + 1);
         while let Ok(e) = self.rx.try_recv() {
             v.push(e)
         }
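The elastic.rs change above fixes a capacity hint: the vector is filled by draining self.rx, so sizing it from the sender side (self.tx) used the wrong queue length. A minimal sketch of the drain pattern, assuming an async_channel receiver; the helper name is illustrative:

use async_channel::Receiver;

fn drain<T>(rx: &Receiver<T>) -> Vec<T> {
    // size the buffer from the receiving end, since that is what gets drained
    let mut v = Vec::with_capacity(rx.len() + 1);
    while let Ok(e) = rx.try_recv() {
        v.push(e);
    }
    v
}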
181 changes: 143 additions & 38 deletions src/sink/kafka.rs
@@ -23,15 +23,14 @@
 //! See [Config](struct.Config.html) for details.

 use crate::sink::prelude::*;
-use crate::source::kafka::SmolRuntime;
+use async_channel::{bounded, Receiver, Sender};
 use halfbrown::HashMap;
 use rdkafka::config::ClientConfig;
 use rdkafka::{
     error::KafkaError,
     producer::{FutureProducer, FutureRecord},
 };
-use std::fmt;
-use std::time::Duration;
+use std::{fmt, time::Duration};

 #[derive(Deserialize)]
 pub struct Config {
@@ -60,11 +59,15 @@ pub struct Config {
 impl Config {
     fn producer(&self) -> Result<FutureProducer> {
         let mut producer_config = ClientConfig::new();
+
+        // ENABLE LIBRDKAFKA DEBUGGING:
+        // - set librdkafka logger to debug in logger.yaml
+        // - configure: debug: "all" for this onramp
         let producer_config = producer_config
             .set("client.id", &format!("tremor-{}-{}", self.hostname, 0))
             .set("bootstrap.servers", &self.brokers.join(","))
             .set("message.timeout.ms", "5000")
-            .set("queue.buffering.max.ms", "0");
+            .set("queue.buffering.max.ms", "0"); // set to 0 for sending each message out immediately without kafka client internal batching --> low latency, busy network

         Ok(self
             .rdkafka_options
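The default of queue.buffering.max.ms = 0 stays in place here: each message goes out immediately, for the lowest latency and the busiest network. With the non-blocking send path introduced by this PR, raising it through rdkafka_options becomes a usable way to trade latency for batching throughput. A hedged sketch of a raw rdkafka producer configured for batching; the broker address and values are illustrative:

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn batching_producer() -> Result<FutureProducer, rdkafka::error::KafkaError> {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // assumed broker address
        // a.k.a. linger.ms: batch messages inside librdkafka for up to 100ms
        .set("queue.buffering.max.ms", "100")
        .set("batch.num.messages", "10000") // librdkafka's default batch cap
        .create()
}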
@@ -82,14 +85,18 @@ fn d_host() -> String {

 /// Kafka offramp connector
 pub struct Kafka {
+    sink_url: TremorURL,
     config: Config,
     producer: FutureProducer,
     postprocessors: Postprocessors,
+    reply_tx: Sender<sink::Reply>,
+    error_rx: Receiver<KafkaError>,
+    error_tx: Sender<KafkaError>,
 }

 impl fmt::Debug for Kafka {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Kafka: {}", self.config.topic)
+        write!(f, "[Sink::{}] Kafka: {}", &self.sink_url, self.config.topic)
     }
 }

@@ -99,11 +106,18 @@ impl offramp::Impl for Kafka {
             let config: Config = Config::new(config)?;
             let producer = config.producer()?;
-            // Create the thread pool where the expensive computation will be performed.
+            let (dummy_tx, _) = bounded(1);
+
+            // TODO: does this need to be unbounded?
+            let (error_tx, error_rx) = bounded(crate::QSIZE);
             Ok(SinkManager::new_box(Self {
+                sink_url: TremorURL::from_offramp_id("kafka")?, // dummy
                 config,
                 producer,
                 postprocessors: vec![],
+                reply_tx: dummy_tx,
+                error_rx,
+                error_tx,
             }))
         } else {
             Err("Kafka offramp requires a config".into())
@@ -145,55 +159,131 @@ where
 }
 }

+/// Waits for actual delivery to the kafka cluster and sends an ack or fail.
+/// Also sends fatal errors for handling in the offramp task.
+async fn wait_for_delivery(
+    sink_url: String,
+    futures: Vec<rdkafka::producer::DeliveryFuture>,
+    mut insight_event: Event,
+    reply_tx: Sender<sink::Reply>,
+    error_tx: Sender<KafkaError>,
+) -> Result<()> {
+    let cb = match futures::future::try_join_all(futures).await {
+        Ok(results) => {
+            if let Some((kafka_error, _)) = results.into_iter().find_map(std::result::Result::err) {
+                error!(
+                    "[Sink::{}] Error delivering kafka record: {}",
+                    sink_url, &kafka_error
+                );
+                if is_fatal(&kafka_error) {
+                    error_tx.send(kafka_error).await?;
+                }
+                CBAction::Fail
+            } else {
+                // all good, send ack
+                CBAction::Ack
+            }
+        }
+        Err(e) => {
+            error!(
+                "[Sink::{}] DeliveryFuture cancelled. Message delivery status unclear, considering it failed: {}",
+                sink_url, e
+            );
+            // oh noes, send fail
+            CBAction::Fail
+        }
+    };
+    insight_event.cb = cb;
+    reply_tx.send(sink::Reply::Insight(insight_event)).await?;
+    Ok(())
+}
+
+impl Kafka {
+    fn drain_fatal_errors(&mut self) -> Result<()> {
+        let mut handled = false;
+        while let Ok(e) = self.error_rx.try_recv() {
+            if !handled {
+                // only handle the first fatal error
+                self.handle_fatal_error(&e)?;
+                handled = true;
+            }
+        }
+        Ok(())
+    }
+
+    fn handle_fatal_error(&mut self, _fatal_error: &KafkaError) -> Result<()> {
+        let maybe_fatal_error = unsafe { get_fatal_error(self.producer.client()) };
+        if let Some(fatal_error) = maybe_fatal_error {
+            error!(
+                "[Sink::{}] Fatal Error({:?}): {}",
+                &self.sink_url, fatal_error.0, fatal_error.1
+            );
+        }
+        error!("[Sink::{}] Reinitiating client...", &self.sink_url);
+        self.producer = self.config.producer()?;
+        error!("[Sink::{}] Client reinitiated.", &self.sink_url);
+
+        Ok(())
+    }
+}
+
 #[async_trait::async_trait]
 impl Sink for Kafka {
     async fn on_event(
         &mut self,
         _input: &str,
         codec: &dyn Codec,
         _codec_map: &HashMap<String, Box<dyn Codec>>,
-        event: Event,
+        mut event: Event,
     ) -> ResultVec {
-        let mut success = true;
+        // ensure we handle any fatal errors that occurred during the last on_event invocation
+        self.drain_fatal_errors()?;
+
         let ingest_ns = event.ingest_ns;
+        let mut delivery_futures = Vec::with_capacity(event.len()); // might not be enough
+        let mut insight_event = event.insight_ack(); // we will change the success status later, if need be
         for (value, meta) in event.value_meta_iter() {
             let encoded = codec.encode(value)?;
             let processed = postprocess(self.postprocessors.as_mut_slice(), ingest_ns, encoded)?;
-            for raw in processed {
-                let mut record = FutureRecord::to(&self.config.topic);
-                record = record.payload(&raw);
-
-                let record = if let Some(k) = meta.get("kafka_key").and_then(Value::as_str) {
-                    record.key(k)
-                } else if let Some(ref k) = self.config.key {
-                    record.key(k.as_str())
-                } else {
-                    record
-                };
-                match self
-                    .producer
-                    .send_with_runtime::<SmolRuntime, _, _, _>(record, Duration::from_secs(0))
-                    .await
-                {
-                    Ok(_) => {}
-                    Err((e, _r)) => {
-                        error!("[Kafka Offramp] failed to enqueue message: {}", e);
+            let meta_kafka_key = meta.get("kafka_key").and_then(Value::as_str);
+            for payload in processed {
+                // TODO: allow defining partition and timestamp in meta
+                let mut record = FutureRecord::to(self.config.topic.as_str());
+                record = record.payload(&payload);
+                if let Some(kafka_key) = meta_kafka_key {
+                    record = record.key(kafka_key);
+                } else if let Some(kafka_key) = &self.config.key {
+                    record = record.key(kafka_key.as_str());
+                }
+
+                // send out without blocking on delivery
+                match self.producer.send_result(record) {
+                    Ok(delivery_future) => {
+                        delivery_futures.push(delivery_future);
+                    }
+                    Err((e, _)) => {
+                        error!("[Sink::{}] failed to enqueue message: {}", &self.sink_url, e);
                         if is_fatal(&e) {
-                            if let Some((code, fatal)) =
-                                unsafe { get_fatal_error(self.producer.client()) }
-                            {
-                                error!("[Kafka Offramp] Fatal Error({:?}): {}", code, fatal);
-                            }
-                            self.producer = self.config.producer()?;
-                            error!("[Kafka Offramp] reinitiating client");
+                            // handle fatal errors right here, without enqueueing
+                            self.handle_fatal_error(&e)?;
                         }
-                        success = false;
-                        break;
+                        // bail out with a CB fail on enqueue error
+                        insight_event.cb = CBAction::Fail;
+                        return Ok(Some(vec![sink::Reply::Insight(insight_event)]));
                     }
                 }
             }
         }
-        Ok(Some(vec![sink::Reply::Insight(event.insight(success))]))
+        // successfully enqueued all messages
+        // spawn the task waiting for delivery and send acks/fails then
+        task::spawn(wait_for_delivery(
            self.sink_url.to_string(),
            delivery_futures,
            insight_event,
            self.reply_tx.clone(),
            self.error_tx.clone(),
        ));
+        Ok(None)
     }
     fn default_codec(&self) -> &str {
         "json"
@@ -202,17 +292,20 @@ impl Sink for Kafka {
     async fn init(
         &mut self,
         _sink_uid: u64,
-        _sink_url: &TremorURL,
+        sink_url: &TremorURL,
         _codec: &dyn Codec,
         _codec_map: &HashMap<String, Box<dyn Codec>>,
         processors: Processors<'_>,
         _is_linked: bool,
-        _reply_channel: Sender<sink::Reply>,
+        reply_channel: Sender<sink::Reply>,
     ) -> Result<()> {
         self.postprocessors = make_postprocessors(processors.post)?;
+        self.reply_tx = reply_channel;
+        self.sink_url = sink_url.clone();
         Ok(())
     }
     async fn on_signal(&mut self, _signal: Event) -> ResultVec {
+        self.drain_fatal_errors()?;
         Ok(None)
     }
     fn is_active(&self) -> bool {
@@ -221,4 +314,16 @@ impl Sink for Kafka {
     fn auto_ack(&self) -> bool {
         false
     }
+    async fn terminate(&mut self) {
+        if self.producer.in_flight_count() > 0 {
+            // wait a second in order to flush messages
+            let wait_secs = 1;
+            info!(
+                "[Sink::{}] Flushing messages. Waiting for {} seconds.",
+                &self.sink_url, wait_secs
+            );
+            self.producer.flush(Duration::from_secs(wait_secs));
+            info!("[Sink::{}] Terminating.", &self.sink_url);
+        }
+    }
 }
11 changes: 7 additions & 4 deletions src/source/kafka.rs
@@ -21,7 +21,7 @@ use futures::future::{self, FutureExt};
 use futures::StreamExt;
 use halfbrown::HashMap;
 use rdkafka::client::ClientContext;
-use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
+use rdkafka::config::ClientConfig;
 use rdkafka::consumer::stream_consumer::{self, StreamConsumer};
 use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext};
 use rdkafka::error::KafkaResult;
@@ -37,7 +37,7 @@ use std::time::{Duration, Instant};
 pub struct SmolRuntime;

 impl AsyncRuntime for SmolRuntime {
-    type Delay = future::Map<async_io::Timer, fn(Instant)>;
+    type Delay = future::Map<smol::Timer, fn(Instant)>;

     fn spawn<T>(task: T)
     where
@@ -331,6 +331,10 @@ impl Source for Int {
info!("Starting kafka onramp {}", self.onramp_id);
// Setting up the configuration with default and then overwriting
// them with custom settings.
//
// ENABLE LIBRDKAFKA DEBUGGING:
// - set librdkafka logger to debug in logger.yaml
// - configure: debug: "all" for this onramp
let client_config = client_config
.set("group.id", &self.config.group_id)
.set(
Expand All @@ -344,8 +348,7 @@ impl Source for Int {
.set("enable.auto.commit", "true")
.set("auto.commit.interval.ms", "5000")
// but only commit the offsets explicitly stored via `consumer.store_offset`.
.set("enable.auto.offset.store", "true")
.set_log_level(RDKafkaLogLevel::Debug);
.set("enable.auto.offset.store", "true");

let client_config = if let Some(options) = self.config.rdkafka_options.as_ref() {
options
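With the hard-coded RDKafkaLogLevel::Debug removed, debugging is opt-in as the new comments describe: set the librdkafka logger to debug in logger.yaml and pass debug: "all" through the onramp's rdkafka_options. A hedged sketch of what that amounts to at the ClientConfig level; the helper name and flag are illustrative:

use rdkafka::config::ClientConfig;

fn apply_debug(mut client_config: ClientConfig, enable_debug: bool) -> ClientConfig {
    if enable_debug {
        // equivalent to `debug: "all"` in this onramp's rdkafka_options
        client_config.set("debug", "all");
    }
    client_config
}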