CassandraSinkCluster: Keep node list up to date via a tokio::watcher #831

Merged · 1 commit · Oct 5, 2022
77 changes: 32 additions & 45 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
@@ -27,9 +27,8 @@ use node_pool::{GetReplicaErr, NodePool};
 use rand::prelude::*;
 use serde::Deserialize;
 use std::net::SocketAddr;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
-use tokio::sync::{mpsc, oneshot, RwLock};
+use std::time::Duration;
+use tokio::sync::{mpsc, oneshot, watch};
 use topology::{create_topology_task, TaskConnectionInfo};
 use uuid::Uuid;
 use version_compare::Cmp;
@@ -107,16 +106,11 @@ pub struct CassandraSinkCluster {
     peers_v2_table: FQName,
     system_keyspaces: [Identifier; 3],
     local_shotover_node: ShotoverNode,
-    /// A local clone of topology_task_nodes.
-    /// Internally stores connections to the nodes.
-    ///
-    /// Can be populated at two points:
-    /// 1. If topology_task_nodes is already populated when the very first message is received then we populate local_nodes immediately.
-    /// 2. Otherwise local_nodes will be populated immediately after we have a confirmed successful handshake, waiting until topology_task_nodes is populated.
+    /// The nodes list is populated as soon as nodes_rx makes one available, but once a confirmed successful handshake is reached
+    /// we await nodes_rx to ensure that we have a nodes list from that point forward.
+    /// Additionally any changes to nodes_rx are observed and copied over.
     pool: NodePool,
-    /// Only written to by the topology task
-    /// Transform instances should never write to this.
-    topology_task_nodes: Arc<RwLock<Vec<CassandraNode>>>,
+    nodes_rx: watch::Receiver<Vec<CassandraNode>>,
     rng: SmallRng,
     task_handshake_tx: mpsc::Sender<TaskConnectionInfo>,
 }
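For readers unfamiliar with tokio::sync::watch: the channel stores a single value and receivers only ever observe the latest one, which fits a "current node list" well. A minimal standalone sketch of the semantics this field relies on (not shotover code; assumes tokio with the sync feature enabled):

```rust
use tokio::sync::watch;

fn main() {
    let (tx, mut rx) = watch::channel(0);

    // Only the latest value is retained: intermediate sends are skipped.
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();

    // has_changed reports a change relative to the last value this receiver
    // marked as seen; borrow_and_update marks the current value as seen.
    assert!(rx.has_changed().unwrap());
    assert_eq!(*rx.borrow_and_update(), 3);
    assert!(!rx.has_changed().unwrap());
}
```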
@@ -139,7 +133,9 @@ impl Clone for CassandraSinkCluster {
             system_keyspaces: self.system_keyspaces.clone(),
             local_shotover_node: self.local_shotover_node.clone(),
             pool: NodePool::new(vec![]),
-            topology_task_nodes: self.topology_task_nodes.clone(),
+            // Because self.nodes_rx is always cloned from the original nodes_rx created before any node lists were sent,
+            // once a single node list has been sent all new connections will immediately recognize it as a change.
+            nodes_rx: self.nodes_rx.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
             task_handshake_tx: self.task_handshake_tx.clone(),
         }
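This works because cloning a watch::Receiver copies the "seen" version from the receiver it was cloned from, rather than marking the current value as seen (the latter is Sender::subscribe's behaviour). A small standalone sketch of the property the comment relies on:

```rust
use tokio::sync::watch;

fn main() {
    let (tx, rx) = watch::channel(Vec::<String>::new());

    // A value is published before any clone happens.
    tx.send(vec!["node1".to_string()]).unwrap();

    // The clone inherits rx's seen version, which predates the send,
    // so the published list is immediately reported as a change.
    let mut cloned_rx = rx.clone();
    assert!(cloned_rx.has_changed().unwrap());
    assert_eq!(cloned_rx.borrow_and_update().len(), 1);
}
```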
@@ -159,12 +155,11 @@ impl CassandraSinkCluster {
         let failed_requests = register_counter!("failed_requests", "chain" => chain_name.clone(), "transform" => "CassandraSinkCluster");
         let receive_timeout = timeout.map(Duration::from_secs);
 
-        let nodes_shared = Arc::new(RwLock::new(vec![]));
-
+        let (local_nodes_tx, local_nodes_rx) = watch::channel(vec![]);
         let (task_handshake_tx, task_handshake_rx) = mpsc::channel(1);
 
         create_topology_task(
-            nodes_shared.clone(),
+            local_nodes_tx,
             task_handshake_rx,
             local_shotover_node.data_center.clone(),
         );
@@ -189,7 +184,7 @@
             ],
             local_shotover_node,
             pool: NodePool::new(vec![]),
-            topology_task_nodes: nodes_shared,
+            nodes_rx: local_nodes_rx,
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
             task_handshake_tx,
         }
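Note the initial value handed to watch::channel counts as already seen, so the empty Vec acts as a "no list yet" sentinel: has_changed() stays false until the topology task publishes a real list. A standalone sketch:

```rust
use tokio::sync::watch;

fn main() {
    let (tx, rx) = watch::channel(Vec::<String>::new());

    // The initial empty list is considered seen: no change is reported.
    assert!(!rx.has_changed().unwrap());

    // The first real node list from the producer flips has_changed to true.
    tx.send(vec!["172.16.1.2:9042".to_string()]).unwrap();
    assert!(rx.has_changed().unwrap());
}
```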
@@ -212,6 +207,24 @@ fn create_query(messages: &Messages, query: &str, version: Version) -> Result<Me
 
 impl CassandraSinkCluster {
     async fn send_message(&mut self, mut messages: Messages) -> ChainResponse {
+        // If the node list has been updated, use the new list, copying over any existing connections.
+        if self.nodes_rx.has_changed()? {
+            let mut new_nodes = self.nodes_rx.borrow_and_update().clone();
+
+            for node in self.pool.nodes.drain(..) {
+                if let Some(outbound) = node.outbound {
+                    for new_node in &mut new_nodes {
+                        if new_node.host_id == node.host_id {
+                            new_node.outbound = Some(outbound);
+                            break;
+                        }
+                    }
+                }
+            }
+
+            self.pool.set_nodes(new_nodes);
+        }
+
         let tables_to_rewrite: Vec<TableToRewrite> = messages
             .iter_mut()
             .enumerate()
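The carry-over loop above is O(old × new) in node count, which is fine at realistic cluster sizes. For reference, a map-based variant would be linear; this is a hypothetical sketch with stand-in types (Node and Connection are not the real shotover types):

```rust
use std::collections::HashMap;
use uuid::Uuid;

// Stand-ins for the real CassandraNode and connection types.
struct Connection;
struct Node {
    host_id: Uuid,
    outbound: Option<Connection>,
}

/// Move live connections from the old node list onto the freshly
/// fetched one, matching nodes by host_id.
fn carry_over_connections(old_nodes: Vec<Node>, new_nodes: &mut [Node]) {
    let mut conns: HashMap<Uuid, Connection> = old_nodes
        .into_iter()
        .filter_map(|n| n.outbound.map(|c| (n.host_id, c)))
        .collect();

    for node in new_nodes {
        if let Some(conn) = conns.remove(&node.host_id) {
            node.outbound = Some(conn);
        }
    }
}

fn main() {
    let old = vec![Node { host_id: Uuid::nil(), outbound: Some(Connection) }];
    let mut new = vec![Node { host_id: Uuid::nil(), outbound: None }];
    carry_over_connections(old, &mut new);
    assert!(new[0].outbound.is_some());
}
```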
@@ -236,11 +249,6 @@ impl CassandraSinkCluster {
         // Create the initial connection.
         // Messages will be sent through this connection until we have extracted the handshake.
         if self.init_handshake_connection.is_none() {
-            if self.pool.nodes.is_empty() {
-                let nodes_shared = self.topology_task_nodes.read().await;
-                self.pool.set_nodes(nodes_shared.clone());
-            }
-
             let random_point = if self.pool.nodes.iter().all(|x| !x.is_up) {
                 tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap())
                     .await?
@@ -489,7 +497,8 @@ impl CassandraSinkCluster {
         self.init_handshake_complete = true;
 
         if self.pool.nodes.is_empty() {
-            self.populate_local_nodes().await?;
+            self.nodes_rx.changed().await?;
+            self.pool.nodes = self.nodes_rx.borrow_and_update().clone();
 
             // If we have to populate the local_nodes at this point then that means the control connection
             // may not have been made against a node in the configured data_center/rack.
@@ -513,28 +522,6 @@
         Ok(())
     }
 
-    async fn populate_local_nodes(&mut self) -> Result<()> {
-        let start = Instant::now();
-        loop {
-            if self.pool.nodes.is_empty() {
-                let nodes_shared = self.topology_task_nodes.read().await;
-                self.pool.set_nodes(nodes_shared.clone());
-            }
-
-            if !self.pool.nodes.is_empty() {
-                return Ok(());
-            }
-
-            if start.elapsed() > Duration::from_secs(10 * 60) {
-                return Err(anyhow!(
-                    "10 minute timeout waiting for topology task elapsed"
-                ));
-            }
-
-            tokio::time::sleep(Duration::from_millis(10)).await;
-        }
-    }
-
     fn get_rewrite_table(&self, request: &mut Message, index: usize) -> Option<TableToRewrite> {
         if let Some(Frame::Cassandra(cassandra)) = request.frame() {
             // No need to handle Batch as selects can only occur on Query
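The removed polling loop (10 ms sleeps with a 10 minute cap) is replaced by the two nodes_rx lines above: changed().await suspends the task until the topology task publishes, so the timeout and retry bookkeeping disappear. A standalone sketch of the blocking behaviour (String stands in for CassandraNode; assumes tokio's macros and time features):

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Vec::<String>::new());

    // Simulate the topology task publishing its first list a little later.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(vec!["172.16.1.2:9042".to_string()]).unwrap();
    });

    // Suspends until a list is published; no sleep/retry loop required.
    rx.changed().await.unwrap();
    assert_eq!(rx.borrow_and_update().len(), 1);
}
```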
25 changes: 6 additions & 19 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/topology.rs
@@ -10,9 +10,8 @@ use cassandra_protocol::frame::message_register::BodyReqRegister;
 use cassandra_protocol::token::Murmur3Token;
 use cassandra_protocol::{frame::Version, query::QueryParams};
 use std::net::SocketAddr;
-use std::sync::Arc;
 use tokio::sync::mpsc::unbounded_channel;
-use tokio::sync::{mpsc, oneshot, RwLock};
+use tokio::sync::{mpsc, oneshot, watch};
 
 #[derive(Debug)]
 pub struct TaskConnectionInfo {
@@ -21,15 +20,15 @@ pub struct TaskConnectionInfo {
 }
 
 pub fn create_topology_task(
-    nodes: Arc<RwLock<Vec<CassandraNode>>>,
+    nodes_tx: watch::Sender<Vec<CassandraNode>>,
     mut connection_info_rx: mpsc::Receiver<TaskConnectionInfo>,
     data_center: String,
 ) {
     tokio::spawn(async move {
         while let Some(mut connection_info) = connection_info_rx.recv().await {
             let mut attempts = 0;
             while let Err(err) =
-                topology_task_process(&nodes, &mut connection_info, &data_center).await
+                topology_task_process(&nodes_tx, &mut connection_info, &data_center).await
             {
                 tracing::error!("topology task failed, retrying, error was: {err:?}");
                 attempts += 1;
@@ -43,7 +42,7 @@
 }
 
 async fn topology_task_process(
-    shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
+    nodes_tx: &watch::Sender<Vec<CassandraNode>>,
     connection_info: &mut TaskConnectionInfo,
     data_center: &str,
 ) -> Result<()> {
@@ -60,7 +59,7 @@
     let version = connection_info.connection_factory.get_version()?;
 
     let mut nodes = fetch_current_nodes(&connection, connection_info, data_center).await?;
-    write_to_shared(shared_nodes, nodes.clone()).await;
+    nodes_tx.send(nodes.clone())?;
 
     register_for_topology_and_status_events(&connection, version).await?;
 
@@ -119,7 +118,7 @@ async fn topology_task_process(
                 return Err(anyhow!("topology control connection was closed"));
             }
         }
-        write_to_shared(shared_nodes, nodes.clone()).await;
+        nodes_tx.send(nodes.clone())?;
     }
 }
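Unlike the old write_to_shared, nodes_tx.send can fail: watch::Sender::send returns Err once every receiver has been dropped, so the `?` here doubles as a clean shutdown path for the topology task when the transform side goes away. A standalone sketch of that property:

```rust
use tokio::sync::watch;

fn main() {
    let (tx, rx) = watch::channel(Vec::<i32>::new());

    // While at least one receiver exists, sends succeed.
    assert!(tx.send(vec![1]).is_ok());

    // With the last receiver gone nobody can observe the value,
    // so send returns Err and the `?` above would end the task.
    drop(rx);
    assert!(tx.send(vec![2]).is_err());
}
```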

@@ -172,18 +171,6 @@ async fn fetch_current_nodes(
     Ok(new_nodes)
 }
 
-async fn write_to_shared(
-    shared_nodes: &Arc<RwLock<Vec<CassandraNode>>>,
-    new_nodes: Vec<CassandraNode>,
-) {
-    let mut write_lock = shared_nodes.write().await;
-    let expensive_drop = std::mem::replace(&mut *write_lock, new_nodes);
-
-    // Make sure to drop write_lock before the expensive_drop which will have to perform many deallocations.
-    std::mem::drop(write_lock);
-    std::mem::drop(expensive_drop);
-}
-
 mod system_local {
     use super::*;
 
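The deleted helper existed to drop the old Vec outside the write lock; watch::Sender::send now owns that swap. The remaining locking concern is on the read side: Receiver::borrow returns a guard holding an internal read lock, so callers should clone out and drop it promptly rather than hold it across an .await. A sketch of the snapshot pattern this PR uses:

```rust
use tokio::sync::watch;

// Clone the current list while the guard is alive; the guard (an internal
// read lock) is dropped at the end of the expression, before any .await.
fn snapshot(rx: &mut watch::Receiver<Vec<String>>) -> Vec<String> {
    rx.borrow_and_update().clone()
}

fn main() {
    let (tx, mut rx) = watch::channel(vec!["a".to_string()]);
    tx.send(vec!["b".to_string()]).unwrap();
    assert_eq!(snapshot(&mut rx), vec!["b".to_string()]);
}
```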
21 changes: 5 additions & 16 deletions shotover-proxy/tests/cassandra_int_tests/cluster.rs
@@ -6,12 +6,11 @@ use shotover_proxy::transforms::cassandra::sink_cluster::{
     node::{CassandraNode, ConnectionFactory},
     topology::{create_topology_task, TaskConnectionInfo},
 };
-use std::sync::Arc;
-use tokio::sync::{mpsc, RwLock};
+use tokio::sync::{mpsc, watch};
 
 pub async fn run_topology_task(ca_path: Option<&str>, port: Option<u32>) -> Vec<CassandraNode> {
     let port = port.unwrap_or(9042);
-    let nodes_shared = Arc::new(RwLock::new(vec![]));
+    let (nodes_tx, mut nodes_rx) = watch::channel(vec![]);
     let (task_handshake_tx, task_handshake_rx) = mpsc::channel(1);
     let tls = ca_path.map(|ca_path| {
         TlsConnector::new(TlsConnectorConfig {
@@ -27,7 +26,7 @@ pub async fn run_topology_task(ca_path: Option<&str>, port: Option<u32>) -> Vec<
         connection_factory.push_handshake_message(message);
     }
 
-    create_topology_task(nodes_shared.clone(), task_handshake_rx, "dc1".to_string());
+    create_topology_task(nodes_tx, task_handshake_rx, "dc1".to_string());
 
     // Give the handshake task a hardcoded handshake.
     // Normally the handshake is the handshake that the client gave shotover.
@@ -39,18 +38,8 @@ pub async fn run_topology_task(ca_path: Option<&str>, port: Option<u32>) -> Vec<
         .await
         .unwrap();
 
-    // keep attempting to read the nodes list until it is populated.
-    let mut nodes = vec![];
-    let mut tries = 0;
-    while nodes.is_empty() {
-        nodes = nodes_shared.read().await.clone();
-        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
-
-        if tries > 2000 {
-            panic!("Ran out of retries for the topology task to write the nodes list");
-        }
-        tries += 1;
-    }
+    nodes_rx.changed().await.unwrap();
+    let nodes = nodes_rx.borrow().clone();
     nodes
 }
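There is no race here even if the topology task publishes first: changed() takes &mut self (hence the mut nodes_rx above) and completes immediately when a value was sent after the receiver's last seen version, including before changed() was called. A self-contained sketch of the idiom:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Vec::<u32>::new());

    // Publish *before* anyone waits, as the topology task may do.
    tx.send(vec![1, 2, 3]).unwrap();

    // Still returns immediately: the send above is unseen by this receiver.
    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), vec![1, 2, 3]);
}
```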
