Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec direction logging #1066

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions shotover-proxy/benches/benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cassandra_protocol::frame::message_result::{
use cassandra_protocol::frame::Version;
use criterion::{black_box, criterion_group, BatchSize, Criterion};
use shotover_proxy::codec::cassandra::CassandraCodecBuilder;
use shotover_proxy::codec::CodecBuilder;
use shotover_proxy::codec::{CodecBuilder, Direction};
use shotover_proxy::frame::cassandra::{parse_statement_single, Tracing};
use shotover_proxy::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use shotover_proxy::message::Message;
Expand All @@ -28,7 +28,7 @@ fn criterion_benchmark(c: &mut Criterion) {
},
}))];

let (_, mut encoder) = CassandraCodecBuilder::new().build();
let (_, mut encoder) = CassandraCodecBuilder::new(Direction::Sink).build();

group.bench_function("encode_cassandra_system.local_query", |b| {
b.iter_batched(
Expand All @@ -52,7 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) {
operation: CassandraOperation::Result(peers_v2_result()),
}))];

let (_, mut encoder) = CassandraCodecBuilder::new().build();
let (_, mut encoder) = CassandraCodecBuilder::new(Direction::Sink).build();

group.bench_function("encode_cassandra_system.local_result", |b| {
b.iter_batched(
Expand Down
54 changes: 32 additions & 22 deletions shotover-proxy/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Direction;
use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing};
use crate::frame::{CassandraFrame, Frame, MessageType};
Expand All @@ -17,23 +18,24 @@ use std::sync::RwLock;
use tokio_util::codec::{Decoder, Encoder};
use tracing::info;

#[derive(Clone, Default)]
pub struct CassandraCodecBuilder {}

impl CassandraCodecBuilder {
pub fn new() -> Self {
Self::default()
}
#[derive(Clone)]
pub struct CassandraCodecBuilder {
direction: Direction,
}

impl CodecBuilder for CassandraCodecBuilder {
type Decoder = CassandraDecoder;
type Encoder = CassandraEncoder;

fn new(direction: Direction) -> Self {
Self { direction }
}

fn build(&self) -> (CassandraDecoder, CassandraEncoder) {
let compression = Arc::new(RwLock::new(Compression::None));
(
CassandraDecoder::new(compression.clone()),
CassandraEncoder::new(compression),
CassandraDecoder::new(compression.clone(), self.direction),
CassandraEncoder::new(compression, self.direction),
)
}
}
Expand All @@ -42,14 +44,16 @@ pub struct CassandraDecoder {
compression: Arc<RwLock<Compression>>,
messages: Vec<Message>,
current_use_keyspace: Option<Identifier>,
direction: Direction,
}

impl CassandraDecoder {
pub fn new(compression: Arc<RwLock<Compression>>) -> CassandraDecoder {
pub fn new(compression: Arc<RwLock<Compression>>, direction: Direction) -> CassandraDecoder {
CassandraDecoder {
compression,
messages: vec![],
current_use_keyspace: None,
direction,
}
}
}
Expand Down Expand Up @@ -102,7 +106,8 @@ impl Decoder for CassandraDecoder {
// Clear the read bytes from the FramedReader
let bytes = src.split_to(frame_len);
tracing::debug!(
"incoming cassandra message:\n{}",
"{}: incoming cassandra message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);

Expand Down Expand Up @@ -253,11 +258,15 @@ fn reject_protocol_version(version: u8) -> CodecReadError {

pub struct CassandraEncoder {
compression: Arc<RwLock<Compression>>,
direction: Direction,
}

impl CassandraEncoder {
pub fn new(compression: Arc<RwLock<Compression>>) -> CassandraEncoder {
CassandraEncoder { compression }
pub fn new(compression: Arc<RwLock<Compression>>, direction: Direction) -> CassandraEncoder {
CassandraEncoder {
compression,
direction,
}
}
}

Expand Down Expand Up @@ -308,7 +317,8 @@ impl Encoder<Messages> for CassandraEncoder {
}
}
tracing::debug!(
"outgoing cassandra message:\n{}",
"{}: outgoing cassandra message:\n{}",
self.direction,
pretty_hex::pretty_hex(&&dst[start..])
);
}
Expand All @@ -319,7 +329,7 @@ impl Encoder<Messages> for CassandraEncoder {
#[cfg(test)]
mod cassandra_protocol_tests {
use crate::codec::cassandra::CassandraCodecBuilder;
use crate::codec::CodecBuilder;
use crate::codec::{CodecBuilder, Direction};
use crate::frame::cassandra::{
parse_statement_single, CassandraFrame, CassandraOperation, CassandraResult, Tracing,
};
Expand Down Expand Up @@ -376,7 +386,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_startup() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let mut startup_body: HashMap<String, String> = HashMap::new();
startup_body.insert("CQL_VERSION".into(), "3.0.0".into());
let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30");
Expand All @@ -392,7 +402,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_options() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!("040000000500000000");
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
Expand All @@ -406,7 +416,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_ready() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!("840000000200000000");
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
Expand All @@ -420,7 +430,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_register() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"040000010b000000310003000f544f504f4c4f47595f4348414e4745
000d5354415455535f4348414e4745000d534348454d415f4348414e4745"
Expand All @@ -443,7 +453,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_result() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"840000020800000099000000020000000100000009000673797374656
d000570656572730004706565720010000b646174615f63656e746572000d0007686f73745f6964000c000c70726566
Expand Down Expand Up @@ -551,7 +561,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_query_select() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"0400000307000000350000002e53454c454354202a2046524f4d20737973
74656d2e6c6f63616c205748455245206b6579203d20276c6f63616c27000100"
Expand All @@ -574,7 +584,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_query_insert() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"0400000307000000330000002c494e5345525420494e544f207379737465
6d2e666f6f2028626172292056414c554553202827626172322729000100"
Expand Down
55 changes: 33 additions & 22 deletions shotover-proxy/src/codec/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Direction;
use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::MessageType;
use crate::message::{Encodable, Message, Messages, ProtocolType};
Expand All @@ -7,16 +8,6 @@ use kafka_protocol::messages::ApiKey;
use std::sync::mpsc;
use tokio_util::codec::{Decoder, Encoder};

/// Depending on if the codec is used in a sink or a source requires different processing logic:
/// * Sources parse requests which do not require any special handling
/// * Sinks parse responses which requires first matching up the version and api_key with its corresponding request
/// + To achieve this Sinks use an mpsc channel to send header data from the encoder to the decoder
#[derive(Copy, Clone)]
pub enum Direction {
Source,
Sink,
}

#[derive(Copy, Clone, Debug, PartialEq)]
pub struct RequestHeader {
pub api_key: ApiKey,
Expand All @@ -28,15 +19,18 @@ pub struct KafkaCodecBuilder {
direction: Direction,
}

impl KafkaCodecBuilder {
pub fn new(direction: Direction) -> Self {
KafkaCodecBuilder { direction }
}
}

// Depending on if the codec is used in a sink or a source requires different processing logic:
// * Sources parse requests which do not require any special handling
// * Sinks parse responses which requires first matching up the version and api_key with its corresponding request
// + To achieve this Sinks use an mpsc channel to send header data from the encoder to the decoder
impl CodecBuilder for KafkaCodecBuilder {
type Decoder = KafkaDecoder;
type Encoder = KafkaEncoder;

fn new(direction: Direction) -> Self {
Self { direction }
}

fn build(&self) -> (KafkaDecoder, KafkaEncoder) {
let (tx, rx) = match self.direction {
Direction::Source => (None, None),
Expand All @@ -45,20 +39,28 @@ impl CodecBuilder for KafkaCodecBuilder {
(Some(tx), Some(rx))
}
};
(KafkaDecoder::new(rx), KafkaEncoder::new(tx))
(
KafkaDecoder::new(rx, self.direction),
KafkaEncoder::new(tx, self.direction),
)
}
}

pub struct KafkaDecoder {
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
messages: Messages,
direction: Direction,
}

impl KafkaDecoder {
pub fn new(request_header_rx: Option<mpsc::Receiver<RequestHeader>>) -> Self {
pub fn new(
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
direction: Direction,
) -> Self {
KafkaDecoder {
request_header_rx,
messages: vec![],
direction,
}
}
}
Expand All @@ -85,7 +87,8 @@ impl Decoder for KafkaDecoder {
if let Some(size) = get_length_of_full_message(src) {
let bytes = src.split_to(size);
tracing::debug!(
"incoming kafka message:\n{}",
"{}: incoming kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header = if let Some(rx) = self.request_header_rx.as_ref() {
Expand All @@ -110,11 +113,18 @@ impl Decoder for KafkaDecoder {

pub struct KafkaEncoder {
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
direction: Direction,
}

impl KafkaEncoder {
pub fn new(request_header_tx: Option<mpsc::Sender<RequestHeader>>) -> Self {
KafkaEncoder { request_header_tx }
pub fn new(
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
direction: Direction,
) -> Self {
KafkaEncoder {
request_header_tx,
direction,
}
}
}

Expand All @@ -140,7 +150,8 @@ impl Encoder<Messages> for KafkaEncoder {
tx.send(RequestHeader { api_key, version })?;
}
tracing::debug!(
"outgoing kafka message:\n{}",
"{}: outgoing kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&&dst[start..])
);
result
Expand Down
18 changes: 18 additions & 0 deletions shotover-proxy/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
use crate::message::Messages;
use cassandra_protocol::compression::Compression;
use core::fmt;
use kafka::RequestHeader;
use tokio_util::codec::{Decoder, Encoder};

pub mod cassandra;
pub mod kafka;
pub mod redis;

#[derive(Copy, Clone)]
pub enum Direction {
Source,
Sink,
}

impl fmt::Display for Direction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Sink => write!(f, "Sink"),
Self::Source => write!(f, "Source"),
}
}
}

#[derive(Debug, Clone, PartialEq, Copy)]
pub enum CodecState {
Cassandra {
Expand Down Expand Up @@ -66,4 +82,6 @@ pub trait CodecBuilder: Clone + Send {
type Decoder: DecoderHalf;
type Encoder: EncoderHalf;
fn build(&self) -> (Self::Decoder, Self::Encoder);

fn new(direction: Direction) -> Self;
}
Loading