Skip to content

Commit

Permalink
Add warning to rewritten queries with WHERE (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 11, 2022
1 parent f1c3610 commit a469a28
Showing 1 changed file with 60 additions and 9 deletions.
69 changes: 60 additions & 9 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use cassandra_protocol::frame::{Opcode, Version};
use cassandra_protocol::query::QueryParams;
use cassandra_protocol::types::CBytesShort;
use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::{FQName, Identifier};
use cql3_parser::select::SelectElement;
use cql3_parser::common::{FQName, Identifier, Operand, RelationElement, RelationOperator};
use cql3_parser::select::{Select, SelectElement};
use futures::future::try_join_all;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
Expand Down Expand Up @@ -604,25 +604,37 @@ impl CassandraSinkCluster {
))
}

/// Returns any information required to correctly rewrite the response.
/// Will also perform minor modifications to the query required for the rewrite.
fn get_rewrite_table(&self, request: &mut Message, index: usize) -> Option<TableToRewrite> {
if let Some(Frame::Cassandra(cassandra)) = request.frame() {
// No need to handle Batch as selects can only occur on Query
if let CassandraOperation::Query { query, .. } = &cassandra.operation {
if let CassandraStatement::Select(select) = query.as_ref() {
if let CassandraOperation::Query { query, .. } = &mut cassandra.operation {
if let CassandraStatement::Select(select) = query.as_mut() {
let ty = if self.local_table == select.table_name {
RewriteTableTy::Local
} else if self.peers_table == select.table_name
|| self.peers_v2_table == select.table_name
{
// TODO: fail if WHERE exists
RewriteTableTy::Peers
} else {
return None;
};

let warnings = if Self::has_no_where_clause(ty, select) {
vec![]
} else {
select.where_clause.clear();
vec![format!(
"WHERE clause on the query was ignored. Shotover does not support WHERE clauses on queries against {}",
select.table_name
)]
};

return Some(TableToRewrite {
index,
ty,
warnings,
version: cassandra.version,
selects: select.columns.clone(),
});
Expand All @@ -632,29 +644,62 @@ impl CassandraSinkCluster {
None
}

fn has_no_where_clause(ty: RewriteTableTy, select: &Select) -> bool {
select.where_clause.is_empty()
// Most drivers do `FROM system.local WHERE key = 'local'` when determining the topology.
// I'm not sure why they do that it seems to have no affect as there is only ever one row and its key is always 'local'.
// Maybe it was a workaround for an old version of cassandra that got copied around?
// To keep warning noise down we consider it as having no where clause.
|| (ty == RewriteTableTy::Local
&& select.where_clause
== [RelationElement {
obj: Operand::Column(Identifier::Unquoted("key".to_string())),
oper: RelationOperator::Equal,
value: Operand::Const("'local'".to_owned()),
}])
}

async fn rewrite_table(
&mut self,
table: TableToRewrite,
responses: &mut Vec<Message>,
) -> Result<()> {
fn get_warnings(message: &mut Message) -> Vec<String> {
if let Some(Frame::Cassandra(frame)) = message.frame() {
frame.warnings.clone()
} else {
vec![]
}
}

if table.index + 1 < responses.len() {
let peers_response = responses.remove(table.index + 1);
let mut peers_response = responses.remove(table.index + 1);

// Include warnings from every message that gets combined into the final message + any extra warnings noted in the TableToRewrite
let mut warnings = get_warnings(&mut peers_response);
warnings.extend(table.warnings.clone());

match table.ty {
RewriteTableTy::Local => {
if let Some(local_response) = responses.get_mut(table.index) {
self.rewrite_table_local(table, local_response, peers_response)
warnings.extend(get_warnings(local_response));

self.rewrite_table_local(table, local_response, peers_response, warnings)
.await?;
local_response.invalidate_cache();
}
}
RewriteTableTy::Peers => {
if table.index + 1 < responses.len() {
let local_response = responses.remove(table.index + 1);
let mut local_response = responses.remove(table.index + 1);
if let Some(client_peers_response) = responses.get_mut(table.index) {
warnings.extend(get_warnings(&mut local_response));
warnings.extend(get_warnings(client_peers_response));

let mut nodes = parse_system_nodes(peers_response)?;
nodes.extend(parse_system_nodes(local_response)?);

self.rewrite_table_peers(table, client_peers_response, nodes)
self.rewrite_table_peers(table, client_peers_response, nodes, warnings)
.await?;
client_peers_response.invalidate_cache();
}
Expand All @@ -671,6 +716,7 @@ impl CassandraSinkCluster {
table: TableToRewrite,
peers_response: &mut Message,
nodes: Vec<NodeInfo>,
warnings: Vec<String>,
) -> Result<()> {
let mut data_center_alias = "data_center";
let mut rack_alias = "rack";
Expand Down Expand Up @@ -724,6 +770,7 @@ impl CassandraSinkCluster {
}

if let Some(Frame::Cassandra(frame)) = peers_response.frame() {
frame.warnings = warnings;
if let CassandraOperation::Result(CassandraResult::Rows { rows, metadata }) =
&mut frame.operation
{
Expand Down Expand Up @@ -820,6 +867,7 @@ impl CassandraSinkCluster {
table: TableToRewrite,
local_response: &mut Message,
peers_response: Message,
warnings: Vec<String>,
) -> Result<()> {
let mut peers = parse_system_nodes(peers_response)?;
peers.retain(|node| {
Expand Down Expand Up @@ -867,6 +915,7 @@ impl CassandraSinkCluster {
if let CassandraOperation::Result(CassandraResult::Rows { rows, metadata }) =
&mut frame.operation
{
frame.warnings = warnings;
// The local_response message is guaranteed to come from a node that is in our configured data_center/rack.
// That means we can leave fields like rack and data_center alone and get exactly what we want.
for row in rows {
Expand Down Expand Up @@ -950,8 +999,10 @@ struct TableToRewrite {
ty: RewriteTableTy,
version: Version,
selects: Vec<SelectElement>,
warnings: Vec<String>,
}

#[derive(PartialEq, Clone, Copy)]
enum RewriteTableTy {
Local,
Peers,
Expand Down

0 comments on commit a469a28

Please sign in to comment.