Skip to content

Commit

Permalink
keyspace
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Nov 3, 2022
1 parent dc8cbd1 commit 3797eac
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 25 deletions.
18 changes: 16 additions & 2 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use futures::StreamExt;
use itertools::Itertools;
use metrics::{register_counter, Counter};
use node::{CassandraNode, ConnectionFactory};
use node_pool::{GetReplicaErr, NodePool};
use node_pool::{GetReplicaErr, KeyspaceMetadata, NodePool};
use rand::prelude::*;
use serde::Deserialize;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, watch};
Expand All @@ -42,6 +43,9 @@ mod test_router;
mod token_map;
pub mod topology;

pub type KeyspaceChanTx = watch::Sender<HashMap<String, KeyspaceMetadata>>;
pub type KeyspaceChanRx = watch::Receiver<HashMap<String, KeyspaceMetadata>>;

#[derive(Deserialize, Debug, Clone)]
pub struct CassandraSinkClusterConfig {
/// contact points must be within the specified data_center and rack.
Expand Down Expand Up @@ -118,6 +122,7 @@ pub struct CassandraSinkCluster {
/// Addditionally any changes to nodes_rx is observed and copied over.
pool: NodePool,
nodes_rx: watch::Receiver<Vec<CassandraNode>>,
keyspaces_rx: KeyspaceChanRx,
rng: SmallRng,
task_handshake_tx: mpsc::Sender<TaskConnectionInfo>,
}
Expand All @@ -143,6 +148,7 @@ impl Clone for CassandraSinkCluster {
// Because the self.nodes_rx is always copied from the original nodes_rx created before any node lists were sent,
// once a single node list has been sent all new connections will immediately recognize it as a change.
nodes_rx: self.nodes_rx.clone(),
keyspaces_rx: self.keyspaces_rx.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
task_handshake_tx: self.task_handshake_tx.clone(),
}
Expand All @@ -163,10 +169,14 @@ impl CassandraSinkCluster {
let receive_timeout = timeout.map(Duration::from_secs);

let (local_nodes_tx, local_nodes_rx) = watch::channel(vec![]);
let (keyspaces_tx, keyspaces_rx): (KeyspaceChanTx, KeyspaceChanRx) =
watch::channel(HashMap::new());

let (task_handshake_tx, task_handshake_rx) = mpsc::channel(1);

create_topology_task(
local_nodes_tx,
keyspaces_tx,
task_handshake_rx,
local_shotover_node.data_center.clone(),
);
Expand All @@ -192,6 +202,7 @@ impl CassandraSinkCluster {
local_shotover_node,
pool: NodePool::new(vec![]),
nodes_rx: local_nodes_rx,
keyspaces_rx,
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
task_handshake_tx,
}
Expand Down Expand Up @@ -236,6 +247,10 @@ impl CassandraSinkCluster {
}
}

if self.keyspaces_rx.has_changed()? {
self.pool.update_keyspaces(&mut self.keyspaces_rx).await;
}

let tables_to_rewrite: Vec<TableToRewrite> = messages
.iter_mut()
.enumerate()
Expand Down Expand Up @@ -369,7 +384,6 @@ impl CassandraSinkCluster {
&self.local_shotover_node.rack,
&metadata.version,
&mut self.rng,
1,
)
.await
{
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use uuid::Uuid;
pub struct CassandraNode {
pub address: SocketAddr,
pub rack: String,

#[derivative(Debug = "ignore")]
pub tokens: Vec<Murmur3Token>,
pub outbound: Option<CassandraConnection>,
pub host_id: Uuid,
pub is_up: bool,

#[derivative(Debug = "ignore")]
pub tokens: Vec<Murmur3Token>,
}

impl CassandraNode {
Expand Down
40 changes: 35 additions & 5 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::node::CassandraNode;
use super::routing_key::calculate_routing_key;
use super::token_map::TokenMap;
use crate::transforms::cassandra::sink_cluster::node::CassandraNode;
use super::KeyspaceChanRx;
use anyhow::{anyhow, Error, Result};
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::frame::message_result::PreparedMetadata;
Expand All @@ -19,9 +20,15 @@ pub enum GetReplicaErr {
Other(Error),
}

#[derive(Debug, Clone)]
pub struct KeyspaceMetadata {
pub replication_factor: usize,
}

#[derive(Debug)]
pub struct NodePool {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
keyspace_metadata: Arc<RwLock<HashMap<String, KeyspaceMetadata>>>,
token_map: TokenMap,
nodes: Vec<CassandraNode>,
prev_idx: usize,
Expand All @@ -31,6 +38,7 @@ impl Clone for NodePool {
fn clone(&self) -> Self {
Self {
prepared_metadata: self.prepared_metadata.clone(),
keyspace_metadata: self.keyspace_metadata.clone(),
token_map: TokenMap::new(&[]),
nodes: vec![],
prev_idx: 0,
Expand All @@ -44,6 +52,7 @@ impl NodePool {
token_map: TokenMap::new(nodes.as_slice()),
nodes,
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
keyspace_metadata: Arc::new(RwLock::new(HashMap::new())),
prev_idx: 0,
}
}
Expand All @@ -70,6 +79,12 @@ impl NodePool {
self.token_map = TokenMap::new(self.nodes.as_slice());
}

pub async fn update_keyspaces(&mut self, keyspaces_rx: &mut KeyspaceChanRx) {
let updated_keyspaces = keyspaces_rx.borrow_and_update().clone();
let mut write_keyspaces = self.keyspace_metadata.write().await;
*write_keyspaces = updated_keyspaces;
}

pub async fn add_prepared_result(&mut self, id: CBytesShort, metadata: PreparedMetadata) {
let mut write_lock = self.prepared_metadata.write().await;
write_lock.insert(id, metadata);
Expand Down Expand Up @@ -121,7 +136,6 @@ impl NodePool {
rack: &str,
version: &Version,
rng: &mut SmallRng,
rf: usize, // TODO this parameter should be removed
) -> Result<Option<&mut CassandraNode>, GetReplicaErr> {
let metadata = {
let read_lock = self.prepared_metadata.read().await;
Expand All @@ -131,19 +145,35 @@ impl NodePool {
.clone()
};

let keyspace = {
let read_lock = self.keyspace_metadata.read().await;
read_lock
.get(
&metadata
.global_table_spec
.as_ref()
.ok_or_else(|| GetReplicaErr::Other(anyhow!("bruh")))?
.ks_name,
)
.ok_or(GetReplicaErr::NoMetadata)?
.clone()
};

let routing_key = calculate_routing_key(
&metadata.pk_indexes,
execute.query_parameters.values.as_ref().ok_or_else(|| {
GetReplicaErr::Other(anyhow!("Execute body does not have query paramters"))
GetReplicaErr::Other(anyhow!("Execute body does not have query parameters"))
})?,
*version,
)
.unwrap();

// TODO this should use the keyspace info to properly select the replica count
let replica_host_ids = self
.token_map
.iter_replica_nodes(Murmur3Token::generate(&routing_key), rf)
.iter_replica_nodes_capped(
Murmur3Token::generate(&routing_key),
keyspace.replication_factor,
)
.collect::<Vec<uuid::Uuid>>();

let (dc_replicas, rack_replicas) = self
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#[cfg(test)]
mod test_token_aware_router {
use super::super::node_pool::NodePool;
use super::super::node_pool::{KeyspaceMetadata, NodePool};
use super::super::routing_key::calculate_routing_key;
use crate::transforms::cassandra::sink_cluster::node::CassandraNode;
use crate::transforms::cassandra::sink_cluster::{KeyspaceChanRx, KeyspaceChanTx};
use cassandra_protocol::consistency::Consistency::One;
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::frame::message_result::PreparedMetadata;
Expand All @@ -18,6 +19,7 @@ mod test_token_aware_router {
use rand::prelude::*;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::sync::watch;
use uuid::Uuid;

#[tokio::test]
Expand All @@ -30,6 +32,23 @@ mod test_token_aware_router {
11, 241, 38, 11, 140, 72, 217, 34, 214, 128, 175, 241, 151, 73, 197, 227,
]);

let keyspace_metadata = KeyspaceMetadata {
replication_factor: 3,
};

let (keyspaces_tx, mut keyspaces_rx): (KeyspaceChanTx, KeyspaceChanRx) =
watch::channel(HashMap::new());

keyspaces_tx
.send(
[("demo_ks".to_string(), keyspace_metadata)]
.into_iter()
.collect(),
)
.unwrap();

router.update_keyspaces(&mut keyspaces_rx).await;

router
.add_prepared_result(id.clone(), prepared_metadata().clone())
.await;
Expand Down Expand Up @@ -64,7 +83,6 @@ mod test_token_aware_router {
"rack1",
&Version::V4,
&mut rng,
3,
)
.await
.unwrap()
Expand Down
18 changes: 13 additions & 5 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/token_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl TokenMap {
}

/// Returns nodes starting at given token and going in the direction of replicas.
pub fn iter_replica_nodes(
pub fn iter_replica_nodes_capped(
&self,
token: Murmur3Token,
replica_count: usize,
Expand All @@ -30,6 +30,14 @@ impl TokenMap {
.take(replica_count)
.map(|(_, node)| *node)
}

// pub fn iter_replica_nodes(&self, token: Murmur3Token) -> impl Iterator<Item = Uuid> + '_ {
// self.token_ring
// .range(token..)
// .chain(self.token_ring.iter())
// .take(self.token_ring.len())
// .map(|(_, node)| *node)
// }
}

#[cfg(test)]
Expand All @@ -47,7 +55,7 @@ mod test_token_map {
vec![
CassandraNode::new(
"127.0.0.1:9042".parse().unwrap(),
"rack1".into(),
"dc1".into(),
vec![
Murmur3Token::new(-2),
Murmur3Token::new(-1),
Expand All @@ -57,13 +65,13 @@ mod test_token_map {
),
CassandraNode::new(
"127.0.0.1:9043".parse().unwrap(),
"rack1".into(),
"dc1".into(),
vec![Murmur3Token::new(20)],
NODE_2,
),
CassandraNode::new(
"127.0.0.1:9044".parse().unwrap(),
"rack1".into(),
"dc1".into(),
vec![
Murmur3Token::new(2),
Murmur3Token::new(1),
Expand Down Expand Up @@ -98,7 +106,7 @@ mod test_token_map {
fn verify_tokens(node_host_ids: &[Uuid], token: Murmur3Token) {
let token_map = TokenMap::new(prepare_nodes().as_slice());
let nodes = token_map
.iter_replica_nodes(token, node_host_ids.len())
.iter_replica_nodes_capped(token, node_host_ids.len())
.collect_vec();

assert_eq!(nodes, node_host_ids);
Expand Down
Loading

0 comments on commit 3797eac

Please sign in to comment.