diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 771d91b7d..7859231ad 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -29,7 +29,7 @@ use std::net::SocketAddr; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::sync::{oneshot, RwLock}; use tokio::time::timeout; use uuid::Uuid; @@ -158,7 +158,6 @@ impl TransformBuilder for KafkaSinkClusterBuilder { Box::new(KafkaSinkCluster { first_contact_points: self.first_contact_points.clone(), shotover_nodes: self.shotover_nodes.clone(), - pushed_messages_tx: None, read_timeout: self.read_timeout, nodes: vec![], nodes_shared: self.nodes_shared.clone(), @@ -236,7 +235,6 @@ impl SaslStatus { pub struct KafkaSinkCluster { first_contact_points: Vec, shotover_nodes: Vec, - pushed_messages_tx: Option>, read_timeout: Option, nodes: Vec, nodes_shared: Arc>>, @@ -302,10 +300,6 @@ impl Transform for KafkaSinkCluster { self.receive_responses(&find_coordinator_requests, responses) .await } - - fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender) { - self.pushed_messages_tx = Some(pushed_messages_tx); - } } impl KafkaSinkCluster { diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs index 4eb41a995..8566e1bb1 100644 --- a/shotover/src/transforms/kafka/sink_single.rs +++ b/shotover/src/transforms/kafka/sink_single.rs @@ -15,7 +15,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::io::split; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use tokio::time::timeout; #[derive(Serialize, Deserialize, Debug)] @@ -82,7 +82,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder { Box::new(KafkaSinkSingle { outbound: None, address_port: self.address_port, - pushed_messages_tx: None, connect_timeout: self.connect_timeout, tls: self.tls.clone(), read_timeout: self.read_timeout, @@ -101,7 +100,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder { pub struct KafkaSinkSingle { address_port: u16, outbound: Option, - pushed_messages_tx: Option>, connect_timeout: Duration, read_timeout: Option, tls: Option, @@ -193,10 +191,6 @@ impl Transform for KafkaSinkSingle { Ok(responses) } - - fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender) { - self.pushed_messages_tx = Some(pushed_messages_tx); - } } impl KafkaSinkSingle {