Skip to content

Commit

Permalink
Flush messages for 1 sec before terminating kafka sink if we have in …
Browse files Browse the repository at this point in the history
…flight messages.

Signed-off-by: Matthias Wahl <mwahl@wayfair.com>
  • Loading branch information
Matthias Wahl authored and Licenser committed Nov 12, 2020
1 parent 7dcfe89 commit 1af1686
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use rdkafka::{
error::KafkaError,
producer::{FutureProducer, FutureRecord},
};
use std::fmt;
use std::{fmt, time::Duration};

#[derive(Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -314,4 +314,16 @@ impl Sink for Kafka {
fn auto_ack(&self) -> bool {
false
}
async fn terminate(&mut self) {
if self.producer.in_flight_count() > 0 {
// wait a second in order to flush messages.
let wait_secs = 1;
info!(
"[Sink::{}] Flushing messages. Waiting for {} seconds.",
wait_secs, &self.sink_url
);
self.producer.flush(Duration::from_secs(1));
info!("[Sink::{}] Terminating.", &self.sink_url);
}
}
}

0 comments on commit 1af1686

Please sign in to comment.