Skip to content

Commit

Permalink
split kafka codec (#1052)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 21, 2023
1 parent 161419d commit d3b0300
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
47 changes: 29 additions & 18 deletions shotover-proxy/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,31 @@ use anyhow::Result;
use bytes::{Buf, BytesMut};
use tokio_util::codec::{Decoder, Encoder};

impl CodecBuilder for KafkaCodec {
type Decoder = KafkaCodec;
type Encoder = KafkaCodec;
fn build(&self) -> (KafkaCodec, KafkaCodec) {
(KafkaCodec::new(), KafkaCodec::new())
#[derive(Clone, Default)]
pub struct KafkaCodecBuilder {}

impl KafkaCodecBuilder {
pub fn new() -> Self {
Default::default()
}
}

#[derive(Debug, Clone)]
pub struct KafkaCodec {
messages: Messages,
impl CodecBuilder for KafkaCodecBuilder {
type Decoder = KafkaDecoder;
type Encoder = KafkaEncoder;
fn build(&self) -> (KafkaDecoder, KafkaEncoder) {
(KafkaDecoder::new(), KafkaEncoder::new())
}
}

impl Default for KafkaCodec {
fn default() -> Self {
Self::new()
}
#[derive(Default)]
pub struct KafkaDecoder {
messages: Messages,
}

impl KafkaCodec {
pub fn new() -> KafkaCodec {
KafkaCodec { messages: vec![] }
impl KafkaDecoder {
pub fn new() -> Self {
KafkaDecoder::default()
}
}

Expand All @@ -43,7 +46,7 @@ fn get_length_of_full_message(src: &mut BytesMut) -> Option<usize> {
}
}

impl Decoder for KafkaCodec {
impl Decoder for KafkaDecoder {
type Item = Messages;
type Error = CodecReadError;

Expand All @@ -66,13 +69,21 @@ impl Decoder for KafkaCodec {
}
}

impl Encoder<Messages> for KafkaCodec {
#[derive(Default)]
pub struct KafkaEncoder {}

impl KafkaEncoder {
pub fn new() -> Self {
KafkaEncoder::default()
}
}

impl Encoder<Messages> for KafkaEncoder {
type Error = anyhow::Error;

fn encode(&mut self, item: Messages, dst: &mut BytesMut) -> Result<()> {
item.into_iter().try_for_each(|m| {
let start = dst.len();
// TODO: MessageType::Kafka
let result = match m.into_encodable(MessageType::Kafka)? {
Encodable::Bytes(bytes) => {
dst.extend_from_slice(&bytes);
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/src/sources/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::codec::kafka::KafkaCodec;
use crate::codec::kafka::KafkaCodecBuilder;
use crate::server::TcpCodecListener;
use crate::sources::Sources;
use crate::tls::{TlsAcceptor, TlsAcceptorConfig};
Expand Down Expand Up @@ -67,7 +67,7 @@ impl KafkaSource {
name.to_string(),
listen_addr.clone(),
hard_connection_limit.unwrap_or(false),
KafkaCodec::new(),
KafkaCodecBuilder::new(),
Arc::new(Semaphore::new(connection_limit.unwrap_or(512))),
trigger_shutdown_rx.clone(),
tls.map(TlsAcceptor::new).transpose()?,
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/src/transforms/kafka/sink_single.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::codec::kafka::KafkaCodec;
use crate::codec::kafka::KafkaCodecBuilder;
use crate::error::ChainResponse;
use crate::message::Messages;
use crate::tcp;
Expand Down Expand Up @@ -87,7 +87,7 @@ pub struct KafkaSinkSingle {
impl Transform for KafkaSinkSingle {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
if self.outbound.is_none() {
let codec = KafkaCodec::new();
let codec = KafkaCodecBuilder::new();
let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?;
let (rx, tx) = tcp_stream.into_split();
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
Expand Down

0 comments on commit d3b0300

Please sign in to comment.