From 97b93ac9c3a58b71e88eee0a1ecaf85a4f2dde5f Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 16 Sep 2022 15:16:28 +1000 Subject: [PATCH] CassandraSinkCluster: topology task now processes events --- .../transforms/cassandra/sink_cluster/mod.rs | 14 +- .../transforms/cassandra/sink_cluster/node.rs | 17 +- .../cassandra/sink_cluster/topology.rs | 153 +++++++++++++++--- .../cassandra_int_tests/cluster_multi_rack.rs | 1 + .../cluster_single_rack_v3.rs | 1 + .../cluster_single_rack_v4.rs | 1 + 6 files changed, 164 insertions(+), 23 deletions(-) diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index 444c55bb7..70c0bb05a 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -232,8 +232,11 @@ impl CassandraSinkCluster { let nodes_shared = self.topology_task_nodes.read().await; self.local_nodes = nodes_shared.clone(); } + for node in &self.local_nodes { + tracing::error!("local nodes: {} {}", node.address, node.is_up); + } - let random_point = if self.local_nodes.is_empty() { + let random_point = if self.local_nodes.iter().all(|x| !x.is_up) { tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap()) .await? .next() @@ -296,7 +299,12 @@ impl CassandraSinkCluster { self.init_handshake_connection.as_mut().unwrap() } else { // We have a full nodes list and handshake, so we can do proper routing now. - let random_node = self.local_nodes.choose_mut(&mut self.rng).unwrap(); + let random_node = self + .local_nodes + .iter_mut() + .filter(|x| x.is_up) + .choose(&mut self.rng) + .unwrap(); random_node.get_connection(&self.connection_factory).await? } .send(message, return_chan_tx)?; @@ -400,7 +408,7 @@ impl CassandraSinkCluster { fn get_random_node_in_dc_rack(&mut self) -> &CassandraNode { self.local_nodes .iter() - .filter(|x| x.rack == self.local_shotover_node.rack) + .filter(|x| x.rack == self.local_shotover_node.rack && x.is_up) .choose(&mut self.rng) .unwrap() } diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs index 19556836d..c3ef4f455 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs @@ -1,8 +1,10 @@ use crate::codec::cassandra::CassandraCodec; +use crate::frame::Frame; use crate::message::{Message, Messages}; use crate::tls::TlsConnector; use crate::transforms::cassandra::connection::CassandraConnection; -use anyhow::Result; +use anyhow::{anyhow, Result}; +use cassandra_protocol::frame::Version; use cassandra_protocol::token::Murmur3Token; use std::net::SocketAddr; use tokio::net::ToSocketAddrs; @@ -16,6 +18,7 @@ pub struct CassandraNode { pub tokens: Vec, pub outbound: Option, pub host_id: Uuid, + pub is_up: bool, } impl CassandraNode { @@ -113,4 +116,16 @@ impl ConnectionFactory { pub fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender) { self.pushed_messages_tx = Some(pushed_messages_tx); } + + pub fn get_version(&mut self) -> Result { + for message in &mut self.init_handshake { + if let Some(Frame::Cassandra(frame)) = message.frame() { + return Ok(frame.version); + } + } + Err(anyhow!( + "connection version could not be retrieved from the handshake because none of the {} messages in the handshake could be parsed", + self.init_handshake.len() + )) + } } diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs index 0bab5ee74..7be283ecd 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs @@ -4,10 +4,14 @@ use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; use crate::message::{Message, MessageValue}; use crate::transforms::cassandra::connection::CassandraConnection; use anyhow::{anyhow, Result}; +use cassandra_protocol::events::{ServerEvent, SimpleServerEvent}; +use cassandra_protocol::frame::events::{StatusChangeType, TopologyChangeType}; +use cassandra_protocol::frame::message_register::BodyReqRegister; use cassandra_protocol::token::Murmur3Token; use cassandra_protocol::{frame::Version, query::QueryParams}; use std::net::SocketAddr; use std::sync::Arc; +use tokio::sync::mpsc::unbounded_channel; use tokio::sync::{mpsc, oneshot, RwLock}; #[derive(Debug)] @@ -18,13 +22,15 @@ pub struct TaskConnectionInfo { pub fn create_topology_task( nodes: Arc>>, - mut handshake_rx: mpsc::Receiver, + mut connection_info_rx: mpsc::Receiver, data_center: String, ) { tokio::spawn(async move { - while let Some(handshake) = handshake_rx.recv().await { + while let Some(mut connection_info) = connection_info_rx.recv().await { let mut attempts = 0; - while let Err(err) = topology_task_process(&nodes, &handshake, &data_center).await { + while let Err(err) = + topology_task_process(&nodes, &mut connection_info, &data_center).await + { tracing::error!("topology task failed, retrying, error was: {err:?}"); attempts += 1; if attempts > 3 { @@ -32,43 +38,150 @@ pub fn create_topology_task( break; } } - - // Sleep for an hour. - // TODO: This is a crude way to ensure we dont overload the transforms with too many topology changes. - // This will be replaced with: - // * the task subscribes to events - // * the transforms request a reload when they hit connection errors - tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await; } }); } async fn topology_task_process( - nodes: &Arc>>, - handshake: &TaskConnectionInfo, + shared_nodes: &Arc>>, + connection_info: &mut TaskConnectionInfo, data_center: &str, ) -> Result<()> { - let outbound = handshake + let (pushed_messages_tx, mut pushed_messages_rx) = unbounded_channel(); + connection_info + .connection_factory + .set_pushed_messages_tx(pushed_messages_tx); + + let connection = connection_info .connection_factory - .new_connection(handshake.address) + .new_connection(connection_info.address) .await?; + let version = connection_info.connection_factory.get_version()?; + + let mut nodes = fetch_current_nodes(&connection, connection_info, data_center).await?; + write_to_shared(shared_nodes, nodes.clone()).await; + + register_for_topology_and_status_events(&connection, version).await?; + + loop { + match pushed_messages_rx.recv().await { + Some(messages) => { + for mut message in messages { + if let Some(Frame::Cassandra(CassandraFrame { + operation: CassandraOperation::Event(event), + .. + })) = message.frame() + { + match event { + ServerEvent::TopologyChange(topology) => match topology.change_type { + TopologyChangeType::NewNode => { + let mut new_nodes = fetch_current_nodes( + &connection, + connection_info, + data_center, + ) + .await?; + + // is_up state gets carried over to new list + for node in &nodes { + if !node.is_up { + for new_node in &mut new_nodes { + if new_node.address == node.address { + new_node.is_up = false; + } + } + } + } + + nodes = new_nodes; + } + TopologyChangeType::RemovedNode => { + nodes.retain(|node| node.address != topology.addr) + } + }, + ServerEvent::StatusChange(status) => { + for node in &mut nodes { + if node.address == status.addr { + node.is_up = match status.change_type { + StatusChangeType::Up => true, + StatusChangeType::Down => false, + } + } + } + } + event => tracing::error!("Unexpected event: {:?}", event), + } + } + } + } + None => { + return Err(anyhow!("topology control connection was closed")); + } + } + write_to_shared(shared_nodes, nodes.clone()).await; + } +} + +async fn register_for_topology_and_status_events( + connection: &CassandraConnection, + version: Version, +) -> Result<()> { + let (tx, rx) = oneshot::channel(); + connection + .send( + Message::from_frame(Frame::Cassandra(CassandraFrame { + version, + stream_id: 0, + tracing_id: None, + warnings: vec![], + operation: CassandraOperation::Register(BodyReqRegister { + events: vec![ + SimpleServerEvent::TopologyChange, + SimpleServerEvent::StatusChange, + ], + }), + })), + tx, + ) + .unwrap(); + + if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = rx.await?.response?.frame() { + match operation { + CassandraOperation::Ready(_) => Ok(()), + operation => Err(anyhow!("Expected Cassandra to respond to a Register with a Ready. Instead it responded with {:?}", operation)) + } + } else { + Err(anyhow!("Failed to parse cassandra message")) + } +} + +async fn fetch_current_nodes( + connection: &CassandraConnection, + connection_info: &TaskConnectionInfo, + data_center: &str, +) -> Result> { let (new_nodes, more_nodes) = tokio::join!( - system_local::query(&outbound, data_center, handshake.address), - system_peers::query(&outbound, data_center) + system_local::query(connection, data_center, connection_info.address), + system_peers::query(connection, data_center) ); let mut new_nodes = new_nodes?; new_nodes.extend(more_nodes?); - let mut write_lock = nodes.write().await; + Ok(new_nodes) +} + +async fn write_to_shared( + shared_nodes: &Arc>>, + new_nodes: Vec, +) { + let mut write_lock = shared_nodes.write().await; let expensive_drop = std::mem::replace(&mut *write_lock, new_nodes); // Make sure to drop write_lock before the expensive_drop which will have to perform many deallocations. std::mem::drop(write_lock); std::mem::drop(expensive_drop); - - Ok(()) } mod system_local { @@ -147,6 +260,7 @@ mod system_local { tokens, outbound: None, host_id, + is_up: true, }) }) .collect(), @@ -290,6 +404,7 @@ mod system_peers { tokens, outbound: None, host_id, + is_up: true, }) }) .collect(), diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs b/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs index 25373f4d9..a57ebedda 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster_multi_rack.rs @@ -174,5 +174,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) { possible_racks.remove(rack_index); assert_eq!(node.tokens.len(), 128); + assert!(node.is_up); } } diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v3.rs b/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v3.rs index de9a9870b..b180a0eb5 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v3.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v3.rs @@ -126,5 +126,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) { assert_eq!(node.rack, "rack1"); assert_eq!(node.tokens.len(), 128); + assert!(node.is_up); } } diff --git a/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs b/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs index 2678b9977..39c14ad03 100644 --- a/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs +++ b/shotover-proxy/tests/cassandra_int_tests/cluster_single_rack_v4.rs @@ -240,6 +240,7 @@ pub async fn test_topology_task(ca_path: Option<&str>, cassandra_port: Option