Skip to content

Commit

Permalink
Make TransformConfig into a trait
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 14, 2023
1 parent 682af0b commit 59a2f5d
Show file tree
Hide file tree
Showing 30 changed files with 492 additions and 354 deletions.
65 changes: 65 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ chacha20poly1305 = { version = "0.10.0", features = ["std"] }
generic-array = { version = "0.14", features = ["serde"] }
dyn-clone = "1.0.10"
kafka-protocol = "0.6.0"
typetag = "0.2.5"

[dev-dependencies]
criterion = { git = "https://github.com/shotover/criterion.rs", branch = "0.4.0-bench_with_input_fn", features = ["async_tokio"] }
Expand Down
19 changes: 12 additions & 7 deletions shotover-proxy/benches/benches/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use shotover_proxy::transforms::chain::{TransformChain, TransformChainBuilder};
use shotover_proxy::transforms::debug::returner::{DebugReturner, Response};
use shotover_proxy::transforms::filter::QueryTypeFilter;
use shotover_proxy::transforms::null::NullSink;
#[cfg(feature = "alpha-transforms")]
use shotover_proxy::transforms::protect::{KeyManagerConfig, ProtectConfig};
use shotover_proxy::transforms::redis::cluster_ports_rewrite::RedisClusterPortsRewrite;
use shotover_proxy::transforms::redis::timestamp_tagging::RedisTimestampTagger;
use shotover_proxy::transforms::throttling::RequestThrottlingConfig;
use shotover_proxy::transforms::Wrapper;
use shotover_proxy::transforms::{TransformConfig, Wrapper};

fn criterion_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
Expand Down Expand Up @@ -175,11 +176,13 @@ fn criterion_benchmark(c: &mut Criterion) {
{
let chain = TransformChainBuilder::new(
vec![
RequestThrottlingConfig {
// an absurdly large value is given so that all messages will pass through
max_requests_per_second: std::num::NonZeroU32::new(100_000_000).unwrap(),
}
.get_builder()
rt.block_on(
RequestThrottlingConfig {
// an absurdly large value is given so that all messages will pass through
max_requests_per_second: std::num::NonZeroU32::new(100_000_000).unwrap(),
}
.get_builder("".to_owned()),
)
.unwrap(),
Box::<NullSink>::default(),
],
Expand Down Expand Up @@ -272,6 +275,7 @@ fn criterion_benchmark(c: &mut Criterion) {
});
}

#[cfg(feature = "alpha-transforms")]
{
let chain = TransformChainBuilder::new(
vec![
Expand All @@ -290,7 +294,7 @@ fn criterion_benchmark(c: &mut Criterion) {
kek_id: "".to_string(),
},
}
.get_builder(),
.get_builder("".to_owned()),
)
.unwrap(),
Box::<NullSink>::default(),
Expand Down Expand Up @@ -332,6 +336,7 @@ fn criterion_benchmark(c: &mut Criterion) {
}
}

#[cfg(feature = "alpha-transforms")]
fn cassandra_parsed_query(query: &str) -> Wrapper {
Wrapper::new_with_chain_name(
vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
Expand Down
89 changes: 89 additions & 0 deletions shotover-proxy/src/config/chain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::transforms::chain::TransformChainBuilder;
use crate::transforms::{TransformBuilder, TransformConfig};
use anyhow::Result;
use serde::de::{DeserializeSeed, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::Deserialize;
use std::fmt::{self, Debug};
use std::iter;

#[derive(Deserialize, Debug)]
pub struct TransformChainConfig(
#[serde(rename = "TransformChain", deserialize_with = "vec_transform_config")]
pub Vec<Box<dyn TransformConfig>>,
);

impl TransformChainConfig {
pub async fn get_builder(&self, name: String) -> Result<TransformChainBuilder> {
let mut transforms: Vec<Box<dyn TransformBuilder>> = Vec::new();
for tc in &self.0 {
transforms.push(tc.get_builder(name.clone()).await?)
}
Ok(TransformChainBuilder::new(transforms, name))
}
}

fn vec_transform_config<'de, D>(deserializer: D) -> Result<Vec<Box<dyn TransformConfig>>, D::Error>
where
D: Deserializer<'de>,
{
struct VecTransformConfigVisitor;

impl<'de> Visitor<'de> for VecTransformConfigVisitor {
type Value = Vec<Box<dyn TransformConfig>>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("list of TransformConfig")
}

fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error>
where
S: SeqAccess<'de>,
{
let mut vec = Vec::new();
while let Some(item) = seq.next_element_seed(TransformConfigVisitor)? {
vec.push(item);
}
Ok(vec)
}
}

struct TransformConfigVisitor;

impl<'de> Visitor<'de> for TransformConfigVisitor {
type Value = Box<dyn TransformConfig>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("TransformConfig")
}

fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let de = serde::de::value::MapAccessDeserializer::new(map);
Deserialize::deserialize(de)
}

fn visit_str<E>(self, string: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let singleton_map = iter::once((string, ()));
let de = serde::de::value::MapDeserializer::new(singleton_map);
Deserialize::deserialize(de)
}
}

impl<'de> DeserializeSeed<'de> for TransformConfigVisitor {
type Value = Box<dyn TransformConfig>;

fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(self)
}
}

deserializer.deserialize_seq(VecTransformConfigVisitor)
}
1 change: 1 addition & 0 deletions shotover-proxy/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{anyhow, Context, Result};
use serde::Deserialize;

pub mod chain;
pub mod topology;

#[derive(Deserialize, Debug, Clone)]
Expand Down
Loading

0 comments on commit 59a2f5d

Please sign in to comment.