CassandraSinkCluster: topology task now processes events
rukai committed Sep 21, 2022
1 parent df03487 commit 97b93ac
Showing 6 changed files with 164 additions and 23 deletions.
14 changes: 11 additions & 3 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
@@ -232,8 +232,11 @@ impl CassandraSinkCluster {
let nodes_shared = self.topology_task_nodes.read().await;
self.local_nodes = nodes_shared.clone();
}
for node in &self.local_nodes {
tracing::error!("local nodes: {} {}", node.address, node.is_up);
}

let random_point = if self.local_nodes.is_empty() {
let random_point = if self.local_nodes.iter().all(|x| !x.is_up) {
tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap())
.await?
.next()
@@ -296,7 +299,12 @@ impl CassandraSinkCluster {
self.init_handshake_connection.as_mut().unwrap()
} else {
// We have a full nodes list and handshake, so we can do proper routing now.
let random_node = self.local_nodes.choose_mut(&mut self.rng).unwrap();
let random_node = self
.local_nodes
.iter_mut()
.filter(|x| x.is_up)
.choose(&mut self.rng)
.unwrap();
random_node.get_connection(&self.connection_factory).await?
}
.send(message, return_chan_tx)?;
@@ -400,7 +408,7 @@ impl CassandraSinkCluster {
fn get_random_node_in_dc_rack(&mut self) -> &CassandraNode {
self.local_nodes
.iter()
.filter(|x| x.rack == self.local_shotover_node.rack)
.filter(|x| x.rack == self.local_shotover_node.rack && x.is_up)
.choose(&mut self.rng)
.unwrap()
}
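
With is_up in play, routing changes in two ways: requests are only routed to nodes currently marked up, and the random contact point fallback now kicks in when every known node is down rather than only when the node list is empty. A minimal sketch of the selection pattern, assuming a simplified Node type and the rand crate (names here are illustrative, not Shotover's API):

use rand::seq::IteratorRandom;

struct Node {
    address: std::net::SocketAddr,
    is_up: bool,
}

// Pick a random live node; None tells the caller to fall back to the
// configured contact points, mirroring the all(|x| !x.is_up) check above.
fn random_up_node<'a, R: rand::Rng>(
    nodes: &'a mut [Node],
    rng: &mut R,
) -> Option<&'a mut Node> {
    nodes.iter_mut().filter(|n| n.is_up).choose(rng)
}

The transform itself unwraps instead of returning None because the all-nodes-down case was already diverted to the contact points a few lines earlier.
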
17 changes: 16 additions & 1 deletion shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
@@ -1,8 +1,10 @@
use crate::codec::cassandra::CassandraCodec;
use crate::frame::Frame;
use crate::message::{Message, Messages};
use crate::tls::TlsConnector;
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::Result;
use anyhow::{anyhow, Result};
use cassandra_protocol::frame::Version;
use cassandra_protocol::token::Murmur3Token;
use std::net::SocketAddr;
use tokio::net::ToSocketAddrs;
@@ -16,6 +18,7 @@ pub struct CassandraNode {
pub tokens: Vec<Murmur3Token>,
pub outbound: Option<CassandraConnection>,
pub host_id: Uuid,
pub is_up: bool,
}

impl CassandraNode {
@@ -113,4 +116,16 @@ impl ConnectionFactory {
pub fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender<Messages>) {
self.pushed_messages_tx = Some(pushed_messages_tx);
}

pub fn get_version(&mut self) -> Result<Version> {
for message in &mut self.init_handshake {
if let Some(Frame::Cassandra(frame)) = message.frame() {
return Ok(frame.version);
}
}
Err(anyhow!(
"connection version could not be retrieved from the handshake because none of the {} messages in the handshake could be parsed",
self.init_handshake.len()
))
}
}
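
The new get_version helper recovers the protocol version negotiated during the handshake from the messages the factory recorded, so frames created later (such as the topology task's REGISTER below) are encoded with the version the server agreed to. A hedged sketch of the same first-match scan written with find_map, over a stand-in Msg type rather than Shotover's Message (the &mut mirrors Message::frame(), which parses lazily):

enum Msg {
    Cassandra { version: u8 },
    Opaque(Vec<u8>),
}

// Return the version of the first message that parses as a Cassandra frame.
fn version_from_handshake(handshake: &mut [Msg]) -> Option<u8> {
    handshake.iter_mut().find_map(|m| match m {
        Msg::Cassandra { version } => Some(*version),
        _ => None,
    })
}
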
153 changes: 134 additions & 19 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
@@ -4,10 +4,14 @@ use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageValue};
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::{anyhow, Result};
use cassandra_protocol::events::{ServerEvent, SimpleServerEvent};
use cassandra_protocol::frame::events::{StatusChangeType, TopologyChangeType};
use cassandra_protocol::frame::message_register::BodyReqRegister;
use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::{frame::Version, query::QueryParams};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc, oneshot, RwLock};

#[derive(Debug)]
@@ -18,57 +22,166 @@ pub struct TaskConnectionInfo {

pub fn create_topology_task(
nodes: Arc<RwLock<Vec<CassandraNode>>>,
mut handshake_rx: mpsc::Receiver<TaskConnectionInfo>,
mut connection_info_rx: mpsc::Receiver<TaskConnectionInfo>,
data_center: String,
) {
tokio::spawn(async move {
while let Some(handshake) = handshake_rx.recv().await {
while let Some(mut connection_info) = connection_info_rx.recv().await {
let mut attempts = 0;
while let Err(err) = topology_task_process(&nodes, &handshake, &data_center).await {
while let Err(err) =
topology_task_process(&nodes, &mut connection_info, &data_center).await
{
tracing::error!("topology task failed, retrying, error was: {err:?}");
attempts += 1;
if attempts > 3 {
// 3 attempts have failed, let's try a new handshake
break;
}
}

// Sleep for an hour.
// TODO: This is a crude way to ensure we don't overload the transforms with too many topology changes.
// This will be replaced with:
// * the task subscribes to events
// * the transforms request a reload when they hit connection errors
tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await;
}
});
}
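
Each batch of connection info now gets a bounded number of chances: topology_task_process is retried until it succeeds or four attempts have failed, after which the task falls back to waiting for the transform to send fresh connection info. A generic sketch of that retry policy, assuming the anyhow and tracing crates, with the closure standing in for topology_task_process:

use std::future::Future;

async fn run_with_retries<F, Fut>(mut step: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    let mut attempts = 0;
    while let Err(err) = step().await {
        tracing::error!("topology task failed, retrying, error was: {err:?}");
        attempts += 1;
        if attempts > 3 {
            // Give up on this connection info; the caller waits for a new one.
            break;
        }
    }
}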

async fn topology_task_process(
nodes: &Arc<RwLock<Vec<CassandraNode>>>,
handshake: &TaskConnectionInfo,
shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
connection_info: &mut TaskConnectionInfo,
data_center: &str,
) -> Result<()> {
let outbound = handshake
let (pushed_messages_tx, mut pushed_messages_rx) = unbounded_channel();
connection_info
.connection_factory
.set_pushed_messages_tx(pushed_messages_tx);

let connection = connection_info
.connection_factory
.new_connection(handshake.address)
.new_connection(connection_info.address)
.await?;

let version = connection_info.connection_factory.get_version()?;

let mut nodes = fetch_current_nodes(&connection, connection_info, data_center).await?;
write_to_shared(shared_nodes, nodes.clone()).await;

register_for_topology_and_status_events(&connection, version).await?;

loop {
match pushed_messages_rx.recv().await {
Some(messages) => {
for mut message in messages {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Event(event),
..
})) = message.frame()
{
match event {
ServerEvent::TopologyChange(topology) => match topology.change_type {
TopologyChangeType::NewNode => {
let mut new_nodes = fetch_current_nodes(
&connection,
connection_info,
data_center,
)
.await?;

// is_up state gets carried over to the new list
for node in &nodes {
if !node.is_up {
for new_node in &mut new_nodes {
if new_node.address == node.address {
new_node.is_up = false;
}
}
}
}

nodes = new_nodes;
}
TopologyChangeType::RemovedNode => {
nodes.retain(|node| node.address != topology.addr)
}
},
ServerEvent::StatusChange(status) => {
for node in &mut nodes {
if node.address == status.addr {
node.is_up = match status.change_type {
StatusChangeType::Up => true,
StatusChangeType::Down => false,
}
}
}
}
event => tracing::error!("Unexpected event: {:?}", event),
}
}
}
}
None => {
return Err(anyhow!("topology control connection was closed"));
}
}
write_to_shared(shared_nodes, nodes.clone()).await;
}
}
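
Taken together, the event loop is a small state machine over the node list: a StatusChange flips is_up in place, a RemovedNode prunes the list, and a NewNode refetches everything while carrying already-learned down states across, so a node marked down stays down until a StatusChange Up arrives. A hedged distillation of the two in-place updates, with Node standing in for CassandraNode:

use std::net::SocketAddr;

struct Node {
    address: SocketAddr,
    is_up: bool,
}

// StatusChange: mark every node at addr up or down.
fn apply_status(nodes: &mut [Node], addr: SocketAddr, up: bool) {
    for node in nodes.iter_mut() {
        if node.address == addr {
            node.is_up = up;
        }
    }
}

// NewNode: carry known-down state over to a freshly fetched list.
fn carry_over_down_state(old: &[Node], new: &mut [Node]) {
    for down in old.iter().filter(|n| !n.is_up) {
        for node in new.iter_mut() {
            if node.address == down.address {
                node.is_up = false;
            }
        }
    }
}
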

async fn register_for_topology_and_status_events(
connection: &CassandraConnection,
version: Version,
) -> Result<()> {
let (tx, rx) = oneshot::channel();
connection
.send(
Message::from_frame(Frame::Cassandra(CassandraFrame {
version,
stream_id: 0,
tracing_id: None,
warnings: vec![],
operation: CassandraOperation::Register(BodyReqRegister {
events: vec![
SimpleServerEvent::TopologyChange,
SimpleServerEvent::StatusChange,
],
}),
})),
tx,
)
.unwrap();

if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = rx.await?.response?.frame() {
match operation {
CassandraOperation::Ready(_) => Ok(()),
operation => Err(anyhow!("Expected Cassandra to respond to a Register with a Ready. Instead it responded with {:?}", operation))
}
} else {
Err(anyhow!("Failed to parse cassandra message"))
}
}
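
The register helper is a plain request/response round-trip: send a REGISTER frame over the connection's oneshot channel and accept only a READY back. In miniature, with a hypothetical Reply type standing in for the real response frame:

use tokio::sync::oneshot;

enum Reply {
    Ready,
    Other(String),
}

async fn expect_ready(rx: oneshot::Receiver<Reply>) -> Result<(), String> {
    match rx.await {
        Ok(Reply::Ready) => Ok(()),
        Ok(Reply::Other(op)) => Err(format!("expected READY, got {op}")),
        Err(_) => Err("connection dropped before responding".into()),
    }
}
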

async fn fetch_current_nodes(
connection: &CassandraConnection,
connection_info: &TaskConnectionInfo,
data_center: &str,
) -> Result<Vec<CassandraNode>> {
let (new_nodes, more_nodes) = tokio::join!(
system_local::query(&outbound, data_center, handshake.address),
system_peers::query(&outbound, data_center)
system_local::query(connection, data_center, connection_info.address),
system_peers::query(connection, data_center)
);

let mut new_nodes = new_nodes?;
new_nodes.extend(more_nodes?);

let mut write_lock = nodes.write().await;
Ok(new_nodes)
}

async fn write_to_shared(
shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
new_nodes: Vec<CassandraNode>,
) {
let mut write_lock = shared_nodes.write().await;
let expensive_drop = std::mem::replace(&mut *write_lock, new_nodes);

// Make sure to drop write_lock before the expensive_drop, which will have to perform many deallocations.
std::mem::drop(write_lock);
std::mem::drop(expensive_drop);

Ok(())
}
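
write_to_shared keeps the writer's critical section short: mem::replace swaps the fresh list in while holding the lock, and the old list, whose drop may perform many deallocations, is freed only after the lock is released so readers are not blocked behind deallocation work. The same pattern in miniature:

use std::sync::Arc;
use tokio::sync::RwLock;

async fn publish<T>(shared: &Arc<RwLock<Vec<T>>>, new: Vec<T>) {
    let mut guard = shared.write().await;
    let old = std::mem::replace(&mut *guard, new);
    drop(guard); // release the lock first...
    drop(old); // ...then pay the deallocation cost outside it
}
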

mod system_local {
@@ -147,6 +260,7 @@ mod system_local {
tokens,
outbound: None,
host_id,
is_up: true,
})
})
.collect(),
@@ -290,6 +404,7 @@ mod system_peers {
tokens,
outbound: None,
host_id,
is_up: true,
})
})
.collect(),
@@ -174,5 +174,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) {
possible_racks.remove(rack_index);

assert_eq!(node.tokens.len(), 128);
assert!(node.is_up);
}
}
@@ -126,5 +126,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) {

assert_eq!(node.rack, "rack1");
assert_eq!(node.tokens.len(), 128);
assert!(node.is_up);
}
}
@@ -240,6 +240,7 @@ pub async fn test_topology_task(ca_path: Option<&str>, cassandra_port: Option<u3

assert_eq!(node.rack, "rack1");
assert_eq!(node.tokens.len(), 128);
assert!(node.is_up);
}
}

