Skip to content

Commit

Permalink
Split transforms into Transform and TransformBuilder stages (#935)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Dec 12, 2022
1 parent a54a256 commit 75f4938
Show file tree
Hide file tree
Showing 27 changed files with 569 additions and 404 deletions.
61 changes: 35 additions & 26 deletions shotover-proxy/benches/benches/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,27 @@ use shotover_proxy::frame::RedisFrame;
use shotover_proxy::frame::{CassandraFrame, CassandraOperation, Frame, MessageType};
use shotover_proxy::message::{Message, QueryType};
use shotover_proxy::transforms::cassandra::peers_rewrite::CassandraPeersRewrite;
use shotover_proxy::transforms::chain::TransformChain;
use shotover_proxy::transforms::chain::{TransformChain, TransformChainBuilder};
use shotover_proxy::transforms::debug::returner::{DebugReturner, Response};
use shotover_proxy::transforms::filter::QueryTypeFilter;
use shotover_proxy::transforms::null::Null;
use shotover_proxy::transforms::protect::{KeyManagerConfig, ProtectConfig};
use shotover_proxy::transforms::redis::cluster_ports_rewrite::RedisClusterPortsRewrite;
use shotover_proxy::transforms::redis::timestamp_tagging::RedisTimestampTagger;
use shotover_proxy::transforms::throttling::RequestThrottlingConfig;
use shotover_proxy::transforms::{Transforms, Wrapper};
use shotover_proxy::transforms::{TransformBuilder, Wrapper};

fn criterion_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut group = c.benchmark_group("transform");
group.noise_threshold(0.2);

{
let chain =
TransformChain::new(vec![Transforms::Null(Null::default())], "bench".to_string());
let chain = TransformChainBuilder::new(
vec![TransformBuilder::Null(Null::default())],
"bench".to_string(),
)
.build();
let wrapper = Wrapper::new_with_chain_name(
vec![Message::from_frame(Frame::None)],
chain.name.clone(),
Expand All @@ -47,15 +50,16 @@ fn criterion_benchmark(c: &mut Criterion) {
}

{
let chain = TransformChain::new(
let chain = TransformChainBuilder::new(
vec![
Transforms::QueryTypeFilter(QueryTypeFilter {
TransformBuilder::QueryTypeFilter(QueryTypeFilter {
filter: QueryType::Read,
}),
Transforms::DebugReturner(DebugReturner::new(Response::Redis("a".into()))),
TransformBuilder::DebugReturner(DebugReturner::new(Response::Redis("a".into()))),
],
"bench".to_string(),
);
)
.build();
let wrapper = Wrapper::new_with_chain_name(
vec![
Message::from_frame(Frame::Redis(RedisFrame::Array(vec![
Expand Down Expand Up @@ -86,18 +90,19 @@ fn criterion_benchmark(c: &mut Criterion) {
}

{
let chain = TransformChain::new(
let chain = TransformChainBuilder::new(
vec![
Transforms::RedisTimestampTagger(RedisTimestampTagger::new()),
Transforms::DebugReturner(DebugReturner::new(Response::Message(vec![
TransformBuilder::RedisTimestampTagger(RedisTimestampTagger::new()),
TransformBuilder::DebugReturner(DebugReturner::new(Response::Message(vec![
Message::from_frame(Frame::Redis(RedisFrame::Array(vec![
RedisFrame::BulkString(Bytes::from_static(b"1")), // real frame
RedisFrame::BulkString(Bytes::from_static(b"1")), // timestamp
]))),
]))),
],
"bench".to_string(),
);
)
.build();
let wrapper_set = Wrapper::new_with_chain_name(
vec![Message::from_frame(Frame::Redis(RedisFrame::Array(vec![
RedisFrame::BulkString(Bytes::from_static(b"SET")),
Expand Down Expand Up @@ -143,13 +148,14 @@ fn criterion_benchmark(c: &mut Criterion) {
}

{
let chain = TransformChain::new(
let chain = TransformChainBuilder::new(
vec![
Transforms::RedisClusterPortsRewrite(RedisClusterPortsRewrite::new(2004)),
Transforms::Null(Null::default()),
TransformBuilder::RedisClusterPortsRewrite(RedisClusterPortsRewrite::new(2004)),
TransformBuilder::Null(Null::default()),
],
"bench".to_string(),
);
)
.build();
let wrapper = Wrapper::new_with_chain_name(
vec![Message::from_frame(Frame::Redis(RedisFrame::Array(vec![
RedisFrame::BulkString(Bytes::from_static(b"SET")),
Expand All @@ -174,7 +180,7 @@ fn criterion_benchmark(c: &mut Criterion) {
}

{
let chain = TransformChain::new(
let chain = TransformChainBuilder::new(
vec![
rt.block_on(
RequestThrottlingConfig {
Expand All @@ -184,10 +190,11 @@ fn criterion_benchmark(c: &mut Criterion) {
.get_transform(),
)
.unwrap(),
Transforms::Null(Null::default()),
TransformBuilder::Null(Null::default()),
],
"bench".to_string(),
);
)
.build();
let wrapper = Wrapper::new_with_chain_name(
vec![Message::from_bytes(
Bytes::from(
Expand Down Expand Up @@ -218,13 +225,14 @@ fn criterion_benchmark(c: &mut Criterion) {
}

{
let chain = TransformChain::new(
let chain = TransformChainBuilder::new(
vec![
Transforms::CassandraPeersRewrite(CassandraPeersRewrite::new(9042)),
Transforms::Null(Null::default()),
TransformBuilder::CassandraPeersRewrite(CassandraPeersRewrite::new(9042)),
TransformBuilder::Null(Null::default()),
],
"bench".into(),
);
)
.build();

let wrapper = Wrapper::new_with_chain_name(
vec![Message::from_bytes(
Expand Down Expand Up @@ -274,7 +282,7 @@ fn criterion_benchmark(c: &mut Criterion) {
}

{
let chain = TransformChain::new(
let chain = TransformChainBuilder::new(
vec![
rt.block_on(
ProtectConfig {
Expand All @@ -294,10 +302,11 @@ fn criterion_benchmark(c: &mut Criterion) {
.get_transform(),
)
.unwrap(),
Transforms::Null(Null::default()),
TransformBuilder::Null(Null::default()),
],
"bench".into(),
);
)
.build();

let wrapper = cassandra_parsed_query(
"INSERT INTO test_protect_keyspace.unprotected_table (pk, cluster, col1, col2, col3) VALUES ('pk1', 'cluster', 'I am gonna get encrypted!!', 42, true);"
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/src/config/topology.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::sources::{Sources, SourcesConfig};
use crate::transforms::chain::TransformChain;
use crate::transforms::chain::TransformChainBuilder;
use crate::transforms::{build_chain_from_config, TransformsConfig};
use anyhow::{anyhow, Result};
use itertools::Itertools;
Expand Down Expand Up @@ -30,7 +30,7 @@ impl Topology {
Topology::topology_from_config(config)
}

async fn build_chains(&self) -> Result<HashMap<String, TransformChain>> {
async fn build_chains(&self) -> Result<HashMap<String, TransformChainBuilder>> {
let mut result = HashMap::new();
for (key, value) in &self.chain_config {
result.insert(
Expand Down
10 changes: 5 additions & 5 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::message::Messages;
use crate::tls::TlsAcceptor;
use crate::transforms::chain::TransformChain;
use crate::transforms::chain::{TransformChain, TransformChainBuilder};
use crate::transforms::Wrapper;
use anyhow::{anyhow, Context, Result};
use futures::future::join_all;
Expand Down Expand Up @@ -50,8 +50,7 @@ pub trait Codec: CodecReadHalf + CodecWriteHalf {}
impl<T: CodecReadHalf + CodecWriteHalf> Codec for T {}

pub struct TcpCodecListener<C: Codec> {
chain: TransformChain,

chain: TransformChainBuilder,
source_name: String,

/// TCP listener supplied by the `run` caller.
Expand Down Expand Up @@ -97,7 +96,7 @@ pub struct TcpCodecListener<C: Codec> {
impl<C: Codec + 'static> TcpCodecListener<C> {
#![allow(clippy::too_many_arguments)]
pub async fn new(
chain: TransformChain,
chain: TransformChainBuilder,
source_name: String,
listen_addr: String,
hard_connection_limit: bool,
Expand Down Expand Up @@ -182,6 +181,7 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
self.listener = Some(create_listener(&self.listen_addr).await?);
}
}

self.connection_count = self.connection_count.wrapping_add(1);
let span = tracing::error_span!(
"connection",
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
tokio::sync::mpsc::unbounded_channel::<Messages>();

let mut handler = Handler {
chain: self.chain.clone_with_pushed_messages_tx(pushed_messages_tx),
chain: self.chain.build_with_pushed_messages(pushed_messages_tx),
client_details: peer,
conn_details: conn_string,
source_details: self.source_name.clone(),
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/src/sources/cassandra_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::codec::cassandra::CassandraCodec;
use crate::server::TcpCodecListener;
use crate::sources::Sources;
use crate::tls::{TlsAcceptor, TlsAcceptorConfig};
use crate::transforms::chain::TransformChain;
use crate::transforms::chain::TransformChainBuilder;
use anyhow::Result;
use serde::Deserialize;
use std::sync::Arc;
Expand All @@ -22,7 +22,7 @@ pub struct CassandraConfig {
impl CassandraConfig {
pub async fn get_source(
&self,
chain: &TransformChain,
chain: &TransformChainBuilder,
trigger_shutdown_rx: watch::Receiver<bool>,
) -> Result<Vec<Sources>> {
Ok(vec![Sources::Cassandra(
Expand Down Expand Up @@ -50,7 +50,7 @@ pub struct CassandraSource {
impl CassandraSource {
#![allow(clippy::too_many_arguments)]
pub async fn new(
chain: &TransformChain,
chain: &TransformChainBuilder,
listen_addr: String,
mut trigger_shutdown_rx: watch::Receiver<bool>,
connection_limit: Option<usize>,
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::sources::cassandra_source::{CassandraConfig, CassandraSource};
use crate::sources::redis_source::{RedisConfig, RedisSource};
use crate::transforms::chain::TransformChain;
use crate::transforms::chain::TransformChainBuilder;
use anyhow::Result;
use serde::Deserialize;
use tokio::sync::watch;
Expand Down Expand Up @@ -33,7 +33,7 @@ pub enum SourcesConfig {
impl SourcesConfig {
pub(crate) async fn get_source(
&self,
chain: &TransformChain,
chain: &TransformChainBuilder,
trigger_shutdown_rx: watch::Receiver<bool>,
) -> Result<Vec<Sources>> {
match self {
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/src/sources/redis_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::codec::redis::RedisCodec;
use crate::server::TcpCodecListener;
use crate::sources::Sources;
use crate::tls::{TlsAcceptor, TlsAcceptorConfig};
use crate::transforms::chain::TransformChain;
use crate::transforms::chain::TransformChainBuilder;
use anyhow::Result;
use serde::Deserialize;
use std::sync::Arc;
Expand All @@ -22,7 +22,7 @@ pub struct RedisConfig {
impl RedisConfig {
pub async fn get_source(
&self,
chain: &TransformChain,
chain: &TransformChainBuilder,
trigger_shutdown_rx: watch::Receiver<bool>,
) -> Result<Vec<Sources>> {
RedisSource::new(
Expand All @@ -49,7 +49,7 @@ pub struct RedisSource {
impl RedisSource {
#![allow(clippy::too_many_arguments)]
pub async fn new(
chain: &TransformChain,
chain: &TransformChainBuilder,
listen_addr: String,
mut trigger_shutdown_rx: watch::Receiver<bool>,
connection_limit: Option<usize>,
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/src/transforms/cassandra/peers_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::message::{IntSize, Message, MessageValue};
use crate::transforms::cassandra::peers_rewrite::CassandraOperation::Event;
use crate::{
error::ChainResponse,
transforms::{Transform, Transforms, Wrapper},
transforms::{Transform, TransformBuilder, Wrapper},
};
use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -19,8 +19,8 @@ pub struct CassandraPeersRewriteConfig {
}

impl CassandraPeersRewriteConfig {
pub async fn get_transform(&self) -> Result<Transforms> {
Ok(Transforms::CassandraPeersRewrite(
pub async fn get_transform(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::CassandraPeersRewrite(
CassandraPeersRewrite::new(self.port),
))
}
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{IntSize, Message, MessageValue, Messages, Metadata};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::cassandra::connection::{CassandraConnection, Response};
use crate::transforms::{Transform, Transforms, Wrapper};
use crate::transforms::{Transform, TransformBuilder, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cassandra_protocol::events::ServerEvent;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct CassandraSinkClusterConfig {
}

impl CassandraSinkClusterConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<Transforms> {
pub async fn get_transform(&self, chain_name: String) -> Result<TransformBuilder> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
let mut shotover_nodes = self.shotover_nodes.clone();
let index = self
Expand All @@ -75,7 +75,7 @@ impl CassandraSinkClusterConfig {
})?;
let local_node = shotover_nodes.remove(index);

Ok(Transforms::CassandraSinkCluster(Box::new(
Ok(TransformBuilder::CassandraSinkCluster(Box::new(
CassandraSinkCluster::new(
self.first_contact_points.clone(),
shotover_nodes,
Expand Down
20 changes: 11 additions & 9 deletions shotover-proxy/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::frame::cassandra::CassandraMetadata;
use crate::message::{Messages, Metadata};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::cassandra::connection::Response;
use crate::transforms::{Transform, Transforms, Wrapper};
use crate::transforms::{Transform, TransformBuilder, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cassandra_protocol::frame::Version;
Expand All @@ -26,15 +26,17 @@ pub struct CassandraSinkSingleConfig {
}

impl CassandraSinkSingleConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<Transforms> {
pub async fn get_transform(&self, chain_name: String) -> Result<TransformBuilder> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(Transforms::CassandraSinkSingle(CassandraSinkSingle::new(
self.address.clone(),
chain_name,
tls,
self.connect_timeout_ms,
self.read_timeout,
)))
Ok(TransformBuilder::CassandraSinkSingle(
CassandraSinkSingle::new(
self.address.clone(),
chain_name,
tls,
self.connect_timeout_ms,
self.read_timeout,
),
))
}
}

Expand Down
Loading

0 comments on commit 75f4938

Please sign in to comment.