CassandraSinkCluster: topology task now processes events #812

Merged · 2 commits · Sep 22, 2022
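In short: instead of re-polling the system tables on a fixed one-hour timer, the topology task now registers for TOPOLOGY_CHANGE and STATUS_CHANGE server events, tracks a per-node `is_up` flag, and republishes the shared node list each time an event arrives; request routing in the sink then skips nodes that are marked down. A rough, standalone sketch of that node-state model follows — `Node` is a simplified stand-in for the real `CassandraNode`, and the `rand` crate's `IteratorRandom` is used the same way the diff below uses it, so nothing here is copied from the PR itself.

// Standalone sketch of the node-state model this PR introduces; Node is a
// simplified stand-in for CassandraNode and the status/choice logic mirrors
// the diff below rather than copying it.
use rand::prelude::IteratorRandom;
use std::net::SocketAddr;

struct Node {
    address: SocketAddr,
    is_up: bool,
}

// A STATUS_CHANGE event flips the flag on the matching address.
fn apply_status_change(nodes: &mut [Node], addr: SocketAddr, up: bool) {
    for node in nodes.iter_mut().filter(|node| node.address == addr) {
        node.is_up = up;
    }
}

// Request routing only ever considers nodes that are still marked up.
fn pick_live_node<'a>(
    nodes: &'a mut [Node],
    rng: &mut impl rand::Rng,
) -> Option<&'a mut Node> {
    nodes.iter_mut().filter(|node| node.is_up).choose(rng)
}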
11 changes: 8 additions & 3 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
@@ -233,7 +233,7 @@ impl CassandraSinkCluster {
self.local_nodes = nodes_shared.clone();
}

let random_point = if self.local_nodes.is_empty() {
let random_point = if self.local_nodes.iter().all(|x| !x.is_up) {
tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap())
.await?
.next()
@@ -296,7 +296,12 @@ impl CassandraSinkCluster {
self.init_handshake_connection.as_mut().unwrap()
} else {
// We have a full nodes list and handshake, so we can do proper routing now.
let random_node = self.local_nodes.choose_mut(&mut self.rng).unwrap();
let random_node = self
.local_nodes
.iter_mut()
.filter(|x| x.is_up)
.choose(&mut self.rng)
.unwrap();
random_node.get_connection(&self.connection_factory).await?
}
.send(message, return_chan_tx)?;
@@ -400,7 +405,7 @@ impl CassandraSinkCluster {
fn get_random_node_in_dc_rack(&mut self) -> &CassandraNode {
self.local_nodes
.iter()
.filter(|x| x.rack == self.local_shotover_node.rack)
.filter(|x| x.rack == self.local_shotover_node.rack && x.is_up)
.choose(&mut self.rng)
.unwrap()
}
17 changes: 16 additions & 1 deletion shotover-proxy/src/transforms/cassandra/sink_cluster/node.rs
@@ -1,8 +1,10 @@
use crate::codec::cassandra::CassandraCodec;
use crate::frame::Frame;
use crate::message::{Message, Messages};
use crate::tls::TlsConnector;
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::Result;
use anyhow::{anyhow, Result};
use cassandra_protocol::frame::Version;
use cassandra_protocol::token::Murmur3Token;
use std::net::SocketAddr;
use tokio::net::ToSocketAddrs;
@@ -16,6 +18,7 @@ pub struct CassandraNode {
pub tokens: Vec<Murmur3Token>,
pub outbound: Option<CassandraConnection>,
pub host_id: Uuid,
pub is_up: bool,
}

impl CassandraNode {
@@ -113,4 +116,16 @@ impl ConnectionFactory {
pub fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender<Messages>) {
self.pushed_messages_tx = Some(pushed_messages_tx);
}

pub fn get_version(&mut self) -> Result<Version> {
for message in &mut self.init_handshake {
if let Some(Frame::Cassandra(frame)) = message.frame() {
return Ok(frame.version);
}
}
Err(anyhow!(
"connection version could not be retrieved from the handshake because none of the {} messages in the handshake could be parsed",
self.init_handshake.len()
))
}
}
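The new `get_version` helper scans the handshake messages the factory has recorded and returns the protocol version of the first one that parses; the topology task below uses that version to build its REGISTER frame. A minimal standalone illustration of the same "first frame that parses wins" lookup, with `RecordedMessage` and `Version` as simplified stand-ins rather than the real codec types:

// Standalone sketch of the lookup get_version performs over the recorded handshake.
#[derive(Clone, Copy, Debug)]
enum Version {
    V4,
    V5,
}

struct RecordedMessage {
    // None models a message whose frame could not be parsed.
    parsed_version: Option<Version>,
}

fn version_from_handshake(handshake: &[RecordedMessage]) -> Result<Version, String> {
    handshake
        .iter()
        .find_map(|message| message.parsed_version)
        .ok_or_else(|| {
            format!(
                "connection version could not be retrieved: none of the {} handshake messages could be parsed",
                handshake.len()
            )
        })
}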
153 changes: 134 additions & 19 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
@@ -4,10 +4,14 @@ use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageValue};
use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::{anyhow, Result};
use cassandra_protocol::events::{ServerEvent, SimpleServerEvent};
use cassandra_protocol::frame::events::{StatusChangeType, TopologyChangeType};
use cassandra_protocol::frame::message_register::BodyReqRegister;
use cassandra_protocol::token::Murmur3Token;
use cassandra_protocol::{frame::Version, query::QueryParams};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc, oneshot, RwLock};

#[derive(Debug)]
@@ -18,57 +22,166 @@ pub struct TaskConnectionInfo {

pub fn create_topology_task(
nodes: Arc<RwLock<Vec<CassandraNode>>>,
mut handshake_rx: mpsc::Receiver<TaskConnectionInfo>,
mut connection_info_rx: mpsc::Receiver<TaskConnectionInfo>,
data_center: String,
) {
tokio::spawn(async move {
while let Some(handshake) = handshake_rx.recv().await {
while let Some(mut connection_info) = connection_info_rx.recv().await {
let mut attempts = 0;
while let Err(err) = topology_task_process(&nodes, &handshake, &data_center).await {
while let Err(err) =
topology_task_process(&nodes, &mut connection_info, &data_center).await
{
tracing::error!("topology task failed, retrying, error was: {err:?}");
attempts += 1;
if attempts > 3 {
// 3 attempts have failed, lets try a new handshake
break;
}
}

// Sleep for an hour.
// TODO: This is a crude way to ensure we dont overload the transforms with too many topology changes.
// This will be replaced with:
// * the task subscribes to events
// * the transforms request a reload when they hit connection errors
tokio::time::sleep(std::time::Duration::from_secs(60 * 60)).await;
}
});
}

async fn topology_task_process(
nodes: &Arc<RwLock<Vec<CassandraNode>>>,
handshake: &TaskConnectionInfo,
shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
connection_info: &mut TaskConnectionInfo,
data_center: &str,
) -> Result<()> {
let outbound = handshake
let (pushed_messages_tx, mut pushed_messages_rx) = unbounded_channel();
connection_info
.connection_factory
.set_pushed_messages_tx(pushed_messages_tx);

let connection = connection_info
.connection_factory
.new_connection(handshake.address)
.new_connection(connection_info.address)
.await?;

let version = connection_info.connection_factory.get_version()?;

let mut nodes = fetch_current_nodes(&connection, connection_info, data_center).await?;
write_to_shared(shared_nodes, nodes.clone()).await;

register_for_topology_and_status_events(&connection, version).await?;

loop {
match pushed_messages_rx.recv().await {
Some(messages) => {
for mut message in messages {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Event(event),
..
})) = message.frame()
{
match event {
ServerEvent::TopologyChange(topology) => match topology.change_type {
TopologyChangeType::NewNode => {
let mut new_nodes = fetch_current_nodes(
&connection,
connection_info,
data_center,
)
.await?;

// is_up state gets carried over to new list
for node in &nodes {
if !node.is_up {
for new_node in &mut new_nodes {
if new_node.address == node.address {
new_node.is_up = false;
}
}
}
}

nodes = new_nodes;
}
TopologyChangeType::RemovedNode => {
nodes.retain(|node| node.address != topology.addr)
}
},
ServerEvent::StatusChange(status) => {
for node in &mut nodes {
if node.address == status.addr {
node.is_up = match status.change_type {
StatusChangeType::Up => true,
StatusChangeType::Down => false,
}
}
}
}
event => tracing::error!("Unexpected event: {:?}", event),
}
}
}
}
None => {
return Err(anyhow!("topology control connection was closed"));
}
}
write_to_shared(shared_nodes, nodes.clone()).await;
}
}

async fn register_for_topology_and_status_events(
connection: &CassandraConnection,
version: Version,
) -> Result<()> {
let (tx, rx) = oneshot::channel();
connection
.send(
Message::from_frame(Frame::Cassandra(CassandraFrame {
version,
stream_id: 0,
tracing_id: None,
warnings: vec![],
operation: CassandraOperation::Register(BodyReqRegister {
events: vec![
SimpleServerEvent::TopologyChange,
SimpleServerEvent::StatusChange,
],
}),
})),
tx,
)
.unwrap();

if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = rx.await?.response?.frame() {
match operation {
CassandraOperation::Ready(_) => Ok(()),
operation => Err(anyhow!("Expected Cassandra to respond to a Register with a Ready. Instead it responded with {:?}", operation))
}
} else {
Err(anyhow!("Failed to parse cassandra message"))
}
}

async fn fetch_current_nodes(
connection: &CassandraConnection,
connection_info: &TaskConnectionInfo,
data_center: &str,
) -> Result<Vec<CassandraNode>> {
let (new_nodes, more_nodes) = tokio::join!(
system_local::query(&outbound, data_center, handshake.address),
system_peers::query(&outbound, data_center)
system_local::query(connection, data_center, connection_info.address),
system_peers::query(connection, data_center)
);

let mut new_nodes = new_nodes?;
new_nodes.extend(more_nodes?);

let mut write_lock = nodes.write().await;
Ok(new_nodes)
}

async fn write_to_shared(
shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
new_nodes: Vec<CassandraNode>,
) {
let mut write_lock = shared_nodes.write().await;
let expensive_drop = std::mem::replace(&mut *write_lock, new_nodes);

// Make sure to drop write_lock before the expensive_drop which will have to perform many deallocations.
std::mem::drop(write_lock);
std::mem::drop(expensive_drop);

Ok(())
}

mod system_local {
@@ -147,6 +260,7 @@ mod system_local {
tokens,
outbound: None,
host_id,
is_up: true,
})
})
.collect(),
@@ -290,6 +404,7 @@ mod system_peers {
tokens,
outbound: None,
host_id,
is_up: true,
})
})
.collect(),
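One detail in the new `write_to_shared` worth highlighting is its drop ordering: the old node list is swapped out while the write lock is held, but deallocated only after the lock is released, so readers are never blocked behind the deallocation. A minimal standalone illustration of the same pattern, using a plain `Vec<u64>` in place of `Vec<CassandraNode>`:

use std::sync::Arc;
use tokio::sync::RwLock;

// Swap the new value in under the lock, but pay the deallocation cost of the
// old value only after the lock has been released.
async fn replace_shared(shared: &Arc<RwLock<Vec<u64>>>, new: Vec<u64>) {
    let mut guard = shared.write().await;
    let old = std::mem::replace(&mut *guard, new);
    drop(guard); // release the write lock first
    drop(old); // then deallocate the old list outside the critical section
}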
@@ -174,5 +174,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) {
possible_racks.remove(rack_index);

assert_eq!(node.tokens.len(), 128);
assert!(node.is_up);
}
}
@@ -126,5 +126,6 @@ pub async fn test_topology_task(ca_path: Option<&str>) {

assert_eq!(node.rack, "rack1");
assert_eq!(node.tokens.len(), 128);
assert!(node.is_up);
}
}
@@ -244,6 +244,7 @@ pub async fn test_topology_task(ca_path: Option<&str>, cassandra_port: Option<u3

assert_eq!(node.rack, "rack1");
assert_eq!(node.tokens.len(), 128);
assert!(node.is_up);
}
}
