From 1af16868dfe82520f0be9615da2d81d3794f37cf Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Thu, 12 Nov 2020 23:20:46 +0100 Subject: [PATCH] Flush messages for 1 sec before terminating kafka sink if we have in flight messages. Signed-off-by: Matthias Wahl --- src/sink/kafka.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/sink/kafka.rs b/src/sink/kafka.rs index b0852d26ff..c898d4ca47 100644 --- a/src/sink/kafka.rs +++ b/src/sink/kafka.rs @@ -30,7 +30,7 @@ use rdkafka::{ error::KafkaError, producer::{FutureProducer, FutureRecord}, }; -use std::fmt; +use std::{fmt, time::Duration}; #[derive(Deserialize)] pub struct Config { @@ -314,4 +314,16 @@ impl Sink for Kafka { fn auto_ack(&self) -> bool { false } + async fn terminate(&mut self) { + if self.producer.in_flight_count() > 0 { + // wait a second in order to flush messages. + let wait_secs = 1; + info!( + "[Sink::{}] Flushing messages. Waiting for {} seconds.", + wait_secs, &self.sink_url + ); + self.producer.flush(Duration::from_secs(1)); + info!("[Sink::{}] Terminating.", &self.sink_url); + } + } }