Skip to content

Commit

Permalink
Merge branch 'main' into upstream-futures-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Aug 16, 2022
2 parents 49bd298 + d19e190 commit 6389eb3
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 79 deletions.
177 changes: 99 additions & 78 deletions shotover-proxy/src/transforms/cassandra/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::transforms::util::Response;
use crate::transforms::{Transform, Transforms, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cassandra_protocol::consistency::Consistency;
use cassandra_protocol::frame::Version;
use cassandra_protocol::query::QueryParams;
use cql3_parser::cassandra_statement::CassandraStatement;
Expand Down Expand Up @@ -262,8 +261,15 @@ pub fn create_topology_task(
) {
tokio::spawn(async move {
while let Some(handshake) = handshake_rx.recv().await {
if let Err(err) = topology_task_process(&tls, &nodes, handshake, &data_center).await {
tracing::error!("{err:?}");
let mut attempts = 0;
while let Err(err) = topology_task_process(&tls, &nodes, &handshake, &data_center).await
{
tracing::error!("topology task failed, retrying, error was: {err:?}");
attempts += 1;
if attempts > 3 {
// 3 attempts have failed, lets try a new handshake
break;
}
}

// Sleep for an hour.
Expand All @@ -279,37 +285,51 @@ pub fn create_topology_task(
async fn topology_task_process(
tls: &Option<TlsConnector>,
nodes: &Arc<RwLock<Vec<CassandraNode>>>,
handshake: TaskHandshake,
handshake: &TaskHandshake,
data_center: &str,
) -> Result<()> {
let outbound = new_connection(handshake.address, &handshake.handshake, tls, &None).await?;
let outbound = new_connection(&handshake.address, &handshake.handshake, tls, &None).await?;

let (return_chan_tx, return_chan_rx) = oneshot::channel();
let (peers_tx, peers_rx) = oneshot::channel();
outbound.send(
Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
stream_id: 0,
tracing_id: None,
warnings: vec![],
operation: CassandraOperation::Query {
query: Box::new(parse_statement_single("SELECT * FROM system.peers")),
params: Box::new(QueryParams {
consistency: Consistency::One,
with_names: false,
values: None,
page_size: Some(5000),
paging_state: None,
serial_consistency: None,
timestamp: Some(1643855761086585),
keyspace: None,
now_in_seconds: None,
}),
query: Box::new(parse_statement_single(
"SELECT peer, rack, data_center, tokens FROM system.peers",
)),
params: Box::new(QueryParams::default()),
},
})),
peers_tx,
)?;

let (local_tx, local_rx) = oneshot::channel();
outbound.send(
Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
stream_id: 1,
tracing_id: None,
warnings: vec![],
operation: CassandraOperation::Query {
query: Box::new(parse_statement_single(
"SELECT listen_address, rack, data_center, tokens FROM system.local",
)),
params: Box::new(QueryParams::default()),
},
})),
return_chan_tx,
local_tx,
)?;
let mut response = return_chan_rx.await?.response?;
let new_nodes = get_nodes_from_system_peers(&mut response, data_center);

let (new_nodes, more_nodes) = tokio::join!(
async { system_peers_into_nodes(peers_rx.await?.response?, data_center) },
async { system_peers_into_nodes(local_rx.await?.response?, data_center) }
);
let mut new_nodes = new_nodes?;
new_nodes.extend(more_nodes?);

let mut write_lock = nodes.write().await;
let expensive_drop = std::mem::replace(&mut *write_lock, new_nodes);
Expand All @@ -321,69 +341,70 @@ async fn topology_task_process(
Ok(())
}

fn get_nodes_from_system_peers(
response: &mut Message,
fn system_peers_into_nodes(
mut response: Message,
config_data_center: &str,
) -> Vec<CassandraNode> {
let mut new_nodes = vec![];
let peer_ident = Identifier::Unquoted("peer".into());
let rack_ident = Identifier::Unquoted("rack".into());
let data_center_ident = Identifier::Unquoted("data_center".into());
let tokens_ident = Identifier::Unquoted("tokens".into());

) -> Result<Vec<CassandraNode>> {
if let Some(Frame::Cassandra(frame)) = response.frame() {
// CassandraOperation::Error(_) is another possible case, we should silently ignore such cases
if let CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
metadata,
}) = &mut frame.operation
{
for row in rows.iter() {
let mut address = None;
let mut rack = None;
let mut data_center = None;
let mut tokens = vec![];
for (i, col) in metadata.col_specs.iter().enumerate() {
let ident = Identifier::parse(&col.name);
if ident == peer_ident {
if let Some(MessageValue::Inet(value)) = row.get(i) {
address = Some(*value);
}
} else if ident == rack_ident {
if let Some(MessageValue::Varchar(value)) = row.get(i) {
rack = Some(value.clone());
}
} else if ident == data_center_ident {
if let Some(MessageValue::Varchar(value)) = row.get(i) {
data_center = Some(value.clone());
}
} else if ident == tokens_ident {
if let Some(MessageValue::List(list)) = row.get(i) {
tokens = list
.iter()
.filter_map(|x| match x {
MessageValue::Varchar(a) => Some(a.clone()),
_ => None,
})
.collect();
}
match &mut frame.operation {
CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
..
}) => rows
.iter_mut()
.filter(|row| {
if let Some(MessageValue::Varchar(data_center)) = row.get(2) {
data_center == config_data_center
} else {
false
}
}
if let (Some(address), Some(rack), Some(data_center)) = (address, rack, data_center)
{
if data_center == config_data_center {
new_nodes.push(CassandraNode {
address,
_rack: rack,
_tokens: tokens,
outbound: None,
});
})
.map(|row| {
if row.len() != 4 {
return Err(anyhow!("expected 4 columns but was {}", row.len()));
}
}
}

let tokens = if let Some(MessageValue::List(list)) = row.pop() {
list.into_iter()
.map::<Result<String>, _>(|x| match x {
MessageValue::Varchar(a) => Ok(a),
_ => Err(anyhow!("tokens value not a varchar")),
})
.collect::<Result<Vec<String>>>()?
} else {
return Err(anyhow!("tokens not a list"));
};
let _data_center = row.pop();
let rack = if let Some(MessageValue::Varchar(value)) = row.pop() {
value
} else {
return Err(anyhow!("rack not a varchar"));
};
let address = if let Some(MessageValue::Inet(value)) = row.pop() {
value
} else {
return Err(anyhow!("address not an inet"));
};

Ok(CassandraNode {
address,
_rack: rack,
_tokens: tokens,
outbound: None,
})
})
.collect(),
operation => Err(anyhow!(
"system.peers returned unexpected cassandra operation: {:?}",
operation
)),
}
} else {
Err(anyhow!(
"Failed to parse system.peers response {:?}",
response
))
}
new_nodes
}

fn is_use_statement(request: &mut Message) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/cassandra_int_tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub async fn test() {
}

// make assertions on the nodes list
assert_eq!(nodes.len(), 2);
assert_eq!(nodes.len(), 3);
let mut possible_addresses: Vec<IpAddr> = vec![
"172.16.1.2".parse().unwrap(),
"172.16.1.3".parse().unwrap(),
Expand Down

0 comments on commit 6389eb3

Please sign in to comment.