CassandraSinkCluster: connection failure handling
rukai committed Mar 29, 2023
1 parent c6795d2 commit 943b214
Showing 7 changed files with 266 additions and 330 deletions.
Cargo.lock (7 changes: 0 additions & 7 deletions)

Some generated files are not rendered by default.

shotover-proxy/tests/cassandra_int_tests/mod.rs (17 changes: 2 additions & 15 deletions)
@@ -220,24 +220,11 @@ async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) {

shotover
.shutdown_and_then_consume_events(&[
-EventMatcher::new()
-.with_level(Level::Error)
-.with_target("shotover::server")
-.with_message(
-r#"connection was unexpectedly terminated
-
-Caused by:
-0: Chain failed to send and/or receive messages, the connection will now be closed.
-1: CassandraSinkCluster transform failed
-2: Failed to create new connection
-3: destination 172.16.1.3:9044 did not respond to connection attempt within 3s"#,
-)
-.with_count(Count::Any),
EventMatcher::new()
.with_level(Level::Warn)
-.with_target("shotover::transforms::cassandra::sink_cluster")
+.with_target("shotover::transforms::cassandra::sink_cluster::node_pool")
.with_message(
r#"A successful connection to a control node was made but attempts to connect to these nodes failed first:
r#"A successful connection to a node was made but attempts to connect to these nodes failed first:
* 172.16.1.3:9044:
- Failed to create new connection
- destination 172.16.1.3:9044 did not respond to connection attempt within 3s"#,
shotover/Cargo.toml (1 change: 0 additions & 1 deletion)
@@ -23,7 +23,6 @@ async-recursion = "1.0"
governor = { version = "0.5.0", default-features = false, features = ["std", "jitter", "quanta"] }
nonzero_ext = "0.3.0"
version-compare = "0.1"
-split-iter = "0.1.0"
rand = { features = ["small_rng"], workspace = true }
clap.workspace = true
itertools.workspace = true
shotover/src/transforms/cassandra/sink_cluster/mod.rs (217 changes: 118 additions & 99 deletions)
@@ -1,11 +1,11 @@
-use self::node_pool::{NodePoolBuilder, PreparedMetadata};
+use self::node_pool::{get_accessible_owned_connection, NodePoolBuilder, PreparedMetadata};
use self::rewrite::{MessageRewriter, RewriteTableTy};
use crate::error::ChainResponse;
use crate::frame::cassandra::{CassandraMetadata, Tracing};
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, Messages, Metadata};
use crate::tls::{TlsConnector, TlsConnectorConfig};
-use crate::transforms::cassandra::connection::{CassandraConnection, Response};
+use crate::transforms::cassandra::connection::{CassandraConnection, Response, ResponseError};
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
@@ -254,13 +254,15 @@ impl CassandraSinkCluster {
.iter()
.any(|x| x.address == address && x.is_up)
{
-let addresses = self.pool.get_shuffled_addresses_in_dc_rack(
+let (connection, address) = self.pool.get_random_owned_connection_in_dc_rack(
&self.message_rewriter.local_shotover_node.rack,
&mut self.rng,
-);
-self.create_control_connection(&addresses).await.map_err(|e|
+&self.connection_factory,
+).await
+.map_err(|e|
e.context("Failed to recreate control connection after control connection node went down")
)?;
+self.set_control_connection(connection, address)
}
}
}
@@ -283,22 +285,34 @@ impl CassandraSinkCluster {
// Create the initial connection.
// Messages will be sent through this connection until we have extracted the handshake.
if self.control_connection.is_none() {
-let points = if self.pool.nodes().iter().all(|x| !x.is_up) {
-let mut points = Vec::with_capacity(self.contact_points.len());
+let (connection, address) = if self.pool.nodes().iter().all(|x| !x.is_up) {
+let mut start_nodes = Vec::with_capacity(self.contact_points.len());
for point in &self.contact_points {
-points.push(tokio::net::lookup_host(point).await?.next().unwrap());
+start_nodes.push(CassandraNode::new(
+tokio::net::lookup_host(point).await?.next().unwrap(),
+// All of these fields use the cheapest option because get_accessible_owned_connection does not use them at all
+String::new(),
+vec![],
+Uuid::nil(),
+));
}
-points
-} else {
-self.pool.get_shuffled_addresses_in_dc_rack(
-&self.message_rewriter.local_shotover_node.rack,
-&mut self.rng,
-)
-};

-self.create_control_connection(&points)
+get_accessible_owned_connection(
+&self.connection_factory,
+start_nodes.iter_mut().collect(),
+)
.await
-.map_err(|e| e.context("Failed to create initial control connection"))?;
+} else {
+self.pool
+.get_random_owned_connection_in_dc_rack(
+&self.message_rewriter.local_shotover_node.rack,
+&mut self.rng,
+&self.connection_factory,
+)
+.await
+}
+.map_err(|e| e.context("Failed to create initial control connection"))?;
+self.set_control_connection(connection, address);
}

if !self.init_handshake_complete {
@@ -357,38 +371,50 @@ impl CassandraSinkCluster {
let next_host_id = nodes_to_prepare_on
.pop()
.ok_or_else(|| anyhow!("ran out of nodes to send prepare messages to"))?;
-self.pool
+match self
+.pool
.nodes()
.iter_mut()
.find(|node| node.host_id == next_host_id)
.ok_or_else(|| anyhow!("node {next_host_id} has dissapeared"))?
.get_connection(&self.connection_factory)
-.await?
-.send(message)?
+.await
+{
+Ok(connection) => connection.send(message)?,
+Err(err) => send_error_in_response_to_message(&message, &format!("{err}"))?,
+}
} else if let Some((execute, metadata)) = get_execute_message(&mut message) {
// If the message is an execute we should perform token aware routing
-match self
+let rack = &self.message_rewriter.local_shotover_node.rack;
+let connection = self
.pool
-.get_replica_node_in_dc(
+.get_replica_connection_in_dc(
execute,
-&self.message_rewriter.local_shotover_node.rack,
+rack,
self.version.unwrap(),
&mut self.rng,
+&self.connection_factory,
)
.await
-{
-Ok(replica_node) => replica_node
-.get_connection(&self.connection_factory)
-.await?
-.send(message)?,
-Err(GetReplicaErr::NoReplicasFound | GetReplicaErr::NoKeyspaceMetadata) => self
-.pool
-.get_round_robin_node_in_dc_rack(
-&self.message_rewriter.local_shotover_node.rack,
-)
-.get_connection(&self.connection_factory)
-.await?
-.send(message)?,
+.await;
+
+match connection {
+Ok(connection) => connection.send(message)?,
+Err(GetReplicaErr::NoKeyspaceMetadata) => {
+match self
+.pool
+.get_random_connection_in_dc_rack(
+rack,
+&mut self.rng,
+&self.connection_factory,
+)
+.await
+{
+Ok(connection) => connection.send(message)?,
+Err(err) => {
+send_error_in_response_to_metadata(&metadata, &format!("{err}"))
+}
+}
+}
Err(GetReplicaErr::NoPreparedMetadata) => {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
let id = execute.id.clone();
@@ -397,33 +423,41 @@
// send an unprepared error in response to force
// the client to reprepare the query
return_chan_tx
-.send(Ok(Message::from_frame(Frame::Cassandra(
-CassandraFrame {
-operation: CassandraOperation::Error(ErrorBody {
-message: "Shotover does not have this query's metadata. Please re-prepare on this Shotover host before sending again.".into(),
-ty: ErrorType::Unprepared(UnpreparedError { id }),
-}),
-stream_id: metadata.stream_id,
-tracing: Tracing::Response(None), // We didn't actually hit a node so we don't have a tracing id
-version: self.version.unwrap(),
-warnings: vec![],
-},
-)))).expect("the receiver is guaranteed to be alive, so this must succeed");
+.send(Ok(Message::from_frame(Frame::Cassandra(
+CassandraFrame {
+operation: CassandraOperation::Error(ErrorBody {
+message: "Shotover does not have this query's metadata. Please re-prepare on this Shotover host before sending again.".into(),
+ty: ErrorType::Unprepared(UnpreparedError { id }),
+}),
+stream_id: metadata.stream_id,
+tracing: Tracing::Response(None), // We didn't actually hit a node so we don't have a tracing id
+version: self.version.unwrap(),
+warnings: vec![],
+},
+)))).expect("the receiver is guaranteed to be alive, so this must succeed");
return_chan_rx
}
+Err(GetReplicaErr::NoNodeAvailable(err)) => {
+send_error_in_response_to_metadata(&metadata, &format!("{err}"))
+}
Err(GetReplicaErr::Other(err)) => {
return Err(err);
}
}
} else {
// otherwise just send to a random node
-self.pool
-.get_round_robin_node_in_dc_rack(
+match self
+.pool
+.get_random_connection_in_dc_rack(
&self.message_rewriter.local_shotover_node.rack,
+&mut self.rng,
+&self.connection_factory,
)
-.get_connection(&self.connection_factory)
-.await?
-.send(message)?
+.await
+{
+Ok(connection) => connection.send(message)?,
+Err(err) => send_error_in_response_to_message(&message, &format!("{err}"))?,
+}
};

responses_future.push_back(return_chan_rx)
@@ -501,13 +535,13 @@ impl CassandraSinkCluster {
// If we have to populate the local_nodes at this point then that means the control connection
// may not have been made against a node in the configured data_center/rack.
// Therefore we need to recreate the control connection to ensure that it is in the configured data_center/rack.
-let addresses = self.pool.get_shuffled_addresses_in_dc_rack(
+let (connection, address) = self.pool.get_random_owned_connection_in_dc_rack(
&self.message_rewriter.local_shotover_node.rack,
&mut self.rng,
-);
-self.create_control_connection(&addresses)
-.await
+&self.connection_factory
+).await
.map_err(|e| e.context("Failed to recreate control connection when initial connection was possibly against the wrong node"))?;
+self.set_control_connection(connection, address);
}
tracing::info!(
"Control connection finalized against node at: {:?}",
@@ ... @@
Ok(())
}

-async fn create_control_connection(&mut self, addresses: &[SocketAddr]) -> Result<()> {
-struct AddressError {
-address: SocketAddr,
-error: anyhow::Error,
-}
-fn bullet_list_of_node_failures(errors: &[AddressError]) -> String {
-let mut node_errors = String::new();
-for AddressError { error, address } in errors {
-node_errors.push_str(&format!("\n* {address:?}:"));
-for sub_error in error.chain() {
-node_errors.push_str(&format!("\n - {sub_error}"));
-}
-}
-node_errors
-}
-
-let mut errors = vec![];
-for address in addresses {
-match self.connection_factory.new_connection(address).await {
-Ok(connection) => {
-self.control_connection = Some(connection);
-self.control_connection_address = Some(*address);
-if !errors.is_empty() {
-let node_errors = bullet_list_of_node_failures(&errors);
-tracing::warn!("A successful connection to a control node was made but attempts to connect to these nodes failed first:{node_errors}");
-}
-return Ok(());
-}
-Err(error) => {
-errors.push(AddressError {
-error,
-address: *address,
-});
-}
-}
-}
-
-let node_errors = bullet_list_of_node_failures(&errors);
-Err(anyhow!(
-"Attempted to create a control connection against every node in the rack and all attempts failed:{node_errors}"
-))
+fn set_control_connection(&mut self, connection: CassandraConnection, address: SocketAddr) {
+self.control_connection = Some(connection);
+self.control_connection_address = Some(address);
}

fn is_system_query(&self, request: &mut Message) -> bool {
Expand All @@ -574,6 +570,29 @@ impl CassandraSinkCluster {
}
}

+fn send_error_in_response_to_metadata(
+metadata: &CassandraMetadata,
+error: &str,
+) -> oneshot::Receiver<Result<Message, ResponseError>> {
+let (tx, rx) = oneshot::channel();
+tx.send(Ok(Message::from_frame(Frame::Cassandra(
+CassandraFrame::shotover_error(metadata.stream_id, metadata.version, error),
+))))
+.unwrap();
+rx
+}
+
+fn send_error_in_response_to_message(
+message: &Message,
+error: &str,
+) -> Result<oneshot::Receiver<Result<Message, ResponseError>>> {
+if let Ok(Metadata::Cassandra(metadata)) = message.metadata() {
+Ok(send_error_in_response_to_metadata(&metadata, error))
+} else {
+Err(anyhow!("Expected message to be of type cassandra"))
+}
+}

fn get_prepared_result_message(message: &mut Message) -> Option<(CBytesShort, PreparedMetadata)> {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Result(CassandraResult::Prepared(prepared)),
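The send_error_in_response_to_* helpers added above are the heart of this change: when a connection cannot be created, the transform now completes the request's response channel with a Cassandra error frame instead of returning an error up the chain, which previously terminated the whole client connection (see the removed Level::Error matcher in the test diff). Below is a minimal standalone sketch of that oneshot pattern; the Message struct and error_response function here are simplified stand-ins for illustration, not shotover's real types.

```rust
// Standalone sketch of the "answer with an error instead of failing the
// chain" pattern; assumes tokio = { version = "1", features = ["full"] }.
use tokio::sync::oneshot;

// Stand-in for shotover's Message type.
#[derive(Debug)]
struct Message {
    stream_id: i16,
    body: String,
}

// Mirrors send_error_in_response_to_metadata: build an error response for
// the failed request and complete its response channel immediately.
fn error_response(stream_id: i16, error: &str) -> oneshot::Receiver<Message> {
    let (tx, rx) = oneshot::channel();
    tx.send(Message {
        stream_id,
        body: format!("server error: {error}"),
    })
    // `rx` is still held by this function, so the send cannot fail.
    .expect("the receiver is guaranteed to be alive, so this must succeed");
    rx
}

#[tokio::main]
async fn main() {
    // The caller awaits this receiver exactly as it would await a reply
    // that came back from a real Cassandra node.
    let rx = error_response(42, "destination 172.16.1.3:9044 did not respond");
    println!("{:?}", rx.await.unwrap());
}
```

Because the receiver is pushed onto the same responses_future queue as real replies, only the failed request observes an error while the rest of the session keeps working.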
shotover/src/transforms/cassandra/sink_cluster/node.rs (7 changes: 6 additions & 1 deletion)
@@ -54,6 +54,11 @@ impl CassandraNode {

Ok(self.outbound.as_mut().unwrap())
}

+pub fn report_issue(&mut self) {
+self.is_up = false;
+self.outbound = None;
+}
}

#[derive(Debug)]
@@ -90,7 +95,7 @@ impl ConnectionFactory {

/// For when you want to clone the config options for creating new connections but none of the state.
/// When the transform chain is cloned for a new incoming connection, this method should be used so the state doesn't also get cloned to
-/// the new connection as aswell.
+/// the new connection as well.
pub fn new_with_same_config(&self) -> Self {
Self {
connect_timeout: self.connect_timeout,
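The report_issue method added above is how the pool reacts to the failures surfaced elsewhere in this commit: the node is marked down and its cached connection is discarded, so the next request routed to it must establish a fresh connection. A minimal sketch of just that behaviour, using a stand-in Connection type (the real CassandraNode also carries an address, rack, tokens, and host_id):

```rust
// Stand-in for the pooled connection type; the real one wraps a TCP/TLS session.
struct Connection;

struct CassandraNode {
    is_up: bool,
    outbound: Option<Connection>,
}

impl CassandraNode {
    // Mirrors the report_issue added above: mark the node down and drop
    // the cached outbound connection so it cannot be reused.
    fn report_issue(&mut self) {
        self.is_up = false;
        self.outbound = None;
    }
}

fn main() {
    let mut node = CassandraNode {
        is_up: true,
        outbound: Some(Connection),
    };
    // On any connection failure, the pool reports the issue against the node.
    node.report_issue();
    assert!(!node.is_up);
    assert!(node.outbound.is_none());
    println!("node marked down and cached connection dropped");
}
```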
