Skip to content

Commit

Permalink
Simplify ProtocolType
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 21, 2024
1 parent f501c7e commit 65d4f6b
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 52 deletions.
7 changes: 4 additions & 3 deletions shotover/benches/benches/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use cassandra_protocol::compression::Compression;
use cassandra_protocol::{consistency::Consistency, frame::Version, query::QueryParams};
use criterion::{criterion_group, BatchSize, Criterion};
use hex_literal::hex;
use shotover::codec::CodecState;
use shotover::frame::cassandra::{parse_statement_single, Tracing};
use shotover::frame::RedisFrame;
use shotover::frame::{CassandraFrame, CassandraOperation, Frame};
use shotover::message::{Message, MessageIdMap, ProtocolType, QueryType};
use shotover::message::{Message, MessageIdMap, QueryType};
use shotover::transforms::cassandra::peers_rewrite::CassandraPeersRewrite;
use shotover::transforms::chain::{TransformChain, TransformChainBuilder};
use shotover::transforms::debug::returner::{DebugReturner, Response};
Expand Down Expand Up @@ -210,7 +211,7 @@ fn criterion_benchmark(c: &mut Criterion) {
)
.to_vec(),
),
ProtocolType::Cassandra {
CodecState::Cassandra {
compression: Compression::None,
},
)],
Expand Down Expand Up @@ -264,7 +265,7 @@ fn criterion_benchmark(c: &mut Criterion) {
}
.encode(Compression::None)
.into(),
ProtocolType::Cassandra {
CodecState::Cassandra {
compression: Compression::None,
},
)],
Expand Down
8 changes: 4 additions & 4 deletions shotover/benches/benches/codec/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use bytes::{Bytes, BytesMut};
use criterion::{black_box, criterion_group, BatchSize, Criterion};
use shotover::codec::kafka::KafkaCodecBuilder;
use shotover::codec::{CodecBuilder, Direction};
use shotover::message::{Message, ProtocolType};
use shotover::codec::{CodecBuilder, CodecState, Direction};
use shotover::message::Message;
use tokio_util::codec::{Decoder, Encoder};

const KAFKA_REQUESTS: &[(&[u8], &str)] = &[
Expand Down Expand Up @@ -76,7 +76,7 @@ fn criterion_benchmark(c: &mut Criterion) {
{
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
ProtocolType::Kafka {
CodecState::Kafka {
request_header: None,
},
);
Expand Down Expand Up @@ -112,7 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) {
for (message, _) in KAFKA_REQUESTS {
let mut message = Message::from_bytes(
Bytes::from(message.to_vec()),
ProtocolType::Kafka {
CodecState::Kafka {
request_header: None,
},
);
Expand Down
5 changes: 3 additions & 2 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{CodecBuilder, CodecReadError, CodecWriteError, Direction};
use crate::codec::CodecState;
use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing};
use crate::frame::{CassandraFrame, Frame, MessageType};
use crate::message::{Encodable, Message, Messages, Metadata};
Expand Down Expand Up @@ -358,7 +359,7 @@ impl CassandraDecoder {

let message = Message::from_bytes_at_instant(
bytes.freeze(),
crate::message::ProtocolType::Cassandra {
CodecState::Cassandra {
compression: if compressed {
compression
} else {
Expand Down Expand Up @@ -510,7 +511,7 @@ impl CassandraDecoder {

envelopes.push(Message::from_bytes_at_instant(
envelope,
crate::message::ProtocolType::Cassandra {
CodecState::Cassandra {
compression: Compression::None,
},
Some(received_at),
Expand Down
8 changes: 4 additions & 4 deletions shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{message_latency, CodecWriteError, Direction};
use crate::codec::{CodecBuilder, CodecReadError};
use crate::codec::{CodecBuilder, CodecReadError, CodecState};
use crate::frame::MessageType;
use crate::message::{Encodable, Message, MessageId, Messages, ProtocolType};
use crate::message::{Encodable, Message, MessageId, Messages};
use anyhow::{anyhow, Result};
use bytes::BytesMut;
use kafka_protocol::messages::ApiKey;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Decoder for KafkaDecoder {
.map_err(|_| CodecReadError::Parser(anyhow!("kafka encoder half was lost")))?;
let mut message = Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka {
CodecState::Kafka {
request_header: Some(header),
},
Some(received_at),
Expand All @@ -122,7 +122,7 @@ impl Decoder for KafkaDecoder {
} else {
Message::from_bytes_at_instant(
bytes.freeze(),
ProtocolType::Kafka {
CodecState::Kafka {
request_header: None,
},
Some(received_at),
Expand Down
15 changes: 8 additions & 7 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! parsed AST-like representations of messages

use crate::{codec::CodecState, message::ProtocolType};
use crate::codec::CodecState;
use anyhow::{anyhow, Result};
use bytes::Bytes;
#[cfg(feature = "cassandra")]
Expand Down Expand Up @@ -38,17 +38,18 @@ pub enum MessageType {
OpenSearch,
}

impl From<&ProtocolType> for MessageType {
fn from(value: &ProtocolType) -> Self {
impl From<&CodecState> for MessageType {
fn from(value: &CodecState) -> Self {
match value {
#[cfg(feature = "cassandra")]
ProtocolType::Cassandra { .. } => Self::Cassandra,
CodecState::Cassandra { .. } => Self::Cassandra,
#[cfg(feature = "redis")]
ProtocolType::Redis => Self::Redis,
CodecState::Redis => Self::Redis,
#[cfg(feature = "kafka")]
ProtocolType::Kafka { .. } => Self::Kafka,
CodecState::Kafka { .. } => Self::Kafka,
#[cfg(feature = "opensearch")]
ProtocolType::OpenSearch => Self::OpenSearch,
CodecState::OpenSearch => Self::OpenSearch,
CodecState::Dummy => Self::Dummy,
}
}
}
Expand Down
39 changes: 7 additions & 32 deletions shotover/src/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Message and supporting types - used to hold a message/query/result going between the client and database

#[cfg(feature = "kafka")]
use crate::codec::kafka::RequestHeader;
use crate::codec::CodecState;
#[cfg(feature = "cassandra")]
use crate::frame::{cassandra, cassandra::CassandraMetadata};
Expand All @@ -10,8 +8,6 @@ use crate::frame::{redis::redis_query_type, RedisFrame};
use crate::frame::{Frame, MessageType};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
#[cfg(feature = "cassandra")]
use cassandra_protocol::compression::Compression;
use derivative::Derivative;
use fnv::FnvBuildHasher;
use nonzero_ext::nonzero;
Expand All @@ -37,36 +33,15 @@ pub enum Metadata {
#[derive(PartialEq)]
pub enum ProtocolType {
#[cfg(feature = "cassandra")]
Cassandra { compression: Compression },
Cassandra,
#[cfg(feature = "redis")]
Redis,
#[cfg(feature = "kafka")]
Kafka {
request_header: Option<RequestHeader>,
},
Kafka,
#[cfg(feature = "opensearch")]
OpenSearch,
}

impl From<&ProtocolType> for CodecState {
fn from(value: &ProtocolType) -> Self {
match value {
#[cfg(feature = "cassandra")]
ProtocolType::Cassandra { compression } => Self::Cassandra {
compression: *compression,
},
#[cfg(feature = "redis")]
ProtocolType::Redis => Self::Redis,
#[cfg(feature = "kafka")]
ProtocolType::Kafka { request_header } => Self::Kafka {
request_header: *request_header,
},
#[cfg(feature = "opensearch")]
ProtocolType::OpenSearch => Self::OpenSearch,
}
}
}

pub type Messages = Vec<Message>;

/// Unique identifier for the message assigned by shotover at creation time.
Expand Down Expand Up @@ -121,16 +96,16 @@ impl Message {
/// Providing just the bytes results in better performance when only the raw bytes are available.
pub fn from_bytes_at_instant(
bytes: Bytes,
protocol_type: ProtocolType,
codec_state: CodecState,
received_from_source_or_sink_at: Option<Instant>,
) -> Self {
Message {
inner: Some(MessageInner::RawBytes {
bytes,
message_type: MessageType::from(&protocol_type),
message_type: MessageType::from(&codec_state),
}),
meta_timestamp: None,
codec_state: CodecState::from(&protocol_type),
codec_state,
received_from_source_or_sink_at,
id: rand::random(),
request_id: None,
Expand Down Expand Up @@ -173,8 +148,8 @@ impl Message {
}

/// Same as [`Message::from_bytes`] but `received_from_source_or_sink_at` is set to None.
pub fn from_bytes(bytes: Bytes, protocol_type: ProtocolType) -> Self {
Self::from_bytes_at_instant(bytes, protocol_type, None)
pub fn from_bytes(bytes: Bytes, codec_state: CodecState) -> Self {
Self::from_bytes_at_instant(bytes, codec_state, None)
}

/// Same as [`Message::from_frame`] but `received_from_source_or_sink_at` is set to None.
Expand Down

0 comments on commit 65d4f6b

Please sign in to comment.