Skip to content

Commit

Permalink
Merge pull request #75 from shotover/filter-coalesce
Browse files Browse the repository at this point in the history
Filter coalesce
  • Loading branch information
benbromhead authored Dec 1, 2020
2 parents 5a1b7e2 + 36139e5 commit d62084e
Show file tree
Hide file tree
Showing 11 changed files with 544 additions and 32 deletions.
31 changes: 22 additions & 9 deletions examples/redis-cluster-dr/config.yaml
Original file line number Diff line number Diff line change
@@ -1,35 +1,48 @@
# This example will replicate all commands to the DR datacenter on a best effort basis
---
sources:
redis_batch:
Mpsc:
topic_name: redis-batch-tee
coalesce_behavior:
COUNT: 1000
redis_dr:
Mpsc:
topic_name: redis-dr-tee
redis_prod:
Redis:
batch_size_hint: 1000
listen_addr: "0.0.0.0:6378"
batch_size_hint: 100
listen_addr: "127.0.0.1:6379"
connection_limit: 3000000
chain_config:
redis-batch-chain:
- QueryTypeFilter:
filter: Read
- MPSCForwarder:
topic_name: redis-dr-tee
async_mode: false
redis_dr_chain:
- PoolConnections:
name: "RedisCluster-DR-subchain"
parallelism: 512
parallelism: 256
chain:
- RedisCluster:
first_contact_points: [ "redis://107.20.71.3/", "redis://34.207.20.215/", "redis://34.225.18.239/" ]
first_contact_points: [ "redis://127.0.0.1:2120/", "redis://127.0.0.1:2121/", "redis://127.0.0.1:2122/", "redis://127.0.0.1:2123/", "redis://127.0.0.1:2124/", "redis://127.0.0.1:2125/" ]
redis_chain:
- MPSCTee:
topic_name: redis-dr-tee
topic_name: redis-batch-tee
behavior: IGNORE
timeout_micros: 40
- PoolConnections:
name: "RedisCluster-Main-subchain"
parallelism: 512
chain:
- RedisCluster:
first_contact_points: [ "redis://3.209.120.59/", "redis://107.22.193.71/", "redis://52.71.156.167/" ]
first_contact_points: [ "redis://127.0.0.1:2220/", "redis://127.0.0.1:2221/", "redis://127.0.0.1:2222/", "redis://127.0.0.1:2223/", "redis://127.0.0.1:2224/", "redis://127.0.0.1:2225/" ]
named_topics:
testtopic: 5
redis-dr-tee: 5
redis-dr-tee : 5000
redis-batch-tee : 10000
source_to_chain_mapping:
redis_prod: redis_chain
redis_dr: redis_dr_chain
redis_dr: redis_dr_chain
redis_batch: redis-batch-chain
55 changes: 47 additions & 8 deletions examples/redis-cluster-dr/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,41 +81,74 @@ services:
ports:
- "2120:6379"
volumes:
- redis-cluster_data-0:/bitnami/redis/data
- redis-cluster_data-0-dr:/bitnami/redis/data
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'

redis-node-1-dr:
image: docker.io/bitnami/redis-cluster:6.0-debian-10
ports:
- "2121:6379"
volumes:
- redis-cluster_data-1:/bitnami/redis/data
- redis-cluster_data-1-dr:/bitnami/redis/data
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=rredis-node-0-dr redis-node-1-dr redis-node-2-dr'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'

redis-node-2-dr:
image: docker.io/bitnami/redis-cluster:6.0-debian-10
ports:
- "2122:6379"
volumes:
- redis-cluster_data-2:/bitnami/redis/data
- redis-cluster_data-2-dr:/bitnami/redis/data
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'

redis-node-3-dr:
image: docker.io/bitnami/redis-cluster:6.0-debian-10
ports:
- "2123:6379"
volumes:
- redis-cluster_data-3-dr:/bitnami/redis/data
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'

redis-node-4-dr:
image: docker.io/bitnami/redis-cluster:6.0-debian-10
ports:
- "2124:6379"
volumes:
- redis-cluster_data-4-dr:/bitnami/redis/data
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'

redis-node-5-dr:
image: docker.io/bitnami/redis-cluster:6.0-debian-10
ports:
- "2125:6379"
volumes:
- redis-cluster_data-5-dr:/bitnami/redis/data
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'

redis-cluster-init-dr:
image: docker.io/bitnami/redis-cluster:6.0-debian-10
depends_on:
- redis-node-0-dr
- redis-node-1-dr
- redis-node-2-dr
- redis-node-3-dr
- redis-node-4-dr
- redis-node-5-dr
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_CLUSTER_REPLICAS=0'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr'
- 'REDIS_NODES=redis-node-0-dr redis-node-1-dr redis-node-2-dr redis-node-3-dr redis-node-4-dr redis-node-5-dr'
- 'REDIS_CLUSTER_CREATOR=yes'

volumes:
Expand All @@ -131,9 +164,15 @@ volumes:
driver: local
redis-cluster_data-5:
driver: local
redis-cluster_data-0-dr:
driver: local
redis-cluster_data-1-dr:
driver: local
redis-cluster_data-2-dr:
driver: local
redis-cluster_data-3-dr:
driver: local
driver: local
redis-cluster_data-4-dr:
driver: local
redis-cluster_data-5-dr:
driver: local
15 changes: 11 additions & 4 deletions src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ impl TopicHolder {

impl Topology {
pub fn new_from_yaml(yaml_contents: String) -> Topology {
let config : TopologyConfig = serde_yaml::from_str(&yaml_contents).map_err(|e| anyhow!(e)).unwrap();
let config: TopologyConfig = serde_yaml::from_str(&yaml_contents)
.map_err(|e| anyhow!(e))
.unwrap();
Topology::topology_from_config(config)
}

Expand Down Expand Up @@ -206,13 +208,16 @@ impl Topology {
default_topic
});

let built_topics = topics.iter().map(|(k, v)| (k.to_owned(), v.unwrap_or(5))).collect();
let built_topics = topics
.iter()
.map(|(k, v)| (k.to_owned(), v.unwrap_or(5)))
.collect();
return Topology {
sources: config.sources,
chain_config: config.chain_config,
named_topics: built_topics,
source_to_chain_mapping: config.source_to_chain_mapping
}
source_to_chain_mapping: config.source_to_chain_mapping,
};
}

pub fn get_demo_config() -> Topology {
Expand Down Expand Up @@ -246,6 +251,7 @@ impl Topology {

let mpsc_config = SourcesConfig::Mpsc(AsyncMpscConfig {
topic_name: String::from("test_topic"),
coalesce_behavior: None,
});

let cassandra_source = SourcesConfig::Cassandra(CassandraConfig {
Expand All @@ -258,6 +264,7 @@ impl Topology {
let tee_conf = TransformsConfig::MPSCTee(TeeConfig {
topic_name: String::from("test_topic"),
behavior: None,
timeout_micros: None,
});

let mut sources: HashMap<String, SourcesConfig> = HashMap::new();
Expand Down
4 changes: 4 additions & 0 deletions src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ impl Messages {
Messages { messages: vec![] }
}

pub fn new_with_size_hint(capacity: usize) -> Self {
Messages { messages: Vec::with_capacity(capacity) }
}

pub fn new_from_message(message: Message) -> Self {
Messages {
messages: vec![message],
Expand Down
8 changes: 3 additions & 5 deletions src/protocols/redis_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,17 @@ impl RedisCodec {
get_keys(values, keys, commands)?;
} // get the length of the value stored in a key
"MSET" => {
query_type = QueryType::Read;
get_key_values(values, keys, commands)?;
} // set multiple keys to multiple values
"MSETNX" => {
query_type = QueryType::Read;
get_key_values(values, keys, commands)?;
} // set multiple keys to multiple values, only if none of the keys exist
"GET" => {
query_type = QueryType::Read;
get_keys(values, keys, commands)?;
} // get value in key
"GETRANGE" => {
query_type = QueryType::Read;
get_key_values(values, keys, commands)?;
} // get a substring value of a key and return its old value
"MGET" => {
Expand Down Expand Up @@ -256,7 +256,6 @@ impl RedisCodec {
get_keys(values, keys, commands)?;
} // return the current length of the list
"LPOP" => {
query_type = QueryType::ReadWrite;
get_keys(values, keys, commands)?;
} // remove the first element from the list and returns it
"LSET" => {
Expand All @@ -266,7 +265,6 @@ impl RedisCodec {
get_key_multi_values(values, keys, commands)?;
} // trim a list to the specified range
"RPOP" => {
query_type = QueryType::ReadWrite;
get_keys(values, keys, commands)?;
} // remove the last element from the list and returns it
"SADD" => {
Expand Down Expand Up @@ -300,7 +298,7 @@ impl RedisCodec {
get_key_values(values, keys, commands)?;
} // move a member from one set to another
"SPOP" => {
query_type = QueryType::ReadWrite;
query_type = QueryType::Write;
get_key_values(values, keys, commands)?;
} // remove and return one or multiple random members from a set
"ZADD" => {
Expand Down
47 changes: 42 additions & 5 deletions src/sources/mpsc_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ use crate::transforms::chain::TransformChain;
use tokio::sync::mpsc::Receiver;

use crate::config::topology::{ChannelMessage, TopicHolder};
use crate::message::Message;
use crate::server::Shutdown;
use crate::sources::{Sources, SourcesFromConfig};
use crate::transforms::coalesce::CoalesceBehavior;
use crate::transforms::Wrapper;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::time::Instant;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
Expand All @@ -17,6 +20,7 @@ use tracing::warn;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct AsyncMpscConfig {
pub topic_name: String,
pub coalesce_behavior: Option<CoalesceBehavior>,
}

#[async_trait]
Expand All @@ -29,12 +33,17 @@ impl SourcesFromConfig for AsyncMpscConfig {
shutdown_complete_tx: mpsc::Sender<()>,
) -> Result<Vec<Sources>> {
if let Some(rx) = topics.get_rx(&self.topic_name) {
let behavior = self
.coalesce_behavior
.clone()
.unwrap_or(CoalesceBehavior::COUNT(10000));
return Ok(vec![Sources::Mpsc(AsyncMpsc::new(
chain.clone(),
rx,
&self.topic_name,
Shutdown::new(notify_shutdown.subscribe()),
shutdown_complete_tx,
behavior.clone(),
))]);
}
Err(anyhow!(
Expand All @@ -58,12 +67,17 @@ impl AsyncMpsc {
name: &str,
mut shutdown: Shutdown,
shutdown_complete: mpsc::Sender<()>,
max_behavior: CoalesceBehavior,
) -> AsyncMpsc {
info!("Starting MPSC source for the topic [{}] ", name);
let mut main_chain = chain.clone();
let max_behavior = max_behavior.clone();
let mut buffer: Vec<Message> = Vec::new();

let jh = Handle::current().spawn(async move {
// This will go out of scope once we exit the loop below, indicating we are done and shutdown
let _notifier = shutdown_complete.clone();
let mut last_write: Instant = Instant::now();
while !shutdown.is_shutdown() {
let channel_message = tokio::select! {
res = rx.recv() => {
Expand All @@ -78,19 +92,42 @@ impl AsyncMpsc {
};

let ChannelMessage {
messages,
mut messages,
return_chan,
} = channel_message;

let w: Wrapper = Wrapper::new(messages);
match return_chan {
None => {
if let Err(e) = main_chain.process_request(w, "AsyncMpsc".to_string()).await
{
warn!("Something went wrong {}", e);
buffer.append(&mut messages.messages);
if match max_behavior {
CoalesceBehavior::COUNT(c) => buffer.len() >= c,
CoalesceBehavior::WAIT_MS(w) => last_write.elapsed().as_millis() >= w,
CoalesceBehavior::COUNT_OR_WAIT(c, w) => {
last_write.elapsed().as_millis() >= w || buffer.len() >= c
}
} {
//this could be done in the if statement above, but for the moment lets keep the
//evaluation logic separate from the update
match max_behavior {
CoalesceBehavior::WAIT_MS(_)
| CoalesceBehavior::COUNT_OR_WAIT(_, _) => {
last_write = Instant::now()
}
_ => {}
}
std::mem::swap(&mut buffer, &mut messages.messages);
let w: Wrapper = Wrapper::new(messages);
info!("Flushing {} commands", w.message.messages.len());

if let Err(e) =
main_chain.process_request(w, "AsyncMpsc".to_string()).await
{
warn!("Something went wrong {}", e);
}
}
}
Some(tx) => {
let w: Wrapper = Wrapper::new(messages);
if let Err(e) =
tx.send(main_chain.process_request(w, "AsyncMpsc".to_string()).await)
{
Expand Down
Loading

0 comments on commit d62084e

Please sign in to comment.