Skip to content

Commit

Permalink
Remove remaining custom transform clone impls
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 8, 2023
1 parent a0e7645 commit 5bff262
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 126 deletions.
67 changes: 39 additions & 28 deletions shotover-proxy/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl CassandraSinkSingleConfig {
pub async fn get_builder(&self, chain_name: String) -> Result<TransformBuilder> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(TransformBuilder::CassandraSinkSingle(
CassandraSinkSingle::new(
CassandraSinkSingleBuilder::new(
self.address.clone(),
chain_name,
tls,
Expand All @@ -40,57 +40,68 @@ impl CassandraSinkSingleConfig {
}
}

pub struct CassandraSinkSingle {
#[derive(Clone)]
pub struct CassandraSinkSingleBuilder {
version: Option<Version>,
address: String,
outbound: Option<CassandraConnection>,
chain_name: String,
failed_requests: Counter,
tls: Option<TlsConnector>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
connect_timeout: Duration,
read_timeout: Option<Duration>,
}

impl Clone for CassandraSinkSingle {
fn clone(&self) -> Self {
CassandraSinkSingle {
version: self.version,
address: self.address.clone(),
outbound: None,
chain_name: self.chain_name.clone(),
tls: self.tls.clone(),
failed_requests: self.failed_requests.clone(),
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
}
}
}

impl CassandraSinkSingle {
impl CassandraSinkSingleBuilder {
pub fn new(
address: String,
chain_name: String,
tls: Option<TlsConnector>,
connect_timeout_ms: u64,
timeout: Option<u64>,
) -> CassandraSinkSingle {
let failed_requests = register_counter!("failed_requests", "chain" => chain_name.clone(), "transform" => "CassandraSinkSingle");
) -> CassandraSinkSingleBuilder {
let failed_requests = register_counter!("failed_requests", "chain" => chain_name, "transform" => "CassandraSinkSingle");
let receive_timeout = timeout.map(Duration::from_secs);

CassandraSinkSingle {
CassandraSinkSingleBuilder {
version: None,
address,
outbound: None,
chain_name,
failed_requests,
tls,
pushed_messages_tx: None,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
}
}

pub fn build(&self) -> CassandraSinkSingle {
CassandraSinkSingle {
outbound: None,
version: self.version,
address: self.address.clone(),
tls: self.tls.clone(),
failed_requests: self.failed_requests.clone(),
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
}
}

pub fn validate(&self) -> Vec<String> {
vec![]
}

pub fn is_terminating(&self) -> bool {
true
}
}

pub struct CassandraSinkSingle {
version: Option<Version>,
address: String,
outbound: Option<CassandraConnection>,
failed_requests: Counter,
tls: Option<TlsConnector>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
connect_timeout: Duration,
read_timeout: Option<Duration>,
}

impl CassandraSinkSingle {
Expand Down
22 changes: 13 additions & 9 deletions shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::transforms::cassandra::peers_rewrite::{
use crate::transforms::cassandra::sink_cluster::{
CassandraSinkCluster, CassandraSinkClusterBuilder, CassandraSinkClusterConfig,
};
use crate::transforms::cassandra::sink_single::{CassandraSinkSingle, CassandraSinkSingleConfig};
use crate::transforms::cassandra::sink_single::{
CassandraSinkSingle, CassandraSinkSingleBuilder, CassandraSinkSingleConfig,
};
use crate::transforms::chain::TransformChainBuilder;
use crate::transforms::coalesce::{Coalesce, CoalesceConfig};
use crate::transforms::debug::force_parse::DebugForceParse;
Expand All @@ -33,9 +35,11 @@ use crate::transforms::redis::cluster_ports_rewrite::{
RedisClusterPortsRewrite, RedisClusterPortsRewriteConfig,
};
use crate::transforms::redis::sink_cluster::{RedisSinkCluster, RedisSinkClusterConfig};
use crate::transforms::redis::sink_single::{RedisSinkSingle, RedisSinkSingleConfig};
use crate::transforms::redis::sink_single::{
RedisSinkSingle, RedisSinkSingleBuilder, RedisSinkSingleConfig,
};
use crate::transforms::redis::timestamp_tagging::RedisTimestampTagger;
use crate::transforms::tee::{Tee, TeeConfig};
use crate::transforms::tee::{Tee, TeeBuilder, TeeConfig};
use crate::transforms::throttling::{RequestThrottling, RequestThrottlingConfig};
use anyhow::{anyhow, Result};
use async_recursion::async_recursion;
Expand Down Expand Up @@ -77,12 +81,12 @@ pub mod util;
// It would also affect whether sources pointing into the same chain share state, which will require careful consideration
#[derive(Clone, IntoStaticStr)]
pub enum TransformBuilder {
CassandraSinkSingle(CassandraSinkSingle),
CassandraSinkSingle(CassandraSinkSingleBuilder),
CassandraSinkCluster(Box<CassandraSinkClusterBuilder>),
RedisSinkSingle(RedisSinkSingle),
RedisSinkSingle(RedisSinkSingleBuilder),
CassandraPeersRewrite(CassandraPeersRewrite),
RedisCache(SimpleRedisCacheBuilder),
Tee(Tee),
Tee(TeeBuilder),
NullSink(NullSink),
#[cfg(test)]
Loopback(Loopback),
Expand All @@ -106,16 +110,16 @@ pub enum TransformBuilder {
impl TransformBuilder {
pub fn build(&self) -> Transforms {
match self {
TransformBuilder::CassandraSinkSingle(t) => Transforms::CassandraSinkSingle(t.clone()),
TransformBuilder::CassandraSinkSingle(t) => Transforms::CassandraSinkSingle(t.build()),
TransformBuilder::CassandraSinkCluster(t) => {
Transforms::CassandraSinkCluster(t.build())
}
TransformBuilder::CassandraPeersRewrite(t) => {
Transforms::CassandraPeersRewrite(t.clone())
}
TransformBuilder::RedisCache(t) => Transforms::RedisCache(t.build()),
TransformBuilder::Tee(t) => Transforms::Tee(t.clone()),
TransformBuilder::RedisSinkSingle(t) => Transforms::RedisSinkSingle(t.clone()),
TransformBuilder::Tee(t) => Transforms::Tee(t.build()),
TransformBuilder::RedisSinkSingle(t) => Transforms::RedisSinkSingle(t.build()),
TransformBuilder::ConsistentScatter(t) => Transforms::ConsistentScatter(t.clone()),
TransformBuilder::RedisTimestampTagger(t) => {
Transforms::RedisTimestampTagger(t.clone())
Expand Down
89 changes: 50 additions & 39 deletions shotover-proxy/src/transforms/redis/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,67 +31,78 @@ pub struct RedisSinkSingleConfig {
impl RedisSinkSingleConfig {
pub async fn get_builder(&self, chain_name: String) -> Result<TransformBuilder> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(TransformBuilder::RedisSinkSingle(RedisSinkSingle::new(
self.address.clone(),
tls,
chain_name,
self.connect_timeout_ms,
)))
Ok(TransformBuilder::RedisSinkSingle(
RedisSinkSingleBuilder::new(
self.address.clone(),
tls,
chain_name,
self.connect_timeout_ms,
),
))
}
}

type RedisFramed = Framed<Pin<Box<dyn AsyncStream + Send + Sync>>, RedisCodec>;

struct Connection {
outbound_tx: SplitSink<RedisFramed, Messages>,
response_messages_rx: mpsc::UnboundedReceiver<Message>,
sent_message_type_tx: mpsc::UnboundedSender<MessageType>,
}

pub struct RedisSinkSingle {
#[derive(Clone)]
pub struct RedisSinkSingleBuilder {
address: String,
tls: Option<TlsConnector>,
connection: Option<Connection>,
chain_name: String,
failed_requests: Counter,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
connect_timeout: Duration,
}

impl Clone for RedisSinkSingle {
fn clone(&self) -> Self {
RedisSinkSingle {
address: self.address.clone(),
tls: self.tls.clone(),
connection: None,
chain_name: self.chain_name.clone(),
failed_requests: self.failed_requests.clone(),
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
}
}
}

impl RedisSinkSingle {
impl RedisSinkSingleBuilder {
pub fn new(
address: String,
tls: Option<TlsConnector>,
chain_name: String,
connect_timeout_ms: u64,
) -> RedisSinkSingle {
let failed_requests = register_counter!("failed_requests", "chain" => chain_name.clone(), "transform" => "RedisSinkSingle");
) -> Self {
let failed_requests = register_counter!("failed_requests", "chain" => chain_name, "transform" => "RedisSinkSingle");
let connect_timeout = Duration::from_millis(connect_timeout_ms);

RedisSinkSingle {
RedisSinkSingleBuilder {
address,
tls,
connection: None,
chain_name,
failed_requests,
pushed_messages_tx: None,
connect_timeout,
}
}

pub fn build(&self) -> RedisSinkSingle {
RedisSinkSingle {
address: self.address.clone(),
tls: self.tls.clone(),
connection: None,
failed_requests: self.failed_requests.clone(),
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
}
}

pub fn validate(&self) -> Vec<String> {
vec![]
}

pub fn is_terminating(&self) -> bool {
true
}
}

type RedisFramed = Framed<Pin<Box<dyn AsyncStream + Send + Sync>>, RedisCodec>;

struct Connection {
outbound_tx: SplitSink<RedisFramed, Messages>,
response_messages_rx: mpsc::UnboundedReceiver<Message>,
sent_message_type_tx: mpsc::UnboundedSender<MessageType>,
}

pub struct RedisSinkSingle {
address: String,
tls: Option<TlsConnector>,
connection: Option<Connection>,
failed_requests: Counter,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
connect_timeout: Duration,
}

#[async_trait]
Expand Down
Loading

0 comments on commit 5bff262

Please sign in to comment.