From 6b9d4aca46f464deb514f0c72484c53de6230361 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 3 Dec 2020 16:05:57 +1300 Subject: [PATCH 1/4] wip --- examples/redis-cluster-dr/config.yaml | 48 ++-- src/config/topology.rs | 3 +- src/transforms/chain.rs | 24 +- .../tunable_consistency_scatter.rs | 4 +- src/transforms/mpsc.rs | 207 +++++++++--------- 5 files changed, 142 insertions(+), 144 deletions(-) diff --git a/examples/redis-cluster-dr/config.yaml b/examples/redis-cluster-dr/config.yaml index 12fa72fa1..ade13287b 100644 --- a/examples/redis-cluster-dr/config.yaml +++ b/examples/redis-cluster-dr/config.yaml @@ -1,38 +1,35 @@ # This example will replicate all commands to the DR datacenter on a best effort basis --- sources: - redis_batch: - Mpsc: - topic_name: redis-batch-tee - coalesce_behavior: - COUNT: 1000 - redis_dr: - Mpsc: - topic_name: redis-dr-tee redis_prod: Redis: batch_size_hint: 100 listen_addr: "127.0.0.1:6379" connection_limit: 3000000 chain_config: - redis-batch-chain: - - QueryTypeFilter: - filter: Read - - MPSCForwarder: - topic_name: redis-dr-tee - async_mode: false - redis_dr_chain: - - PoolConnections: - name: "RedisCluster-DR-subchain" - parallelism: 256 - chain: - - RedisCluster: - first_contact_points: [ "redis://127.0.0.1:2120/", "redis://127.0.0.1:2121/", "redis://127.0.0.1:2122/", "redis://127.0.0.1:2123/", "redis://127.0.0.1:2124/", "redis://127.0.0.1:2125/" ] redis_chain: - MPSCTee: - topic_name: redis-batch-tee behavior: IGNORE timeout_micros: 40 + buffer_size: 5000 + chain: + - QueryTypeFilter: + filter: Read + - Coalesce: + max_behavior: + COUNT: 1000 + - MPSCForwarder: + buffer_size: 10000 + async_mode: false + timeout_micros: 100000 + chain: + - PoolConnections: + name: "RedisCluster-DR-subchain" + parallelism: 256 + chain: + - RedisCluster: + first_contact_points: [ "redis://127.0.0.1:2120/", "redis://127.0.0.1:2121/", "redis://127.0.0.1:2122/", "redis://127.0.0.1:2123/", "redis://127.0.0.1:2124/", "redis://127.0.0.1:2125/" ] + - PoolConnections: name: "RedisCluster-Main-subchain" parallelism: 512 @@ -40,9 +37,6 @@ chain_config: - RedisCluster: first_contact_points: [ "redis://127.0.0.1:2220/", "redis://127.0.0.1:2221/", "redis://127.0.0.1:2222/", "redis://127.0.0.1:2223/", "redis://127.0.0.1:2224/", "redis://127.0.0.1:2225/" ] named_topics: - redis-dr-tee : 5000 - redis-batch-tee : 10000 + example: 10 source_to_chain_mapping: - redis_prod: redis_chain - redis_dr: redis_dr_chain - redis_batch: redis-batch-chain \ No newline at end of file + redis_prod: redis_chain \ No newline at end of file diff --git a/src/config/topology.rs b/src/config/topology.rs index 405bba883..f5d346480 100644 --- a/src/config/topology.rs +++ b/src/config/topology.rs @@ -262,9 +262,10 @@ impl Topology { }); let tee_conf = TransformsConfig::MPSCTee(TeeConfig { - topic_name: String::from("test_topic"), behavior: None, timeout_micros: None, + chain: vec![], + buffer_size: None, }); let mut sources: HashMap = HashMap::new(); diff --git a/src/transforms/chain.rs b/src/transforms/chain.rs index dd80c2f6f..e2b8f51c7 100644 --- a/src/transforms/chain.rs +++ b/src/transforms/chain.rs @@ -73,13 +73,25 @@ impl BufferedChain { .await?; Ok(one_rx) } + + pub async fn process_request_no_return( + &mut self, + wrapper: Wrapper<'_>, + _client_details: String, + ) -> Result<()> { + self.send_handle + .send(ChannelMessage::new_with_no_return(wrapper.message)) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .await?; + Ok(()) + } } impl TransformChain { pub fn build_buffered_chain( self, buffer_size: usize, - timeout_millis: Option, + timeout_micros: Option, ) -> BufferedChain { let (tx, mut rx) = tokio::sync::mpsc::channel::(buffer_size); @@ -102,11 +114,11 @@ impl TransformChain { *count += 1; } let future = async { - match timeout_millis { + match timeout_micros { None => Ok(chain.process_request(Wrapper::new(messages), name).await), - Some(timeout_ms) => { + Some(timeout_us) => { timeout( - Duration::from_millis(timeout_ms), + Duration::from_micros(timeout_us), chain.process_request(Wrapper::new(messages), name), ) .await @@ -134,8 +146,8 @@ impl TransformChain { } } Err(e) => { - info!("Upstream timeout, resetting chain {}", e); - chain = chain.clone(); + trace!("Upstream timeout, {}", e); + // chain = chain.clone(); } } } diff --git a/src/transforms/distributed/tunable_consistency_scatter.rs b/src/transforms/distributed/tunable_consistency_scatter.rs index ce8c43485..5a73a3663 100644 --- a/src/transforms/distributed/tunable_consistency_scatter.rs +++ b/src/transforms/distributed/tunable_consistency_scatter.rs @@ -45,7 +45,7 @@ impl TransformsFromConfig for TunableConsistencyConfig { temp.push( build_chain_from_config(key, &value, topics) .await? - .build_buffered_chain(10, Some(1000)), + .build_buffered_chain(10, Some(1000 * 1000)), ); } @@ -295,7 +295,7 @@ mod scatter_transform_tests { let mut temp: Vec = Vec::with_capacity(route_map.len()); for (_key, value) in route_map.clone() { - temp.push(value.build_buffered_chain(10, Some(1000))); + temp.push(value.build_buffered_chain(10, Some(1000 * 1000))); } temp } diff --git a/src/transforms/mpsc.rs b/src/transforms/mpsc.rs index 606e96301..268c4b71b 100644 --- a/src/transforms/mpsc.rs +++ b/src/transforms/mpsc.rs @@ -4,8 +4,11 @@ use crate::config::topology::{ChannelMessage, TopicHolder}; use crate::error::ChainResponse; use crate::message::{Message, Messages, QueryResponse, Value}; use crate::protocols::RawFrame; -use crate::transforms::{Transform, Transforms, TransformsFromConfig, Wrapper}; -use anyhow::{anyhow, Result}; +use crate::transforms::chain::{BufferedChain, TransformChain}; +use crate::transforms::{ + build_chain_from_config, Transform, Transforms, TransformsConfig, TransformsFromConfig, Wrapper, +}; +use anyhow::{anyhow, Error, Result}; use async_trait::async_trait; use futures::TryFutureExt; use itertools::Itertools; @@ -19,48 +22,61 @@ AsyncMPSC Tees and Forwarders should only be created from the AsyncMpsc struct, It's the thing that owns tx and rx handles :D */ -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Buffer { pub name: &'static str, - pub tx: Sender, + pub tx: BufferedChain, pub async_mode: bool, + pub buffer_size: usize, + pub chain_to_clone: TransformChain, + pub timeout: Option, } #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] pub struct BufferConfig { - pub topic_name: String, pub async_mode: bool, + pub chain: Vec, + pub buffer_size: Option, + pub timeout_micros: Option, +} + +impl Clone for Buffer { + fn clone(&self) -> Self { + let chain = self.chain_to_clone.clone(); + Buffer { + name: self.name.clone(), + tx: chain.build_buffered_chain(self.buffer_size, None), + async_mode: false, + buffer_size: self.buffer_size, + chain_to_clone: self.chain_to_clone.clone(), + timeout: self.timeout.clone(), + } + } } #[async_trait] impl TransformsFromConfig for BufferConfig { async fn get_source(&self, topics: &TopicHolder) -> Result { - if let Some(tx) = topics.get_tx(&self.topic_name) { - return Ok(Transforms::MPSCForwarder(Buffer { - name: "forward", - tx, - async_mode: self.async_mode, - })); - } - Err(anyhow!( - "Could not find the topic {} in [{:#?}]", - self.topic_name.clone(), - &topics.topics_rx.keys() - )) + let chain = build_chain_from_config("forward".to_string(), &self.chain, &topics).await?; + let buffer = self.buffer_size.unwrap_or(5); + return Ok(Transforms::MPSCForwarder(Buffer { + name: "forward", + tx: chain.clone().build_buffered_chain(buffer, None), + async_mode: self.async_mode, + buffer_size: buffer, + chain_to_clone: chain, + timeout: self.timeout_micros, + })); } } #[async_trait] impl Transform for Buffer { async fn transform<'a>(&'a mut self, qd: Wrapper<'a>) -> ChainResponse { - if self.async_mode { + return if self.async_mode { let expected_responses = qd.message.messages.len(); self.tx - .send(ChannelMessage::new_with_no_return(qd.message)) - .map_err(|e| { - warn!("MPSC error {}", e); - e - }) + .process_request_no_return(qd, "Buffer".to_string()) .await?; ChainResponse::Ok(Messages { messages: (0..expected_responses) @@ -69,16 +85,8 @@ impl Transform for Buffer { .collect_vec(), }) } else { - let (tx, rx) = oneshot::channel::(); - self.tx - .send(ChannelMessage::new(qd.message, tx)) - .map_err(|e| { - warn!("MPSC error {}", e); - e - }) - .await?; - return rx.await?; - } + self.tx.process_request(qd, "Buffer".to_string()).await + }; } fn get_name(&self) -> &'static str { @@ -89,8 +97,10 @@ impl Transform for Buffer { #[derive(Debug, Clone)] pub struct Tee { pub name: &'static str, - pub tx: Sender, - pub fail_topic: Option>, + pub tx: BufferedChain, + pub fail_chain: Option, + pub buffer_size: usize, + pub chain_to_clone: TransformChain, pub behavior: ConsistencyBehavior, pub timeout: u64, } @@ -99,81 +109,72 @@ pub struct Tee { pub enum ConsistencyBehavior { IGNORE, FAIL, - LOG { topic: String }, + LOG { fail_chain: Vec }, } #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] pub struct TeeConfig { - pub topic_name: String, pub behavior: Option, pub timeout_micros: Option, + pub chain: Vec, + pub buffer_size: Option, } #[async_trait] impl TransformsFromConfig for TeeConfig { async fn get_source(&self, topics: &TopicHolder) -> Result { - if let Some(tx) = topics.get_tx(&self.topic_name) { - let fail_topic = if let Some(ConsistencyBehavior::LOG { topic }) = &self.behavior { - let topic_chan = topics.get_tx(topic); - if topic_chan.is_none() { - return Err(anyhow!( - "Could not find the fail topic {} in [{:#?}]", - topic, - topics.topics_rx.keys() - )); - } - topic_chan - } else { - None - }; - - return Ok(Transforms::MPSCTee(Tee { - name: "tee", - tx, - fail_topic, - behavior: self.behavior.clone().unwrap_or(ConsistencyBehavior::IGNORE), - timeout: self.timeout_micros.unwrap_or(45), - })); - } - Err(anyhow!( - "Could not find the topic {} in [{:#?}]", - &self.topic_name, - topics.topics_rx.keys() - )) + let buffer_size = self.buffer_size.unwrap_or(5); + let fail_chain = if let Some(ConsistencyBehavior::LOG { fail_chain }) = &self.behavior { + Some( + build_chain_from_config("fail_chain".to_string(), fail_chain, topics) + .await? + .build_buffered_chain(buffer_size, None), + ) + } else { + None + }; + let tee_chain = + build_chain_from_config("tee_chain".to_string(), &self.chain, topics).await?; + + return Ok(Transforms::MPSCTee(Tee { + name: "tee", + tx: tee_chain + .clone() + .build_buffered_chain(buffer_size, self.timeout_micros), + fail_chain, + buffer_size, + chain_to_clone: tee_chain, + behavior: self.behavior.clone().unwrap_or(ConsistencyBehavior::IGNORE), + timeout: self.timeout_micros.unwrap_or(45), + })); } } #[async_trait] impl Transform for Tee { async fn transform<'a>(&'a mut self, qd: Wrapper<'a>) -> ChainResponse { - let m = qd.message.clone(); + // let m = qd.message.clone(); return match self.behavior { ConsistencyBehavior::IGNORE => { - let _ = self - .tx - .send_timeout( - ChannelMessage::new_with_no_return(m), - std::time::Duration::from_micros(self.timeout), - ) - .await - .map_err(|e| { + let (tee_result, chain_result) = tokio::join!( + self.tx + .process_request_no_return(qd.clone(), "tee".to_string()), + qd.call_next_transform() + ); + match tee_result { + Ok(_) => {} + Err(e) => { counter!("tee_dropped_messages", 1, "chain" => self.name); trace!("MPSC error {}", e); - e - }); - qd.call_next_transform().await + } + } + chain_result } ConsistencyBehavior::FAIL => { - let (tx, rx) = oneshot::channel::(); - self.tx - .send(ChannelMessage::new(m, tx)) - .map_err(|e| { - warn!("MPSC error {}", e); - e - }) - .await?; - - let (tee_result, chain_result) = tokio::join!(rx, qd.call_next_transform()); - let tee_response = tee_result??; + let (tee_result, chain_result) = tokio::join!( + self.tx.process_request(qd.clone(), "tee".to_string()), + qd.call_next_transform() + ); + let tee_response = tee_result?; let chain_response = chain_result?; if !chain_response.eq(&tee_response) { @@ -190,29 +191,19 @@ impl Transform for Tee { } } ConsistencyBehavior::LOG { .. } => { - let failed_message = m.clone(); - let (tx, rx) = oneshot::channel::(); - self.tx - .send(ChannelMessage::new(m, tx)) - .map_err(|e| { - // counter!("tee_logged_messages", 1, "chain" => self.name); - warn!("MPSC error {}", e); - e - }) - .await?; - - let (tee_result, chain_result) = tokio::join!(rx, qd.call_next_transform()); - let tee_response = tee_result??; + let failed_message = qd.clone(); + let (tee_result, chain_result) = tokio::join!( + self.tx.process_request(qd.clone(), "tee".to_string()), + qd.call_next_transform() + ); + + let tee_response = tee_result?; let chain_response = chain_result?; - if !chain_response.eq(&tee_response) { - if let Some(topic) = &mut self.fail_topic { + if chain_response.eq(&tee_response) { + if let Some(topic) = &mut self.fail_chain { topic - .send(ChannelMessage::new_with_no_return(failed_message)) - .map_err(|e| { - warn!("MPSC error for logging failed Tee message {}", e); - e - }) + .process_request(failed_message, "tee".to_string()) .await?; } } From bb10e81152e89d9785b39dbda630a9e2fb1a55bb Mon Sep 17 00:00:00 2001 From: Ben Date: Fri, 4 Dec 2020 10:03:03 +1300 Subject: [PATCH 2/4] Fix topology test --- src/config/topology.rs | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/src/config/topology.rs b/src/config/topology.rs index f5d346480..e567ade97 100644 --- a/src/config/topology.rs +++ b/src/config/topology.rs @@ -249,11 +249,6 @@ impl Topology { vec!["pk".to_string(), "clustering".to_string()], ); - let mpsc_config = SourcesConfig::Mpsc(AsyncMpscConfig { - topic_name: String::from("test_topic"), - coalesce_behavior: None, - }); - let cassandra_source = SourcesConfig::Cassandra(CassandraConfig { listen_addr, cassandra_ks, @@ -264,27 +259,21 @@ impl Topology { let tee_conf = TransformsConfig::MPSCTee(TeeConfig { behavior: None, timeout_micros: None, - chain: vec![], + chain: vec![kafka_transform_config_obj], buffer_size: None, }); let mut sources: HashMap = HashMap::new(); sources.insert(String::from("cassandra_prod"), cassandra_source); - sources.insert(String::from("mpsc_chan"), mpsc_config); let mut chain_config: HashMap> = HashMap::new(); chain_config.insert(String::from("main_chain"), vec![tee_conf, codec_config]); - chain_config.insert( - String::from("async_chain"), - vec![kafka_transform_config_obj], - ); let mut named_topics: HashMap = HashMap::new(); named_topics.insert(String::from("test_topic"), 1); let mut source_to_chain_mapping: HashMap = HashMap::new(); source_to_chain_mapping.insert(String::from("cassandra_prod"), String::from("main_chain")); - source_to_chain_mapping.insert(String::from("mpsc_chan"), String::from("async_chain")); Topology { sources, @@ -314,27 +303,23 @@ sources: test.clustering: - pk - clustering - mpsc_chan: - Mpsc: - topic_name: test_topic chain_config: main_chain: - MPSCTee: topic_name: test_topic + chain: + - KafkaDestination: + topic: "test_topic" + config_values: + bootstrap.servers: "127.0.0.1:9092" + message.timeout.ms: "5000" - CodecDestination: bypass_result_processing: false - remote_address: "127.0.0.1:9042" - async_chain: - - KafkaDestination: - topic: "test_topic" - config_values: - bootstrap.servers: "127.0.0.1:9092" - message.timeout.ms: "5000" + remote_address: "127.0.0.1:9042" named_topics: test_topic: 1 source_to_chain_mapping: - cassandra_prod: main_chain - mpsc_chan: async_chain"###; + cassandra_prod: main_chain"###; #[test] fn new_test() -> Result<()> { From 6e25a42165d9f22b10280eeb8c1e24906ec5c04d Mon Sep 17 00:00:00 2001 From: Ben Date: Fri, 4 Dec 2020 13:49:37 +1300 Subject: [PATCH 3/4] Change timeout behavior --- examples/redis-cluster-dr/config.yaml | 14 +-- examples/redis-cluster-dr/docker-compose.yml | 2 +- src/transforms/chain.rs | 104 ++++++++++-------- .../tunable_consistency_scatter.rs | 6 +- src/transforms/load_balance.rs | 4 +- src/transforms/mpsc.rs | 44 +++++--- 6 files changed, 97 insertions(+), 77 deletions(-) diff --git a/examples/redis-cluster-dr/config.yaml b/examples/redis-cluster-dr/config.yaml index ade13287b..59ae449d6 100644 --- a/examples/redis-cluster-dr/config.yaml +++ b/examples/redis-cluster-dr/config.yaml @@ -3,25 +3,25 @@ sources: redis_prod: Redis: - batch_size_hint: 100 + batch_size_hint: 4 listen_addr: "127.0.0.1:6379" connection_limit: 3000000 chain_config: redis_chain: - MPSCTee: behavior: IGNORE - timeout_micros: 40 - buffer_size: 5000 +# timeout_micros: 1000 + buffer_size: 10000 chain: - QueryTypeFilter: filter: Read - Coalesce: max_behavior: - COUNT: 1000 + COUNT: 2000 - MPSCForwarder: - buffer_size: 10000 - async_mode: false - timeout_micros: 100000 + buffer_size: 100 + async_mode: true + timeout_micros: 10000 chain: - PoolConnections: name: "RedisCluster-DR-subchain" diff --git a/examples/redis-cluster-dr/docker-compose.yml b/examples/redis-cluster-dr/docker-compose.yml index 74319f404..027059a93 100644 --- a/examples/redis-cluster-dr/docker-compose.yml +++ b/examples/redis-cluster-dr/docker-compose.yml @@ -147,7 +147,7 @@ services: - redis-node-5-dr environment: - 'ALLOW_EMPTY_PASSWORD=yes' - - 'REDIS_CLUSTER_REPLICAS=0' + - 'REDIS_CLUSTER_REPLICAS=1' - 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr' - 'REDIS_CLUSTER_CREATOR=yes' diff --git a/src/transforms/chain.rs b/src/transforms/chain.rs index e2b8f51c7..65973dc70 100644 --- a/src/transforms/chain.rs +++ b/src/transforms/chain.rs @@ -55,8 +55,9 @@ impl BufferedChain { &mut self, wrapper: Wrapper<'_>, client_details: String, + buffer_timeout_micros: Option, ) -> ChainResponse { - self.process_request_with_receiver(wrapper, client_details) + self.process_request_with_receiver(wrapper, client_details, buffer_timeout_micros) .await? .await? } @@ -65,12 +66,27 @@ impl BufferedChain { &mut self, wrapper: Wrapper<'_>, _client_details: String, + buffer_timeout_micros: Option, ) -> Result> { let (one_tx, one_rx) = tokio::sync::oneshot::channel::(); - self.send_handle - .send(ChannelMessage::new(wrapper.message, one_tx)) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) - .await?; + match buffer_timeout_micros { + None => { + self.send_handle + .send(ChannelMessage::new(wrapper.message, one_tx)) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .await? + } + Some(timeout) => { + self.send_handle + .send_timeout( + ChannelMessage::new(wrapper.message, one_tx), + Duration::from_micros(timeout), + ) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .await? + } + } + Ok(one_rx) } @@ -78,21 +94,31 @@ impl BufferedChain { &mut self, wrapper: Wrapper<'_>, _client_details: String, + buffer_timeout_micros: Option, ) -> Result<()> { - self.send_handle - .send(ChannelMessage::new_with_no_return(wrapper.message)) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) - .await?; + match buffer_timeout_micros { + None => { + self.send_handle + .send(ChannelMessage::new_with_no_return(wrapper.message)) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .await? + } + Some(timeout) => { + self.send_handle + .send_timeout( + ChannelMessage::new_with_no_return(wrapper.message), + Duration::from_micros(timeout), + ) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .await? + } + } Ok(()) } } impl TransformChain { - pub fn build_buffered_chain( - self, - buffer_size: usize, - timeout_micros: Option, - ) -> BufferedChain { + pub fn build_buffered_chain(self, buffer_size: usize) -> BufferedChain { let (tx, mut rx) = tokio::sync::mpsc::channel::(buffer_size); // If this is not a test, this should get removed by the compiler @@ -113,44 +139,26 @@ impl TransformChain { let mut count = count.lock().await; *count += 1; } - let future = async { - match timeout_micros { - None => Ok(chain.process_request(Wrapper::new(messages), name).await), - Some(timeout_us) => { - timeout( - Duration::from_micros(timeout_us), - chain.process_request(Wrapper::new(messages), name), - ) - .await - } - } + + let chain_response = chain.process_request(Wrapper::new(messages), name).await; + + if let Err(e) = &chain_response { + warn!("Internal error in buffered chain: {:?} - resetting", e); + chain = chain.clone(); }; - match future.fuse().await { - Ok(chain_response) => { - if let Err(e) = &chain_response { - warn!("Internal error in buffered chain: {:?} - resetting", e); - chain = chain.clone(); - }; - match return_chan { - None => trace!("Ignoring response due to lack of return chan"), - Some(tx) => { - match tx.send(chain_response) { - Ok(_) => {} - Err(e) => trace!( - "Dropping response message {:?} as not needed by TunableConsistency", - e - ) - } - }, - } - } - Err(e) => { - trace!("Upstream timeout, {}", e); - // chain = chain.clone(); - } + match return_chan { + None => trace!("Ignoring response due to lack of return chan"), + Some(tx) => match tx.send(chain_response) { + Ok(_) => {} + Err(e) => trace!( + "Dropping response message {:?} as not needed by TunableConsistency", + e + ), + }, } } + trace!("buffered chain processing thread exiting, stopping chain loop and dropping"); }); diff --git a/src/transforms/distributed/tunable_consistency_scatter.rs b/src/transforms/distributed/tunable_consistency_scatter.rs index 5a73a3663..fe9307a2c 100644 --- a/src/transforms/distributed/tunable_consistency_scatter.rs +++ b/src/transforms/distributed/tunable_consistency_scatter.rs @@ -45,7 +45,7 @@ impl TransformsFromConfig for TunableConsistencyConfig { temp.push( build_chain_from_config(key, &value, topics) .await? - .build_buffered_chain(10, Some(1000 * 1000)), + .build_buffered_chain(10), ); } @@ -159,7 +159,7 @@ impl Transform for TunableConsistency { //TODO: FuturesUnordered does bias to polling the first submitted task - this will bias all requests for chain in self.route_map.iter_mut() { - rec_fu.push(chain.process_request(qd.clone(), "TunableConsistency".to_string())); + rec_fu.push(chain.process_request(qd.clone(), "TunableConsistency".to_string(), None)); } let mut results: Vec = Vec::new(); @@ -295,7 +295,7 @@ mod scatter_transform_tests { let mut temp: Vec = Vec::with_capacity(route_map.len()); for (_key, value) in route_map.clone() { - temp.push(value.build_buffered_chain(10, Some(1000 * 1000))); + temp.push(value.build_buffered_chain(10)); } temp } diff --git a/src/transforms/load_balance.rs b/src/transforms/load_balance.rs index d63e4e42a..7ac87111a 100644 --- a/src/transforms/load_balance.rs +++ b/src/transforms/load_balance.rs @@ -59,7 +59,7 @@ impl Transform for ConnectionBalanceAndPool { if self.active_connection.is_none() { let mut guard = self.other_connections.lock().await; if guard.len() < self.parallelism { - let chain = self.chain_to_clone.clone().build_buffered_chain(5, None); + let chain = self.chain_to_clone.clone().build_buffered_chain(5); self.active_connection.replace(chain.clone()); guard.push(chain); } else { @@ -72,7 +72,7 @@ impl Transform for ConnectionBalanceAndPool { } if let Some(chain) = &mut self.active_connection { return chain - .process_request(qd, "Connection Balance and Pooler".to_string()) + .process_request(qd, "Connection Balance and Pooler".to_string(), None) .await; } unreachable!() diff --git a/src/transforms/mpsc.rs b/src/transforms/mpsc.rs index 268c4b71b..cd422251e 100644 --- a/src/transforms/mpsc.rs +++ b/src/transforms/mpsc.rs @@ -45,7 +45,7 @@ impl Clone for Buffer { let chain = self.chain_to_clone.clone(); Buffer { name: self.name.clone(), - tx: chain.build_buffered_chain(self.buffer_size, None), + tx: chain.build_buffered_chain(self.buffer_size), async_mode: false, buffer_size: self.buffer_size, chain_to_clone: self.chain_to_clone.clone(), @@ -61,7 +61,7 @@ impl TransformsFromConfig for BufferConfig { let buffer = self.buffer_size.unwrap_or(5); return Ok(Transforms::MPSCForwarder(Buffer { name: "forward", - tx: chain.clone().build_buffered_chain(buffer, None), + tx: chain.clone().build_buffered_chain(buffer), async_mode: self.async_mode, buffer_size: buffer, chain_to_clone: chain, @@ -75,9 +75,19 @@ impl Transform for Buffer { async fn transform<'a>(&'a mut self, qd: Wrapper<'a>) -> ChainResponse { return if self.async_mode { let expected_responses = qd.message.messages.len(); - self.tx - .process_request_no_return(qd, "Buffer".to_string()) - .await?; + let buffer_result = self + .tx + .process_request_no_return(qd, "Buffer".to_string(), self.timeout) + .await; + + match buffer_result { + Ok(_) => {} + Err(e) => { + counter!("tee_dropped_messages", 1, "chain" => self.name); + trace!("MPSC error {}", e); + } + } + ChainResponse::Ok(Messages { messages: (0..expected_responses) .into_iter() @@ -85,7 +95,9 @@ impl Transform for Buffer { .collect_vec(), }) } else { - self.tx.process_request(qd, "Buffer".to_string()).await + self.tx + .process_request(qd, "Buffer".to_string(), self.timeout) + .await }; } @@ -102,7 +114,7 @@ pub struct Tee { pub buffer_size: usize, pub chain_to_clone: TransformChain, pub behavior: ConsistencyBehavior, - pub timeout: u64, + pub timeout: Option, } #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] @@ -127,7 +139,7 @@ impl TransformsFromConfig for TeeConfig { Some( build_chain_from_config("fail_chain".to_string(), fail_chain, topics) .await? - .build_buffered_chain(buffer_size, None), + .build_buffered_chain(buffer_size), ) } else { None @@ -137,14 +149,12 @@ impl TransformsFromConfig for TeeConfig { return Ok(Transforms::MPSCTee(Tee { name: "tee", - tx: tee_chain - .clone() - .build_buffered_chain(buffer_size, self.timeout_micros), + tx: tee_chain.clone().build_buffered_chain(buffer_size), fail_chain, buffer_size, chain_to_clone: tee_chain, behavior: self.behavior.clone().unwrap_or(ConsistencyBehavior::IGNORE), - timeout: self.timeout_micros.unwrap_or(45), + timeout: self.timeout_micros, })); } } @@ -157,7 +167,7 @@ impl Transform for Tee { ConsistencyBehavior::IGNORE => { let (tee_result, chain_result) = tokio::join!( self.tx - .process_request_no_return(qd.clone(), "tee".to_string()), + .process_request_no_return(qd.clone(), "tee".to_string(), self.timeout), qd.call_next_transform() ); match tee_result { @@ -171,7 +181,8 @@ impl Transform for Tee { } ConsistencyBehavior::FAIL => { let (tee_result, chain_result) = tokio::join!( - self.tx.process_request(qd.clone(), "tee".to_string()), + self.tx + .process_request(qd.clone(), "tee".to_string(), self.timeout), qd.call_next_transform() ); let tee_response = tee_result?; @@ -193,7 +204,8 @@ impl Transform for Tee { ConsistencyBehavior::LOG { .. } => { let failed_message = qd.clone(); let (tee_result, chain_result) = tokio::join!( - self.tx.process_request(qd.clone(), "tee".to_string()), + self.tx + .process_request(qd.clone(), "tee".to_string(), self.timeout), qd.call_next_transform() ); @@ -203,7 +215,7 @@ impl Transform for Tee { if chain_response.eq(&tee_response) { if let Some(topic) = &mut self.fail_chain { topic - .process_request(failed_message, "tee".to_string()) + .process_request(failed_message, "tee".to_string(), None) .await?; } } From 241169a0b7bcb9ade96c68aa680474b53fe03d05 Mon Sep 17 00:00:00 2001 From: Ben Date: Fri, 4 Dec 2020 14:04:19 +1300 Subject: [PATCH 4/4] Version upversion --- Cargo.toml | 2 +- src/main.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ff0661d0..a50658e84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shotover-proxy" -version = "0.0.15" +version = "0.0.16" authors = ["Ben "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index d4c514c84..5d7f1c8f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use std::net::SocketAddr; use tokio::runtime; #[derive(Clap)] -#[clap(version = "0.0.15", author = "Instaclustr")] +#[clap(version = "0.0.16", author = "Instaclustr")] struct ConfigOpts { #[clap(short, long, default_value = "config/topology.yaml")] pub topology_file: String,