Skip to content

Commit

Permalink
Token aware routing (#824)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Oct 5, 2022
1 parent ccbb3c1 commit 9386f81
Show file tree
Hide file tree
Showing 5 changed files with 2,905 additions and 38 deletions.
248 changes: 210 additions & 38 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::ChainResponse;
use crate::frame::cassandra::parse_statement_single;
use crate::frame::cassandra::{parse_statement_single, CassandraMetadata};
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{IntSize, Message, MessageValue, Messages};
use crate::tls::{TlsConnector, TlsConnectorConfig};
Expand All @@ -9,8 +9,12 @@ use crate::transforms::{Transform, Transforms, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cassandra_protocol::events::ServerEvent;
use cassandra_protocol::frame::Version;
use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType, UnpreparedError};
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::frame::message_result::PreparedMetadata;
use cassandra_protocol::frame::{Opcode, Version};
use cassandra_protocol::query::QueryParams;
use cassandra_protocol::types::CBytesShort;
use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::{FQName, Identifier};
use cql3_parser::select::SelectElement;
Expand All @@ -19,6 +23,7 @@ use futures::StreamExt;
use itertools::Itertools;
use metrics::{register_counter, Counter};
use node::{CassandraNode, ConnectionFactory};
use node_pool::{GetReplicaErr, NodePool};
use rand::prelude::*;
use serde::Deserialize;
use std::net::SocketAddr;
Expand All @@ -30,6 +35,9 @@ use uuid::Uuid;
use version_compare::Cmp;

pub mod node;
mod node_pool;
mod routing_key;
mod token_map;
pub mod topology;

#[derive(Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -105,7 +113,7 @@ pub struct CassandraSinkCluster {
/// Can be populated at two points:
/// 1. If topology_task_nodes is already populated when the very first message is received then we populate local_nodes immediately.
/// 2. Otherwise local_nodes will be populated immediately after we have a confirmed successful handshake, waiting until topology_task_nodes is populated.
local_nodes: Vec<CassandraNode>,
pool: NodePool,
/// Only written to by the topology task
/// Transform instances should never write to this.
topology_task_nodes: Arc<RwLock<Vec<CassandraNode>>>,
Expand All @@ -130,7 +138,7 @@ impl Clone for CassandraSinkCluster {
peers_v2_table: self.peers_v2_table.clone(),
system_keyspaces: self.system_keyspaces.clone(),
local_shotover_node: self.local_shotover_node.clone(),
local_nodes: vec![],
pool: NodePool::new(vec![]),
topology_task_nodes: self.topology_task_nodes.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
task_handshake_tx: self.task_handshake_tx.clone(),
Expand Down Expand Up @@ -180,7 +188,7 @@ impl CassandraSinkCluster {
Identifier::parse("system_distributed"),
],
local_shotover_node,
local_nodes: vec![],
pool: NodePool::new(vec![]),
topology_task_nodes: nodes_shared,
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
task_handshake_tx,
Expand Down Expand Up @@ -228,18 +236,20 @@ impl CassandraSinkCluster {
// Create the initial connection.
// Messages will be sent through this connection until we have extracted the handshake.
if self.init_handshake_connection.is_none() {
if self.local_nodes.is_empty() {
if self.pool.nodes.is_empty() {
let nodes_shared = self.topology_task_nodes.read().await;
self.local_nodes = nodes_shared.clone();
self.pool.set_nodes(nodes_shared.clone());
}

let random_point = if self.local_nodes.iter().all(|x| !x.is_up) {
let random_point = if self.pool.nodes.iter().all(|x| !x.is_up) {
tokio::net::lookup_host(self.contact_points.choose(&mut self.rng).unwrap())
.await?
.next()
.unwrap()
} else {
self.get_random_node_in_dc_rack().address
self.pool
.get_random_node_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.address
};

self.init_handshake_connection =
Expand All @@ -265,46 +275,125 @@ impl CassandraSinkCluster {
}

let mut responses_future = FuturesOrdered::new();

let mut responses_future_use = FuturesOrdered::new();
let mut use_future_index_to_node_index = vec![];

let mut responses_future_prepare = FuturesOrdered::new();

for mut message in messages {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
if self.local_nodes.is_empty()
if self.pool.nodes.is_empty()
|| !self.init_handshake_complete
// system.local and system.peers must be routed to the same node otherwise the system.local node will be amongst the system.peers nodes and a node will be missing
// DDL statements and system.local must be routed through the same connection, so that schema_version changes appear immediately in system.local
|| is_ddl_statement(&mut message)
|| self.is_system_query(&mut message)
{
self.init_handshake_connection.as_mut().unwrap()
self.init_handshake_connection
.as_mut()
.unwrap()
.send(message, return_chan_tx)?;
} else if is_use_statement(&mut message) {
// Adding the USE statement to the handshake ensures that any new connection
// created will have the correct keyspace setup.
self.connection_factory.set_use_message(message.clone());

// Send the USE statement to all open connections to ensure they are all in sync
for (node_index, node) in self.local_nodes.iter().enumerate() {
if let Some(outbound) = &node.outbound {
for (node_index, node) in self.pool.nodes.iter().enumerate() {
if let Some(connection) = &node.outbound {
let (return_chan_tx, return_chan_rx) = oneshot::channel();
outbound.send(message.clone(), return_chan_tx)?;
connection.send(message.clone(), return_chan_tx)?;
responses_future_use.push_back(return_chan_rx);
use_future_index_to_node_index.push(node_index);
}
}

// Send the USE statement to the handshake connection and use the response as shotover's response
self.init_handshake_connection.as_mut().unwrap()
self.init_handshake_connection
.as_mut()
.unwrap()
.send(message, return_chan_tx)?;
} else if is_prepare_message(&mut message) {
// Send the PREPARE statement to all connections
for node in self.pool.nodes.iter_mut() {
let (return_chan_tx, return_chan_rx) = oneshot::channel();

node.get_connection(&self.connection_factory)
.await?
.send(message.clone(), return_chan_tx)?;

responses_future_prepare.push_back(return_chan_rx);
}

// TODO use the first node for this rather than repreparing
// send the PREPARE statement to the handshake connection and use the response as shotover's response
self.init_handshake_connection
.as_mut()
.unwrap()
.send(message, return_chan_tx)?;
} else {
// We have a full nodes list and handshake, so we can do proper routing now.
let random_node = self
.local_nodes
.iter_mut()
.filter(|x| x.is_up)
.choose(&mut self.rng)
.unwrap();
random_node.get_connection(&self.connection_factory).await?
// If the message is an execute we should perform token aware routing
if let Some((execute, metadata)) = get_execute_message(&mut message) {
match self
.pool
.replica_node(execute, &metadata.version, &mut self.rng)
{
Ok(Some(replica_node)) => {
replica_node
.get_connection(&self.connection_factory)
.await?
.send(message, return_chan_tx)?;
}
Ok(None) => {
let node = self.pool.get_random_node_in_dc_rack(
&self.local_shotover_node.rack,
&mut self.rng,
);
node.get_connection(&self.connection_factory)
.await?
.send(message, return_chan_tx)?;
}
Err(GetReplicaErr::NoMetadata) => {
let id = execute.id.clone();
tracing::info!("forcing re-prepare on {:?}", id);
// this shotover node doesn't have the metadata
// send an unprepared error in response to force
// the client to reprepare the query
return_chan_tx
.send(Response {
original: message.clone(),
response: Ok(Message::from_frame(Frame::Cassandra(
CassandraFrame {
operation: CassandraOperation::Error(ErrorBody {
message: "Shotover does not have this query's metadata. Please re-prepare on this Shotover host before sending again.".into(),
ty: ErrorType::Unprepared(UnpreparedError {
id,
}),
}),
stream_id: metadata.stream_id,
tracing_id: metadata.tracing_id,
version: metadata.version,
warnings: vec![],
},
))),
}).expect("the receiver is guaranteed to be alive, so this must succeed");
}
Err(GetReplicaErr::Other(err)) => {
return Err(err);
}
};

// otherwise just send to a random node
} else {
let node = self
.pool
.get_random_node_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng);
node.get_connection(&self.connection_factory)
.await?
.send(message, return_chan_tx)?;
}
}
.send(message, return_chan_tx)?;

responses_future.push_back(return_chan_rx)
}
Expand All @@ -313,6 +402,41 @@ impl CassandraSinkCluster {
super::connection::receive(self.read_timeout, &self.failed_requests, responses_future)
.await?;

{
let mut prepare_responses = super::connection::receive(
self.read_timeout,
&self.failed_requests,
responses_future_prepare,
)
.await?;

if !prepare_responses.windows(2).all(|w| w[0] == w[1]) {
let err_str = prepare_responses
.iter_mut()
.filter_map(|response| {
if let Some(Frame::Cassandra(CassandraFrame {
operation:
CassandraOperation::Result(CassandraResult::Prepared(prepared)),
..
})) = response.frame()
{
Some(format!("\n{:?}", prepared))
} else {
None
}
})
.collect::<String>();

if cfg!(test) {
panic!("{}", err_str);
} else {
tracing::error!(
"Nodes did not return the same response to PREPARE statement {err_str}"
);
}
}
}

// When the server indicates that it is ready for normal operation via Ready or AuthSuccess,
// we have successfully collected an entire handshake so we mark the handshake as complete.
if !self.init_handshake_complete {
Expand All @@ -336,14 +460,20 @@ impl CassandraSinkCluster {
// If any errors occurred close the connection as we can no
// longer make any guarantees about the current state of the connection
if !is_use_statement_successful(response) {
self.local_nodes[node_index].outbound = None;
self.pool.nodes[node_index].outbound = None;
}
}

for table_to_rewrite in tables_to_rewrite {
self.rewrite_table(table_to_rewrite, &mut responses).await?;
}

for response in responses.iter_mut() {
if let Some((id, metadata)) = get_prepared_result_message(response) {
self.pool.add_prepared_result(id, metadata);
}
}

Ok(responses)
}

Expand All @@ -358,13 +488,16 @@ impl CassandraSinkCluster {
}
self.init_handshake_complete = true;

if self.local_nodes.is_empty() {
if self.pool.nodes.is_empty() {
self.populate_local_nodes().await?;

// If we have to populate the local_nodes at this point then that means the control connection
// may not have been made against a node in the configured data_center/rack.
// Therefore we need to recreate the control connection to ensure that it is in the configured data_center/rack.
let random_address = self.get_random_node_in_dc_rack().address;
let random_address = self
.pool
.get_random_node_in_dc_rack(&self.local_shotover_node.rack, &mut self.rng)
.address;
self.init_handshake_connection = Some(
self.connection_factory
.new_connection(random_address)
Expand All @@ -383,12 +516,12 @@ impl CassandraSinkCluster {
async fn populate_local_nodes(&mut self) -> Result<()> {
let start = Instant::now();
loop {
if self.local_nodes.is_empty() {
if self.pool.nodes.is_empty() {
let nodes_shared = self.topology_task_nodes.read().await;
self.local_nodes = nodes_shared.clone();
self.pool.set_nodes(nodes_shared.clone());
}

if !self.local_nodes.is_empty() {
if !self.pool.nodes.is_empty() {
return Ok(());
}

Expand All @@ -402,14 +535,6 @@ impl CassandraSinkCluster {
}
}

/// Picks, uniformly at random, one node from `local_nodes` that is marked up
/// and whose rack equals the rack of the local shotover node.
///
/// # Panics
///
/// Panics if no node satisfies both conditions, because the result of the
/// random choice is unwrapped.
fn get_random_node_in_dc_rack(&mut self) -> &CassandraNode {
    let local_rack = &self.local_shotover_node.rack;
    let candidates = self
        .local_nodes
        .iter()
        .filter(|node| node.rack == *local_rack && node.is_up);
    candidates.choose(&mut self.rng).unwrap()
}

fn get_rewrite_table(&self, request: &mut Message, index: usize) -> Option<TableToRewrite> {
if let Some(Frame::Cassandra(cassandra)) = request.frame() {
// No need to handle Batch as selects can only occur on Query
Expand Down Expand Up @@ -754,6 +879,53 @@ enum RewriteTableTy {
Peers,
}

/// Extracts the prepared statement id and its metadata from `message`.
///
/// Returns `Some((id, metadata))`, both cloned out of the frame, when the
/// message parses to a Cassandra frame whose operation is a `Prepared` result.
/// Returns `None` for every other message.
fn get_prepared_result_message(message: &mut Message) -> Option<(CBytesShort, PreparedMetadata)> {
    match message.frame() {
        Some(Frame::Cassandra(CassandraFrame {
            operation: CassandraOperation::Result(CassandraResult::Prepared(prepared)),
            ..
        })) => Some((prepared.id.clone(), prepared.metadata.clone())),
        _ => None,
    }
}

/// Returns the body of an EXECUTE request together with the metadata of the
/// frame that carried it.
///
/// When `message` parses to a Cassandra frame whose operation is `Execute`,
/// the result is a borrow of the execute body plus a `CassandraMetadata`
/// holding copies of the frame's `version`, `stream_id` and `tracing_id`, with
/// the opcode fixed to `Opcode::Execute`. Any other message yields `None`.
fn get_execute_message(message: &mut Message) -> Option<(&BodyReqExecuteOwned, CassandraMetadata)> {
    match message.frame() {
        Some(Frame::Cassandra(CassandraFrame {
            operation: CassandraOperation::Execute(execute_body),
            version,
            stream_id,
            tracing_id,
            ..
        })) => Some((
            execute_body,
            CassandraMetadata {
                version: *version,
                stream_id: *stream_id,
                tracing_id: *tracing_id,
                opcode: Opcode::Execute,
            },
        )),
        _ => None,
    }
}

/// Reports whether `message` parses to a Cassandra frame whose operation is
/// `Prepare`.
fn is_prepare_message(message: &mut Message) -> bool {
    matches!(
        message.frame(),
        Some(Frame::Cassandra(CassandraFrame {
            operation: CassandraOperation::Prepare(_),
            ..
        }))
    )
}

fn is_use_statement(request: &mut Message) -> bool {
if let Some(Frame::Cassandra(frame)) = request.frame() {
if let CassandraOperation::Query { query, .. } = &mut frame.operation {
Expand Down
Loading

0 comments on commit 9386f81

Please sign in to comment.