From d3b0300b8fe6cf1e04105ef625da19b012687b6d Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 21 Feb 2023 12:26:35 +1100 Subject: [PATCH] split kafka codec (#1052) --- shotover-proxy/src/codec/kafka.rs | 47 ++++++++++++------- shotover-proxy/src/sources/kafka.rs | 4 +- .../src/transforms/kafka/sink_single.rs | 4 +- 3 files changed, 33 insertions(+), 22 deletions(-) diff --git a/shotover-proxy/src/codec/kafka.rs b/shotover-proxy/src/codec/kafka.rs index e1e3b1fbc..944f31337 100644 --- a/shotover-proxy/src/codec/kafka.rs +++ b/shotover-proxy/src/codec/kafka.rs @@ -5,28 +5,31 @@ use anyhow::Result; use bytes::{Buf, BytesMut}; use tokio_util::codec::{Decoder, Encoder}; -impl CodecBuilder for KafkaCodec { - type Decoder = KafkaCodec; - type Encoder = KafkaCodec; - fn build(&self) -> (KafkaCodec, KafkaCodec) { - (KafkaCodec::new(), KafkaCodec::new()) +#[derive(Clone, Default)] +pub struct KafkaCodecBuilder {} + +impl KafkaCodecBuilder { + pub fn new() -> Self { + Default::default() } } -#[derive(Debug, Clone)] -pub struct KafkaCodec { - messages: Messages, +impl CodecBuilder for KafkaCodecBuilder { + type Decoder = KafkaDecoder; + type Encoder = KafkaEncoder; + fn build(&self) -> (KafkaDecoder, KafkaEncoder) { + (KafkaDecoder::new(), KafkaEncoder::new()) + } } -impl Default for KafkaCodec { - fn default() -> Self { - Self::new() - } +#[derive(Default)] +pub struct KafkaDecoder { + messages: Messages, } -impl KafkaCodec { - pub fn new() -> KafkaCodec { - KafkaCodec { messages: vec![] } +impl KafkaDecoder { + pub fn new() -> Self { + KafkaDecoder::default() } } @@ -43,7 +46,7 @@ fn get_length_of_full_message(src: &mut BytesMut) -> Option { } } -impl Decoder for KafkaCodec { +impl Decoder for KafkaDecoder { type Item = Messages; type Error = CodecReadError; @@ -66,13 +69,21 @@ impl Decoder for KafkaCodec { } } -impl Encoder for KafkaCodec { +#[derive(Default)] +pub struct KafkaEncoder {} + +impl KafkaEncoder { + pub fn new() -> Self { + KafkaEncoder::default() + } +} + +impl Encoder for KafkaEncoder { type Error = anyhow::Error; fn encode(&mut self, item: Messages, dst: &mut BytesMut) -> Result<()> { item.into_iter().try_for_each(|m| { let start = dst.len(); - // TODO: MessageType::Kafka let result = match m.into_encodable(MessageType::Kafka)? { Encodable::Bytes(bytes) => { dst.extend_from_slice(&bytes); diff --git a/shotover-proxy/src/sources/kafka.rs b/shotover-proxy/src/sources/kafka.rs index f0be10abf..97aa72ca1 100644 --- a/shotover-proxy/src/sources/kafka.rs +++ b/shotover-proxy/src/sources/kafka.rs @@ -1,4 +1,4 @@ -use crate::codec::kafka::KafkaCodec; +use crate::codec::kafka::KafkaCodecBuilder; use crate::server::TcpCodecListener; use crate::sources::Sources; use crate::tls::{TlsAcceptor, TlsAcceptorConfig}; @@ -67,7 +67,7 @@ impl KafkaSource { name.to_string(), listen_addr.clone(), hard_connection_limit.unwrap_or(false), - KafkaCodec::new(), + KafkaCodecBuilder::new(), Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), trigger_shutdown_rx.clone(), tls.map(TlsAcceptor::new).transpose()?, diff --git a/shotover-proxy/src/transforms/kafka/sink_single.rs b/shotover-proxy/src/transforms/kafka/sink_single.rs index a28bb3e02..228232f6d 100644 --- a/shotover-proxy/src/transforms/kafka/sink_single.rs +++ b/shotover-proxy/src/transforms/kafka/sink_single.rs @@ -1,4 +1,4 @@ -use crate::codec::kafka::KafkaCodec; +use crate::codec::kafka::KafkaCodecBuilder; use crate::error::ChainResponse; use crate::message::Messages; use crate::tcp; @@ -87,7 +87,7 @@ pub struct KafkaSinkSingle { impl Transform for KafkaSinkSingle { async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse { if self.outbound.is_none() { - let codec = KafkaCodec::new(); + let codec = KafkaCodecBuilder::new(); let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?; let (rx, tx) = tcp_stream.into_split(); self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));