Skip to content

Commit

Permalink
Merge branch 'main' into remove_clone_from_transform_config
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Mar 1, 2023
2 parents 51ee6e3 + a47c5d6 commit 51b5f81
Showing 1 changed file with 46 additions and 32 deletions.
78 changes: 46 additions & 32 deletions shotover-proxy/src/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,30 @@ use tracing::trace;
#[derive(Clone)]
pub struct TeeBuilder {
pub tx: TransformChainBuilder,
pub mismatch_chain: Option<TransformChainBuilder>,
pub buffer_size: usize,
pub behavior: ConsistencyBehavior,
pub behavior: ConsistencyBehaviorBuilder,
pub timeout_micros: Option<u64>,
dropped_messages: Counter,
}

#[derive(Clone)]
pub enum ConsistencyBehaviorBuilder {
Ignore,
FailOnMismatch,
SubchainOnMismatch(TransformChainBuilder),
}

impl TeeBuilder {
pub fn new(
tx: TransformChainBuilder,
mismatch_chain: Option<TransformChainBuilder>,
buffer_size: usize,
behavior: ConsistencyBehavior,
behavior: ConsistencyBehaviorBuilder,
timeout_micros: Option<u64>,
) -> Self {
let dropped_messages = register_counter!("tee_dropped_messages", "chain" => "Tee");

TeeBuilder {
tx,
mismatch_chain,
buffer_size,
behavior,
timeout_micros,
Expand All @@ -44,12 +48,14 @@ impl TransformBuilder for TeeBuilder {
fn build(&self) -> Transforms {
Transforms::Tee(Tee {
tx: self.tx.build_buffered(self.buffer_size),
mismatch_chain: self
.mismatch_chain
.as_ref()
.map(|x| x.build_buffered(self.buffer_size)),
behavior: match &self.behavior {
ConsistencyBehaviorBuilder::Ignore => ConsistencyBehavior::Ignore,
ConsistencyBehaviorBuilder::FailOnMismatch => ConsistencyBehavior::FailOnMismatch,
ConsistencyBehaviorBuilder::SubchainOnMismatch(chain) => {
ConsistencyBehavior::SubchainOnMismatch(chain.build_buffered(self.buffer_size))
}
},
buffer_size: self.buffer_size,
behavior: self.behavior.clone(),
timeout_micros: self.timeout_micros,
dropped_messages: self.dropped_messages.clone(),
})
Expand All @@ -60,7 +66,7 @@ impl TransformBuilder for TeeBuilder {
}

fn validate(&self) -> Vec<String> {
if let Some(mismatch_chain) = &self.mismatch_chain {
if let ConsistencyBehaviorBuilder::SubchainOnMismatch(mismatch_chain) = &self.behavior {
let mut errors = mismatch_chain
.validate()
.iter()
Expand All @@ -80,44 +86,54 @@ impl TransformBuilder for TeeBuilder {

pub struct Tee {
pub tx: BufferedChain,
pub mismatch_chain: Option<BufferedChain>,
pub buffer_size: usize,
pub behavior: ConsistencyBehavior,
pub timeout_micros: Option<u64>,
dropped_messages: Counter,
}

#[derive(Deserialize, Debug, Clone)]
pub enum ConsistencyBehavior {
Ignore,
FailOnMismatch,
SubchainOnMismatch(Vec<TransformsConfig>),
SubchainOnMismatch(BufferedChain),
}

#[derive(Deserialize, Debug)]
pub struct TeeConfig {
pub behavior: Option<ConsistencyBehavior>,
pub behavior: Option<ConsistencyBehaviorConfig>,
pub timeout_micros: Option<u64>,
pub chain: Vec<TransformsConfig>,
pub buffer_size: Option<usize>,
}

#[derive(Deserialize, Debug, Clone)]
pub enum ConsistencyBehaviorConfig {
Ignore,
FailOnMismatch,
SubchainOnMismatch(Vec<TransformsConfig>),
}

impl TeeConfig {
pub async fn get_builder(&self) -> Result<Box<dyn TransformBuilder>> {
let buffer_size = self.buffer_size.unwrap_or(5);
let mismatch_chain =
if let Some(ConsistencyBehavior::SubchainOnMismatch(mismatch_chain)) = &self.behavior {
Some(build_chain_from_config("mismatch_chain".to_string(), mismatch_chain).await?)
} else {
None
};
let behavior = match &self.behavior {
Some(ConsistencyBehaviorConfig::Ignore) => ConsistencyBehaviorBuilder::Ignore,
Some(ConsistencyBehaviorConfig::FailOnMismatch) => {
ConsistencyBehaviorBuilder::FailOnMismatch
}
Some(ConsistencyBehaviorConfig::SubchainOnMismatch(mismatch_chain)) => {
ConsistencyBehaviorBuilder::SubchainOnMismatch(
build_chain_from_config("mismatch_chain".to_string(), mismatch_chain).await?,
)
}
None => ConsistencyBehaviorBuilder::Ignore,
};
let tee_chain = build_chain_from_config("tee_chain".to_string(), &self.chain).await?;

Ok(Box::new(TeeBuilder::new(
tee_chain,
mismatch_chain,
buffer_size,
self.behavior.clone().unwrap_or(ConsistencyBehavior::Ignore),
behavior,
self.timeout_micros,
)))
}
Expand All @@ -126,7 +142,7 @@ impl TeeConfig {
#[async_trait]
impl Transform for Tee {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
match self.behavior {
match &mut self.behavior {
ConsistencyBehavior::Ignore => {
let (tee_result, chain_result) = tokio::join!(
self.tx
Expand Down Expand Up @@ -156,7 +172,7 @@ impl Transform for Tee {
}
Ok(chain_response)
}
ConsistencyBehavior::SubchainOnMismatch(_) => {
ConsistencyBehavior::SubchainOnMismatch(mismatch_chain) => {
let failed_message = message_wrapper.clone();
let (tee_result, chain_result) = tokio::join!(
self.tx
Expand All @@ -168,9 +184,7 @@ impl Transform for Tee {
let chain_response = chain_result?;

if !chain_response.eq(&tee_response) {
if let Some(mismatch_chain) = &mut self.mismatch_chain {
mismatch_chain.process_request(failed_message, None).await?;
}
mismatch_chain.process_request(failed_message, None).await?;
}

Ok(chain_response)
Expand All @@ -188,7 +202,7 @@ mod tests {
async fn test_validate_no_subchain() {
{
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::Ignore),
behavior: Some(ConsistencyBehaviorConfig::Ignore),
timeout_micros: None,
chain: vec![TransformsConfig::NullSink],
buffer_size: None,
Expand All @@ -200,7 +214,7 @@ mod tests {

{
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::FailOnMismatch),
behavior: Some(ConsistencyBehaviorConfig::FailOnMismatch),
timeout_micros: None,
chain: vec![TransformsConfig::NullSink],
buffer_size: None,
Expand All @@ -214,7 +228,7 @@ mod tests {
#[tokio::test]
async fn test_validate_invalid_chain() {
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::SubchainOnMismatch(vec![
behavior: Some(ConsistencyBehaviorConfig::SubchainOnMismatch(vec![
TransformsConfig::NullSink,
TransformsConfig::NullSink,
])),
Expand All @@ -232,7 +246,7 @@ mod tests {
#[tokio::test]
async fn test_validate_valid_chain() {
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::SubchainOnMismatch(vec![
behavior: Some(ConsistencyBehaviorConfig::SubchainOnMismatch(vec![
TransformsConfig::NullSink,
])),
timeout_micros: None,
Expand Down

0 comments on commit 51b5f81

Please sign in to comment.