Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mpsc chain change #77

Merged
merged 4 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shotover-proxy"
version = "0.0.15"
version = "0.0.16"
authors = ["Ben <ben@instaclustr.com>"]
edition = "2018"

Expand Down
52 changes: 23 additions & 29 deletions examples/redis-cluster-dr/config.yaml
Original file line number Diff line number Diff line change
@@ -1,48 +1,42 @@
# This example will replicate all commands to the DR datacenter on a best effort basis
---
sources:
redis_batch:
Mpsc:
topic_name: redis-batch-tee
coalesce_behavior:
COUNT: 1000
redis_dr:
Mpsc:
topic_name: redis-dr-tee
redis_prod:
Redis:
batch_size_hint: 100
batch_size_hint: 4
listen_addr: "127.0.0.1:6379"
connection_limit: 3000000
chain_config:
redis-batch-chain:
- QueryTypeFilter:
filter: Read
- MPSCForwarder:
topic_name: redis-dr-tee
async_mode: false
redis_dr_chain:
- PoolConnections:
name: "RedisCluster-DR-subchain"
parallelism: 256
chain:
- RedisCluster:
first_contact_points: [ "redis://127.0.0.1:2120/", "redis://127.0.0.1:2121/", "redis://127.0.0.1:2122/", "redis://127.0.0.1:2123/", "redis://127.0.0.1:2124/", "redis://127.0.0.1:2125/" ]
redis_chain:
- MPSCTee:
topic_name: redis-batch-tee
behavior: IGNORE
timeout_micros: 40
# timeout_micros: 1000
buffer_size: 10000
chain:
- QueryTypeFilter:
filter: Read
- Coalesce:
max_behavior:
COUNT: 2000
- MPSCForwarder:
buffer_size: 100
async_mode: true
timeout_micros: 10000
chain:
- PoolConnections:
name: "RedisCluster-DR-subchain"
parallelism: 256
chain:
- RedisCluster:
first_contact_points: [ "redis://127.0.0.1:2120/", "redis://127.0.0.1:2121/", "redis://127.0.0.1:2122/", "redis://127.0.0.1:2123/", "redis://127.0.0.1:2124/", "redis://127.0.0.1:2125/" ]

- PoolConnections:
name: "RedisCluster-Main-subchain"
parallelism: 512
chain:
- RedisCluster:
first_contact_points: [ "redis://127.0.0.1:2220/", "redis://127.0.0.1:2221/", "redis://127.0.0.1:2222/", "redis://127.0.0.1:2223/", "redis://127.0.0.1:2224/", "redis://127.0.0.1:2225/" ]
named_topics:
redis-dr-tee : 5000
redis-batch-tee : 10000
example: 10
source_to_chain_mapping:
redis_prod: redis_chain
redis_dr: redis_dr_chain
redis_batch: redis-batch-chain
redis_prod: redis_chain
2 changes: 1 addition & 1 deletion examples/redis-cluster-dr/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ services:
- redis-node-5-dr
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_CLUSTER_REPLICAS=0'
- 'REDIS_CLUSTER_REPLICAS=1'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'
- 'REDIS_CLUSTER_CREATOR=yes'

Expand Down
34 changes: 10 additions & 24 deletions src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,6 @@ impl Topology {
vec!["pk".to_string(), "clustering".to_string()],
);

let mpsc_config = SourcesConfig::Mpsc(AsyncMpscConfig {
topic_name: String::from("test_topic"),
coalesce_behavior: None,
});

let cassandra_source = SourcesConfig::Cassandra(CassandraConfig {
listen_addr,
cassandra_ks,
Expand All @@ -262,28 +257,23 @@ impl Topology {
});

let tee_conf = TransformsConfig::MPSCTee(TeeConfig {
topic_name: String::from("test_topic"),
behavior: None,
timeout_micros: None,
chain: vec![kafka_transform_config_obj],
buffer_size: None,
});

let mut sources: HashMap<String, SourcesConfig> = HashMap::new();
sources.insert(String::from("cassandra_prod"), cassandra_source);
sources.insert(String::from("mpsc_chan"), mpsc_config);

let mut chain_config: HashMap<String, Vec<TransformsConfig>> = HashMap::new();
chain_config.insert(String::from("main_chain"), vec![tee_conf, codec_config]);
chain_config.insert(
String::from("async_chain"),
vec![kafka_transform_config_obj],
);

let mut named_topics: HashMap<String, usize> = HashMap::new();
named_topics.insert(String::from("test_topic"), 1);

let mut source_to_chain_mapping: HashMap<String, String> = HashMap::new();
source_to_chain_mapping.insert(String::from("cassandra_prod"), String::from("main_chain"));
source_to_chain_mapping.insert(String::from("mpsc_chan"), String::from("async_chain"));

Topology {
sources,
Expand Down Expand Up @@ -313,27 +303,23 @@ sources:
test.clustering:
- pk
- clustering
mpsc_chan:
Mpsc:
topic_name: test_topic
chain_config:
main_chain:
- MPSCTee:
topic_name: test_topic
chain:
- KafkaDestination:
topic: "test_topic"
config_values:
bootstrap.servers: "127.0.0.1:9092"
message.timeout.ms: "5000"
- CodecDestination:
bypass_result_processing: false
remote_address: "127.0.0.1:9042"
async_chain:
- KafkaDestination:
topic: "test_topic"
config_values:
bootstrap.servers: "127.0.0.1:9092"
message.timeout.ms: "5000"
remote_address: "127.0.0.1:9042"
named_topics:
test_topic: 1
source_to_chain_mapping:
cassandra_prod: main_chain
mpsc_chan: async_chain"###;
cassandra_prod: main_chain"###;

#[test]
fn new_test() -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::net::SocketAddr;
use tokio::runtime;

#[derive(Clap)]
#[clap(version = "0.0.15", author = "Instaclustr")]
#[clap(version = "0.0.16", author = "Instaclustr")]
struct ConfigOpts {
#[clap(short, long, default_value = "config/topology.yaml")]
pub topology_file: String,
Expand Down
108 changes: 64 additions & 44 deletions src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ impl BufferedChain {
&mut self,
wrapper: Wrapper<'_>,
client_details: String,
buffer_timeout_micros: Option<u64>,
) -> ChainResponse {
self.process_request_with_receiver(wrapper, client_details)
self.process_request_with_receiver(wrapper, client_details, buffer_timeout_micros)
.await?
.await?
}
Expand All @@ -65,22 +66,59 @@ impl BufferedChain {
&mut self,
wrapper: Wrapper<'_>,
_client_details: String,
buffer_timeout_micros: Option<u64>,
) -> Result<OneReceiver<ChainResponse>> {
let (one_tx, one_rx) = tokio::sync::oneshot::channel::<ChainResponse>();
self.send_handle
.send(ChannelMessage::new(wrapper.message, one_tx))
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?;
match buffer_timeout_micros {
None => {
self.send_handle
.send(ChannelMessage::new(wrapper.message, one_tx))
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
}
Some(timeout) => {
self.send_handle
.send_timeout(
ChannelMessage::new(wrapper.message, one_tx),
Duration::from_micros(timeout),
)
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
}
}

Ok(one_rx)
}

pub async fn process_request_no_return(
&mut self,
wrapper: Wrapper<'_>,
_client_details: String,
buffer_timeout_micros: Option<u64>,
) -> Result<()> {
match buffer_timeout_micros {
None => {
self.send_handle
.send(ChannelMessage::new_with_no_return(wrapper.message))
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
}
Some(timeout) => {
self.send_handle
.send_timeout(
ChannelMessage::new_with_no_return(wrapper.message),
Duration::from_micros(timeout),
)
.map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e))
.await?
}
}
Ok(())
}
}

impl TransformChain {
pub fn build_buffered_chain(
self,
buffer_size: usize,
timeout_millis: Option<u64>,
) -> BufferedChain {
pub fn build_buffered_chain(self, buffer_size: usize) -> BufferedChain {
let (tx, mut rx) = tokio::sync::mpsc::channel::<ChannelMessage>(buffer_size);

// If this is not a test, this should get removed by the compiler
Expand All @@ -101,44 +139,26 @@ impl TransformChain {
let mut count = count.lock().await;
*count += 1;
}
let future = async {
match timeout_millis {
None => Ok(chain.process_request(Wrapper::new(messages), name).await),
Some(timeout_ms) => {
timeout(
Duration::from_millis(timeout_ms),
chain.process_request(Wrapper::new(messages), name),
)
.await
}
}

let chain_response = chain.process_request(Wrapper::new(messages), name).await;

if let Err(e) = &chain_response {
warn!("Internal error in buffered chain: {:?} - resetting", e);
chain = chain.clone();
};

match future.fuse().await {
Ok(chain_response) => {
if let Err(e) = &chain_response {
warn!("Internal error in buffered chain: {:?} - resetting", e);
chain = chain.clone();
};
match return_chan {
None => trace!("Ignoring response due to lack of return chan"),
Some(tx) => {
match tx.send(chain_response) {
Ok(_) => {}
Err(e) => trace!(
"Dropping response message {:?} as not needed by TunableConsistency",
e
)
}
},
}
}
Err(e) => {
info!("Upstream timeout, resetting chain {}", e);
chain = chain.clone();
}
match return_chan {
None => trace!("Ignoring response due to lack of return chan"),
Some(tx) => match tx.send(chain_response) {
Ok(_) => {}
Err(e) => trace!(
"Dropping response message {:?} as not needed by TunableConsistency",
e
),
},
}
}

trace!("buffered chain processing thread exiting, stopping chain loop and dropping");
});

Expand Down
6 changes: 3 additions & 3 deletions src/transforms/distributed/tunable_consistency_scatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl TransformsFromConfig for TunableConsistencyConfig {
temp.push(
build_chain_from_config(key, &value, topics)
.await?
.build_buffered_chain(10, Some(1000)),
.build_buffered_chain(10),
);
}

Expand Down Expand Up @@ -159,7 +159,7 @@ impl Transform for TunableConsistency {

//TODO: FuturesUnordered does bias to polling the first submitted task - this will bias all requests
for chain in self.route_map.iter_mut() {
rec_fu.push(chain.process_request(qd.clone(), "TunableConsistency".to_string()));
rec_fu.push(chain.process_request(qd.clone(), "TunableConsistency".to_string(), None));
}

let mut results: Vec<Messages> = Vec::new();
Expand Down Expand Up @@ -295,7 +295,7 @@ mod scatter_transform_tests {
let mut temp: Vec<BufferedChain> = Vec::with_capacity(route_map.len());

for (_key, value) in route_map.clone() {
temp.push(value.build_buffered_chain(10, Some(1000)));
temp.push(value.build_buffered_chain(10));
}
temp
}
Expand Down
4 changes: 2 additions & 2 deletions src/transforms/load_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Transform for ConnectionBalanceAndPool {
if self.active_connection.is_none() {
let mut guard = self.other_connections.lock().await;
if guard.len() < self.parallelism {
let chain = self.chain_to_clone.clone().build_buffered_chain(5, None);
let chain = self.chain_to_clone.clone().build_buffered_chain(5);
self.active_connection.replace(chain.clone());
guard.push(chain);
} else {
Expand All @@ -72,7 +72,7 @@ impl Transform for ConnectionBalanceAndPool {
}
if let Some(chain) = &mut self.active_connection {
return chain
.process_request(qd, "Connection Balance and Pooler".to_string())
.process_request(qd, "Connection Balance and Pooler".to_string(), None)
.await;
}
unreachable!()
Expand Down
Loading