CassandraSinkCluster: topology task now processes events
rukai committed Sep 19, 2022
1 parent 724ae49 commit 8d1ace1
Showing 5 changed files with 212 additions and 68 deletions.
14 changes: 12 additions & 2 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
@@ -258,6 +258,7 @@ impl CassandraSinkCluster {
..
})) = message.frame()
{
tracing::error!("complete handshake");
self.connection_factory
.push_handshake_message(message.clone());
}
@@ -268,6 +269,7 @@ impl CassandraSinkCluster {
let mut responses_future_use = FuturesOrdered::new();
let mut use_future_index_to_node_index = vec![];
for mut message in messages {
tracing::error!("process message: {:?}", message.frame().unwrap());
let (return_chan_tx, return_chan_rx) = oneshot::channel();
if self.local_nodes.is_empty()
|| !self.init_handshake_complete
@@ -276,8 +278,10 @@ impl CassandraSinkCluster {
|| is_ddl_statement(&mut message)
|| self.is_system_query(&mut message)
{
tracing::error!("send to control node");
self.init_handshake_connection.as_mut().unwrap()
} else if is_use_statement(&mut message) {
tracing::error!("send to use statement");
// Adding the USE statement to the handshake ensures that any new connection
// created will have the correct keyspace setup.
self.connection_factory.set_use_message(message.clone());
@@ -296,7 +300,13 @@ impl CassandraSinkCluster {
self.init_handshake_connection.as_mut().unwrap()
} else {
// We have a full nodes list and handshake, so we can do proper routing now.
let random_node = self.local_nodes.choose_mut(&mut self.rng).unwrap();
let random_node = self
.local_nodes
.iter_mut()
.filter(|x| x.is_up)
.choose(&mut self.rng)
.unwrap();
tracing::error!("send to a random node {:?}", random_node.address);
random_node.get_connection(&self.connection_factory).await?
}
.send(message, return_chan_tx)?;
@@ -400,7 +410,7 @@ impl CassandraSinkCluster {
fn get_random_node_in_dc_rack(&mut self) -> &CassandraNode {
self.local_nodes
.iter()
.filter(|x| x.rack == self.local_shotover_node.rack)
.filter(|x| x.rack == self.local_shotover_node.rack && x.is_up)
.choose(&mut self.rng)
.unwrap()
}
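The routing changes above only ever select nodes that are flagged is_up. A minimal sketch of that filter-and-choose pattern, using a hypothetical simplified node type rather than the real Shotover CassandraNode:

use rand::prelude::*;
use std::net::SocketAddr;

// Hypothetical, simplified stand-in for CassandraNode, for illustration only.
struct Node {
    address: SocketAddr,
    rack: String,
    is_up: bool,
}

// Mirrors get_random_node_in_dc_rack: only nodes in the local rack that are
// currently up are eligible targets.
fn pick_node<'a>(nodes: &'a [Node], local_rack: &str, rng: &mut impl Rng) -> Option<&'a Node> {
    nodes
        .iter()
        .filter(|n| n.is_up && n.rack == local_rack)
        .choose(rng)
}
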
17 changes: 16 additions & 1 deletion shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
@@ -1,8 +1,10 @@
use crate::codec::cassandra::CassandraCodec;
use crate::frame::Frame;
use crate::message::{Message, Messages};
use crate::tls::TlsConnector;
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::Result;
use anyhow::{anyhow, Result};
use cassandra_protocol::frame::Version;
use cassandra_protocol::token::Murmur3Token;
use std::net::SocketAddr;
use tokio::net::ToSocketAddrs;
@@ -14,6 +16,7 @@ pub struct CassandraNode {
pub rack: String,
pub tokens: Vec<Murmur3Token>,
pub outbound: Option<CassandraConnection>,
pub is_up: bool,
}

impl CassandraNode {
@@ -111,4 +114,16 @@ impl ConnectionFactory {
pub fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender<Messages>) {
self.pushed_messages_tx = Some(pushed_messages_tx);
}

pub fn get_version(&mut self) -> Result<Version> {
for message in &mut self.init_handshake {
if let Some(Frame::Cassandra(frame)) = message.frame() {
return Ok(frame.version);
}
}
Err(anyhow!(
"connection version could not be retrieved from the handshake because none of the {} messages in the handshake could be parsed",
self.init_handshake.len()
))
}
}
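The new get_version helper lets code that only holds a ConnectionFactory reuse the protocol version negotiated during the recorded handshake when it builds frames of its own. A rough usage sketch, assuming connection_factory has already recorded a handshake and address is a reachable node (error handling elided):

let version = connection_factory.get_version()?;
let connection = connection_factory.new_connection(address).await?;
// version is then used when constructing frames such as the REGISTER message in topology.rs below.
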
153 changes: 134 additions & 19 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
@@ -4,10 +4,14 @@ use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageValue};
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::{anyhow, Result};
use cassandra_protocol::events::{ServerEvent, SimpleServerEvent};
use cassandra_protocol::frame::events::{StatusChangeType, TopologyChangeType};
use cassandra_protocol::frame::message_register::BodyReqRegister;
use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::{frame::Version, query::QueryParams};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc, oneshot, RwLock};

#[derive(Debug)]
@@ -18,57 +22,166 @@ pub struct TaskConnectionInfo {

pub fn create_topology_task(
nodes: Arc<RwLock<Vec<CassandraNode>>>,
mut handshake_rx: mpsc::Receiver<TaskConnectionInfo>,
mut connection_info_rx: mpsc::Receiver<TaskConnectionInfo>,
data_center: String,
) {
tokio::spawn(async move {
while let Some(handshake) = handshake_rx.recv().await {
while let Some(mut connection_info) = connection_info_rx.recv().await {
let mut attempts = 0;
while let Err(err) = topology_task_process(&nodes, &handshake, &data_center).await {
while let Err(err) =
topology_task_process(&nodes, &mut connection_info, &data_center).await
{
tracing::error!("topology task failed, retrying, error was: {err:?}");
attempts += 1;
if attempts > 3 {
// 3 attempts have failed, let's try a new handshake
break;
}
}

// Sleep for an hour.
// TODO: This is a crude way to ensure we don't overload the transforms with too many topology changes.
// This will be replaced with:
// * the task subscribes to events
// * the transforms request a reload when they hit connection errors
tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await;
}
});
}

async fn topology_task_process(
nodes: &Arc<RwLock<Vec<CassandraNode>>>,
handshake: &TaskConnectionInfo,
shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
connection_info: &mut TaskConnectionInfo,
data_center: &str,
) -> Result<()> {
let outbound = handshake
let (pushed_messages_tx, mut pushed_messages_rx) = unbounded_channel();
connection_info
.connection_factory
.set_pushed_messages_tx(pushed_messages_tx);

let connection = connection_info
.connection_factory
.new_connection(handshake.address)
.new_connection(connection_info.address)
.await?;

let version = connection_info.connection_factory.get_version()?;

let mut nodes = fetch_current_nodes(&connection, connection_info, data_center).await?;
write_to_shared(shared_nodes, nodes.clone()).await;

register_for_topology_and_status_events(&connection, version).await?;

loop {
match pushed_messages_rx.recv().await {
Some(messages) => {
for mut message in messages {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Event(event),
..
})) = message.frame()
{
match event {
ServerEvent::TopologyChange(topology) => match topology.change_type {
TopologyChangeType::NewNode => {
let mut new_nodes = fetch_current_nodes(
&connection,
connection_info,
data_center,
)
.await?;

// is_up state gets carried over to new list
for node in &nodes {
if !node.is_up {
for new_node in &mut new_nodes {
if new_node.address == node.address {
new_node.is_up = false;
}
}
}
}

nodes = new_nodes;
}
TopologyChangeType::RemovedNode => {
nodes.retain(|node| node.address != topology.addr.addr)
}
},
ServerEvent::StatusChange(status) => {
for node in &mut nodes {
if node.address == status.addr.addr {
node.is_up = match status.change_type {
StatusChangeType::Up => true,
StatusChangeType::Down => false,
}
}
}
}
event => tracing::error!("Unexpected event: {:?}", event),
}
}
}
}
None => {
return Err(anyhow!("topology control connection was closed"));
}
}
write_to_shared(shared_nodes, nodes.clone()).await;
}
}

async fn register_for_topology_and_status_events(
connection: &CassandraConnection,
version: Version,
) -> Result<()> {
let (tx, rx) = oneshot::channel();
connection
.send(
Message::from_frame(Frame::Cassandra(CassandraFrame {
version,
stream_id: 0,
tracing_id: None,
warnings: vec![],
operation: CassandraOperation::Register(BodyReqRegister {
events: vec![
SimpleServerEvent::TopologyChange,
SimpleServerEvent::StatusChange,
],
}),
})),
tx,
)
.unwrap();

if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = rx.await?.response?.frame() {
match operation {
CassandraOperation::Ready(_) => Ok(()),
operation => Err(anyhow!("Expected Cassandra to respond to a Register with a Ready. Instead it responded with {:?}", operation))
}
} else {
Err(anyhow!("Failed to parse cassandra message"))
}
}

async fn fetch_current_nodes(
connection: &CassandraConnection,
connection_info: &TaskConnectionInfo,
data_center: &str,
) -> Result<Vec<CassandraNode>> {
let (new_nodes, more_nodes) = tokio::join!(
system_local::query(&outbound, data_center, handshake.address),
system_peers::query(&outbound, data_center)
system_local::query(connection, data_center, connection_info.address),
system_peers::query(connection, data_center)
);

let mut new_nodes = new_nodes?;
new_nodes.extend(more_nodes?);

let mut write_lock = nodes.write().await;
Ok(new_nodes)
}

async fn write_to_shared(
shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
new_nodes: Vec<CassandraNode>,
) {
let mut write_lock = shared_nodes.write().await;
let expensive_drop = std::mem::replace(&mut *write_lock, new_nodes);

// Make sure to drop write_lock before the expensive_drop which will have to perform many deallocations.
std::mem::drop(write_lock);
std::mem::drop(expensive_drop);

Ok(())
}

mod system_local {
@@ -140,6 +253,7 @@ mod system_local {
rack,
tokens,
outbound: None,
is_up: true,
})
})
.collect(),
@@ -276,6 +390,7 @@ mod system_peers {
rack,
tokens,
outbound: None,
is_up: true,
})
})
.collect(),
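Putting the pieces together: the topology task receives connection info from the transform once, then keeps the shared node list current as TOPOLOGY_CHANGE and STATUS_CHANGE events arrive. A rough wiring sketch reusing the types and imports from the diff above (the channel size, the "dc1" data center name, and the call sites are assumptions, not the exact Shotover setup):

let nodes = Arc::new(RwLock::new(Vec::<CassandraNode>::new()));
let (tx, rx) = mpsc::channel(1);
create_topology_task(nodes.clone(), rx, "dc1".to_string());

// Later, once the transform has completed its handshake:
tx.send(TaskConnectionInfo { connection_factory, address }).await?;

// Routing code takes a read lock whenever it needs the latest, event-updated node list.
let current_nodes = nodes.read().await.clone();
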
21 changes: 15 additions & 6 deletions shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs
@@ -6,7 +6,7 @@ use tokio::time::{sleep, timeout};

use crate::cassandra_int_tests::cluster::run_topology_task;
use crate::helpers::cassandra::{
assert_query_result, CassandraConnection, CassandraDriver, ResultValue,
assert_query_result, run_query, CassandraConnection, CassandraDriver, ResultValue,
};
use crate::helpers::ShotoverManager;
use std::net::SocketAddr;
@@ -243,16 +243,18 @@ pub async fn test_topology_task(ca_path: Option<&str>, cassandra_port: Option<u3
}
}

pub async fn test_events_filtering(
pub async fn test_node_going_down(
compose: DockerCompose,
shotover_manager: ShotoverManager,
driver: CassandraDriver,
) {
let session_direct = CassandraConnection::new("172.16.1.2", 9044, driver).await;
let mut event_recv_direct = session_direct.as_cdrs().create_event_receiver();
let event_session_direct =
CassandraConnection::new("172.16.1.2", 9044, CassandraDriver::CdrsTokio).await;
let mut event_recv_direct = event_session_direct.as_cdrs().create_event_receiver();

let session_shotover = CassandraConnection::new("127.0.0.1", 9042, driver).await;
let mut event_recv_shotover = session_shotover.as_cdrs().create_event_receiver();
let event_session_shotover =
CassandraConnection::new("127.0.0.1", 9042, CassandraDriver::CdrsTokio).await;
let mut event_recv_shotover = event_session_shotover.as_cdrs().create_event_receiver();

// let the driver finish connecting to the cluster and registering for the events
sleep(Duration::from_secs(10)).await;
@@ -306,6 +308,13 @@
.await
.expect_err("CassandraSinkCluster must filter out this event");

tracing::error!("before");
// Run a query to make sure that shotover successfully routes the message to a remaining node.
let session_shotover = CassandraConnection::new("127.0.0.1", 9042, driver).await;
tracing::error!("middle");
run_query(&session_shotover, "CREATE KEYSPACE cluster_single_rack_node_going_down WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").await;
tracing::error!("after");

// Purposefully dispose of these as we left the underlying cassandra cluster in a non-recoverable state
std::mem::drop(compose);
std::mem::drop(shotover_manager);