Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use keyspace info for replication factor #895

Merged
merged 13 commits into from
Nov 9, 2022
22 changes: 18 additions & 4 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use futures::StreamExt;
use itertools::Itertools;
use metrics::{register_counter, Counter};
use node::{CassandraNode, ConnectionFactory};
use node_pool::{GetReplicaErr, NodePool};
use node_pool::{GetReplicaErr, KeyspaceMetadata, NodePool};
use rand::prelude::*;
use serde::Deserialize;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, watch};
Expand All @@ -42,6 +43,9 @@ mod test_router;
mod token_map;
pub mod topology;

/// Sending half of the `watch` channel that carries the current keyspace metadata,
/// as a map from keyspace name to its [`KeyspaceMetadata`].
pub type KeyspaceChanTx = watch::Sender<HashMap<String, KeyspaceMetadata>>;
/// Receiving half of the `watch` channel that carries the current keyspace metadata,
/// as a map from keyspace name to its [`KeyspaceMetadata`].
pub type KeyspaceChanRx = watch::Receiver<HashMap<String, KeyspaceMetadata>>;

#[derive(Deserialize, Debug, Clone)]
pub struct CassandraSinkClusterConfig {
/// contact points must be within the specified data_center and rack.
Expand Down Expand Up @@ -118,6 +122,7 @@ pub struct CassandraSinkCluster {
/// Additionally any changes to nodes_rx are observed and copied over.
pool: NodePool,
nodes_rx: watch::Receiver<Vec<CassandraNode>>,
keyspaces_rx: KeyspaceChanRx,
rng: SmallRng,
task_handshake_tx: mpsc::Sender<TaskConnectionInfo>,
}
Expand All @@ -143,6 +148,7 @@ impl Clone for CassandraSinkCluster {
// Because the self.nodes_rx is always copied from the original nodes_rx created before any node lists were sent,
// once a single node list has been sent all new connections will immediately recognize it as a change.
nodes_rx: self.nodes_rx.clone(),
keyspaces_rx: self.keyspaces_rx.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
task_handshake_tx: self.task_handshake_tx.clone(),
}
Expand All @@ -163,10 +169,14 @@ impl CassandraSinkCluster {
let receive_timeout = timeout.map(Duration::from_secs);

let (local_nodes_tx, local_nodes_rx) = watch::channel(vec![]);
let (keyspaces_tx, keyspaces_rx): (KeyspaceChanTx, KeyspaceChanRx) =
watch::channel(HashMap::new());

let (task_handshake_tx, task_handshake_rx) = mpsc::channel(1);

create_topology_task(
local_nodes_tx,
keyspaces_tx,
task_handshake_rx,
local_shotover_node.data_center.clone(),
);
Expand All @@ -192,6 +202,7 @@ impl CassandraSinkCluster {
local_shotover_node,
pool: NodePool::new(vec![]),
nodes_rx: local_nodes_rx,
keyspaces_rx,
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
task_handshake_tx,
}
Expand Down Expand Up @@ -236,6 +247,10 @@ impl CassandraSinkCluster {
}
}

if self.keyspaces_rx.has_changed()? {
self.pool.update_keyspaces(&mut self.keyspaces_rx).await;
}

let tables_to_rewrite: Vec<TableToRewrite> = messages
.iter_mut()
.enumerate()
Expand Down Expand Up @@ -369,7 +384,6 @@ impl CassandraSinkCluster {
&self.local_shotover_node.rack,
&metadata.version,
&mut self.rng,
1,
)
.await
{
Expand All @@ -379,15 +393,15 @@ impl CassandraSinkCluster {
.await?
.send(message, return_chan_tx)?;
}
Ok(None) => {
Ok(None) | Err(GetReplicaErr::NoKeyspaceMetadata) => {
let node = self
.pool
.get_round_robin_node_in_dc_rack(&self.local_shotover_node.rack);
node.get_connection(&self.connection_factory)
.await?
.send(message, return_chan_tx)?;
}
Err(GetReplicaErr::NoMetadata) => {
Err(GetReplicaErr::NoPreparedMetadata) => {
let id = execute.id.clone();
tracing::info!("forcing re-prepare on {:?}", id);
// this shotover node doesn't have the metadata
Expand Down
6 changes: 3 additions & 3 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use uuid::Uuid;
pub struct CassandraNode {
pub address: SocketAddr,
pub rack: String,

#[derivative(Debug = "ignore")]
pub tokens: Vec<Murmur3Token>,
pub outbound: Option<CassandraConnection>,
pub host_id: Uuid,
pub is_up: bool,

#[derivative(Debug = "ignore")]
pub tokens: Vec<Murmur3Token>,
}

impl CassandraNode {
Expand Down
45 changes: 38 additions & 7 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::node::CassandraNode;
use super::routing_key::calculate_routing_key;
use super::token_map::TokenMap;
use crate::transforms::cassandra::sink_cluster::node::CassandraNode;
use super::KeyspaceChanRx;
use anyhow::{anyhow, Error, Result};
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::frame::message_result::PreparedMetadata;
Expand All @@ -15,13 +16,20 @@ use tokio::sync::{watch, RwLock};

/// Errors produced while looking up a replica node for an execute request.
#[derive(Debug)]
pub enum GetReplicaErr {
// NOTE(review): appears to be the earlier name of `NoPreparedMetadata` (both are used at the
// same prepared-metadata lookup) — confirm only one of the two is meant to exist.
NoMetadata,
/// No prepared-statement metadata is stored for the id of the execute request.
NoPreparedMetadata,
/// The prepared metadata has no global table spec, or no keyspace metadata is stored
/// for the keyspace it names.
NoKeyspaceMetadata,
/// Any other failure, carried as an `anyhow` error.
Other(Error),
}

/// Metadata tracked per keyspace and consulted when choosing replica nodes.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyspaceMetadata {
/// Replica count for the keyspace; passed as the replica count when iterating
/// replica nodes in the token map.
pub replication_factor: usize,
}

#[derive(Debug)]
pub struct NodePool {
prepared_metadata: Arc<RwLock<HashMap<CBytesShort, PreparedMetadata>>>,
keyspace_metadata: Arc<RwLock<HashMap<String, KeyspaceMetadata>>>,
rukai marked this conversation as resolved.
Show resolved Hide resolved
token_map: TokenMap,
nodes: Vec<CassandraNode>,
prev_idx: usize,
Expand All @@ -31,6 +39,7 @@ impl Clone for NodePool {
fn clone(&self) -> Self {
Self {
prepared_metadata: self.prepared_metadata.clone(),
keyspace_metadata: self.keyspace_metadata.clone(),
token_map: TokenMap::new(&[]),
nodes: vec![],
prev_idx: 0,
Expand All @@ -44,6 +53,7 @@ impl NodePool {
token_map: TokenMap::new(nodes.as_slice()),
nodes,
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
keyspace_metadata: Arc::new(RwLock::new(HashMap::new())),
prev_idx: 0,
}
}
Expand All @@ -70,6 +80,12 @@ impl NodePool {
self.token_map = TokenMap::new(self.nodes.as_slice());
}

/// Replaces the pool's stored keyspace metadata with the value currently held by
/// `keyspaces_rx`.
///
/// The whole map is swapped in one assignment while the write lock is held.
pub async fn update_keyspaces(&mut self, keyspaces_rx: &mut KeyspaceChanRx) {
    // Copy the map out first so the watch-channel borrow has ended before we wait
    // on the write lock.
    let snapshot = keyspaces_rx.borrow_and_update().clone();
    *self.keyspace_metadata.write().await = snapshot;
}

pub async fn add_prepared_result(&mut self, id: CBytesShort, metadata: PreparedMetadata) {
let mut write_lock = self.prepared_metadata.write().await;
write_lock.insert(id, metadata);
Expand Down Expand Up @@ -121,29 +137,44 @@ impl NodePool {
rack: &str,
version: &Version,
rng: &mut SmallRng,
rf: usize, // TODO this parameter should be removed
) -> Result<Option<&mut CassandraNode>, GetReplicaErr> {
let metadata = {
let read_lock = self.prepared_metadata.read().await;
read_lock
.get(&execute.id)
.ok_or(GetReplicaErr::NoMetadata)?
.ok_or(GetReplicaErr::NoPreparedMetadata)?
.clone()
};

let keyspace = {
conorbros marked this conversation as resolved.
Show resolved Hide resolved
let read_lock = self.keyspace_metadata.read().await;
read_lock
.get(
&metadata
.global_table_spec
.as_ref()
.ok_or(GetReplicaErr::NoKeyspaceMetadata)?
.ks_name,
)
.ok_or(GetReplicaErr::NoKeyspaceMetadata)?
.clone()
};

let routing_key = calculate_routing_key(
&metadata.pk_indexes,
execute.query_parameters.values.as_ref().ok_or_else(|| {
GetReplicaErr::Other(anyhow!("Execute body does not have query paramters"))
GetReplicaErr::Other(anyhow!("Execute body does not have query parameters"))
conorbros marked this conversation as resolved.
Show resolved Hide resolved
})?,
*version,
)
.unwrap();

// TODO this should use the keyspace info to properly select the replica count
let replica_host_ids = self
.token_map
.iter_replica_nodes(Murmur3Token::generate(&routing_key), rf)
.iter_replica_nodes_capped(
Murmur3Token::generate(&routing_key),
keyspace.replication_factor,
)
.collect::<Vec<uuid::Uuid>>();

let (dc_replicas, rack_replicas) = self
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#[cfg(test)]
mod test_token_aware_router {
use super::super::node_pool::NodePool;
use super::super::node_pool::{KeyspaceMetadata, NodePool};
use super::super::routing_key::calculate_routing_key;
use crate::transforms::cassandra::sink_cluster::node::CassandraNode;
use crate::transforms::cassandra::sink_cluster::{KeyspaceChanRx, KeyspaceChanTx};
use cassandra_protocol::consistency::Consistency::One;
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::frame::message_result::PreparedMetadata;
Expand All @@ -18,6 +19,7 @@ mod test_token_aware_router {
use rand::prelude::*;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::sync::watch;
use uuid::Uuid;

#[tokio::test]
Expand All @@ -30,6 +32,23 @@ mod test_token_aware_router {
11, 241, 38, 11, 140, 72, 217, 34, 214, 128, 175, 241, 151, 73, 197, 227,
]);

let keyspace_metadata = KeyspaceMetadata {
replication_factor: 3,
};

let (keyspaces_tx, mut keyspaces_rx): (KeyspaceChanTx, KeyspaceChanRx) =
watch::channel(HashMap::new());

keyspaces_tx
.send(
[("demo_ks".to_string(), keyspace_metadata)]
.into_iter()
.collect(),
)
.unwrap();

router.update_keyspaces(&mut keyspaces_rx).await;

router
.add_prepared_result(id.clone(), prepared_metadata().clone())
.await;
Expand Down Expand Up @@ -64,7 +83,6 @@ mod test_token_aware_router {
"rack1",
&Version::V4,
&mut rng,
3,
)
.await
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl TokenMap {
}

/// Returns nodes starting at given token and going in the direction of replicas.
pub fn iter_replica_nodes(
pub fn iter_replica_nodes_capped(
&self,
token: Murmur3Token,
replica_count: usize,
Expand Down Expand Up @@ -98,7 +98,7 @@ mod test_token_map {
fn verify_tokens(node_host_ids: &[Uuid], token: Murmur3Token) {
let token_map = TokenMap::new(prepare_nodes().as_slice());
let nodes = token_map
.iter_replica_nodes(token, node_host_ids.len())
.iter_replica_nodes_capped(token, node_host_ids.len())
.collect_vec();

assert_eq!(nodes, node_host_ids);
Expand Down
Loading