Skip to content

Commit

Permalink
Fix system.peers generation
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 2, 2022
1 parent 0bbea6d commit dbc6da5
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 57 deletions.
4 changes: 2 additions & 2 deletions shotover-proxy/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Decoder for CassandraCodec {
Ok(frame_len) => {
// Clear the read bytes from the FramedReader
let bytes = src.split_to(frame_len);
tracing::debug!(
tracing::error!(
"incoming cassandra message:\n{}",
pretty_hex::pretty_hex(&bytes)
);
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Encoder<Messages> for CassandraCodec {
Encodable::Bytes(bytes) => dst.extend_from_slice(&bytes),
Encodable::Frame(frame) => self.encode_raw(frame.into_cassandra().unwrap(), dst),
}
tracing::debug!(
tracing::error!(
"outgoing cassandra message:\n{}",
pretty_hex::pretty_hex(&&dst[start..])
);
Expand Down
10 changes: 1 addition & 9 deletions shotover-proxy/src/frame/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,15 +482,7 @@ impl CassandraOperation {
let rows_count = rows.len() as CInt;
let rows_content = rows
.into_iter()
.map(|row| {
row.into_iter()
.map(|value| {
CBytes::new(
cassandra_protocol::types::value::Bytes::from(value).into_inner(),
)
})
.collect()
})
.map(|row| row.into_iter().map(CBytes::from).collect())
.collect();

return ResResultBody::Rows(BodyResResultRows {
Expand Down
131 changes: 87 additions & 44 deletions shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use anyhow::{anyhow, Result};
use bigdecimal::BigDecimal;
use bytes::{Buf, Bytes};
use bytes_utils::Str;
use cassandra_protocol::frame::Serialize as FrameSerialize;
use cassandra_protocol::types::CInt;
use cassandra_protocol::{
frame::{
message_error::{AdditionalErrorInfo, ErrorBody},
Expand All @@ -26,6 +28,7 @@ use num::BigInt;
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::io::{Cursor, Write};
use std::net::IpAddr;
use std::num::NonZeroU32;
use uuid::Uuid;
Expand Down Expand Up @@ -632,62 +635,102 @@ impl MessageValue {
CassandraType::Null => MessageValue::Null,
}
}
}

impl From<MessageValue> for cassandra_protocol::types::value::Bytes {
fn from(value: MessageValue) -> cassandra_protocol::types::value::Bytes {
match value {
MessageValue::Null => (-1_i32).into(),
MessageValue::Bytes(b) => cassandra_protocol::types::value::Bytes::new(b.to_vec()),
MessageValue::Strings(s) => s.into(),
MessageValue::Integer(x, size) => {
cassandra_protocol::types::value::Bytes::new(match size {
IntSize::I64 => (x as i64).to_be_bytes().to_vec(),
IntSize::I32 => (x as i32).to_be_bytes().to_vec(),
IntSize::I16 => (x as i16).to_be_bytes().to_vec(),
IntSize::I8 => (x as i8).to_be_bytes().to_vec(),
})
fn into_cbytes(self) -> CBytes {
match self {
MessageValue::Null => {
// Despite the name of the function this actually creates a cassandra NULL value instead of a cassandra empty value
CBytes::new_empty()
}
MessageValue::Float(f) => f.into_inner().into(),
MessageValue::Boolean(b) => b.into(),
MessageValue::List(l) => l.into(),
MessageValue::Rows(r) => cassandra_protocol::types::value::Bytes::from(r),
MessageValue::NamedRows(n) => cassandra_protocol::types::value::Bytes::from(n),
MessageValue::Document(d) => cassandra_protocol::types::value::Bytes::from(d),
MessageValue::Inet(i) => i.into(),
MessageValue::FragmentedResponse(l) => cassandra_protocol::types::value::Bytes::from(l),
MessageValue::Ascii(a) => a.into(),
MessageValue::Double(d) => d.into_inner().into(),
MessageValue::Set(s) => s.into_iter().collect_vec().into(),
MessageValue::Map(m) => m.into(),
MessageValue::Varint(v) => v.into(),
MessageValue::Bytes(b) => CBytes::new(b.to_vec()),
MessageValue::Strings(s) => CBytes::new(s.into_bytes()),
MessageValue::Integer(x, size) => CBytes::new(match size {
IntSize::I64 => (x as i64).to_be_bytes().to_vec(),
IntSize::I32 => (x as i32).to_be_bytes().to_vec(),
IntSize::I16 => (x as i16).to_be_bytes().to_vec(),
IntSize::I8 => (x as i8).to_be_bytes().to_vec(),
}),
MessageValue::Float(f) => CBytes::new(f.into_inner().to_be_bytes().to_vec()),
MessageValue::Boolean(b) => CBytes::new(vec![if b { 1 } else { 0 }]),
MessageValue::List(l) => vec_into_cbytes(l),
//MessageValue::Rows(r) => cassandra_protocol::types::value::Bytes::from(r),
//MessageValue::NamedRows(n) => cassandra_protocol::types::value::Bytes::from(n),
//MessageValue::Document(d) => cassandra_protocol::types::value::Bytes::from(d),
MessageValue::Inet(i) => CBytes::new(match i {
IpAddr::V4(ip) => ip.octets().to_vec(),
IpAddr::V6(ip) => ip.octets().to_vec(),
}),
MessageValue::FragmentedResponse(l) => vec_into_cbytes(l),
MessageValue::Ascii(a) => CBytes::new(a.into_bytes()),
MessageValue::Double(d) => CBytes::new(d.into_inner().to_be_bytes().into()),
MessageValue::Set(s) => vec_into_cbytes(s.into_iter().collect_vec()),
//MessageValue::Map(m) => m.into(),
//MessageValue::Varint(v) => v.into(),
MessageValue::Decimal(d) => {
let (unscaled, scale) = d.into_bigint_and_exponent();
cassandra_protocol::types::decimal::Decimal {
unscaled,
scale: scale as i32,
}
.into()
CBytes::new(
cassandra_protocol::types::decimal::Decimal {
unscaled,
scale: scale as i32,
}
.serialize_to_vec(Version::V4),
)
}
MessageValue::Date(d) => d.into(),
MessageValue::Timestamp(t) => t.into(),
MessageValue::Date(d) => CBytes::new(d.to_be_bytes().to_vec()),
MessageValue::Timestamp(t) => CBytes::new(t.to_be_bytes().to_vec()),
MessageValue::Duration(d) => {
// TODO: Either this function should be made fallible or we Duration should have validated setters
cassandra_protocol::types::duration::Duration::new(d.months, d.days, d.nanoseconds)
// TODO: Either this function should be made fallible or Duration should have validated setters
CBytes::new(
cassandra_protocol::types::duration::Duration::new(
d.months,
d.days,
d.nanoseconds,
)
.unwrap()
.into()
.serialize_to_vec(Version::V4),
)
}
MessageValue::Timeuuid(t) => t.into(),
MessageValue::Varchar(v) => v.into(),
MessageValue::Uuid(u) => u.into(),
MessageValue::Time(t) => t.into(),
MessageValue::Counter(c) => c.into(),
MessageValue::Tuple(t) => t.into(),
MessageValue::Udt(u) => u.into(),
//MessageValue::Timeuuid(t) => t.into(),
MessageValue::Varchar(v) => CBytes::new(v.into_bytes()),
//MessageValue::Uuid(u) => u.into(),
MessageValue::Time(t) => CBytes::new(t.to_be_bytes().to_vec()),
MessageValue::Counter(c) => CBytes::new(c.to_be_bytes().to_vec()),
//MessageValue::Tuple(t) => t.into(),
//MessageValue::Udt(u) => u.into(),
_ => todo!(),
}
}
}

// TODO: just call into_cbytes directly
impl From<MessageValue> for CBytes {
fn from(value: MessageValue) -> CBytes {
// cassandra-protocol handles null values incredibly poorly.
// so we need to rewrite their logic to operate at the CBytes level in order to have the null value expressible
//
// Additionally reimplementing this logic allows us to allocate a lot less
// and its also way easier to understand than the .into soup their API exposes.
//
// TODO: This should be upstreamable but will require rewriting their entire CBytes/Bytes/Value API
// and so will take a long time to both write and review
value.into_cbytes()
}
}

fn vec_into_cbytes(values: Vec<MessageValue>) -> CBytes {
let mut bytes = vec![];
let mut cursor = Cursor::new(&mut bytes);
let len = values.len() as CInt;
cursor.write_all(&len.to_be_bytes()).unwrap();

for value in values {
let value_bytes = value.into_cbytes();
value_bytes.serialize(&mut cursor, Version::V4);
}

CBytes::new(bytes)
}

mod my_bytes {
use bytes::Bytes;
use serde::{Deserialize, Deserializer, Serializer};
Expand Down
14 changes: 12 additions & 2 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ impl CassandraSinkCluster {
let mut data_center_alias = "data_center";
let mut rack_alias = "rack";
let mut host_id_alias = "host_id";
let mut native_address_alias = "native_address";
let mut native_port_alias = "native_port";
let mut preferred_ip_alias = "preferred_ip";
let mut preferred_port_alias = "preferred_port";
let mut rpc_address_alias = "rpc_address";
Expand All @@ -483,6 +485,10 @@ impl CassandraSinkCluster {
rack_alias = alias;
} else if column.name == Identifier::Unquoted("host_id".to_string()) {
host_id_alias = alias;
} else if column.name == Identifier::Unquoted("native_address".to_string()) {
native_address_alias = alias;
} else if column.name == Identifier::Unquoted("native_port".to_string()) {
native_port_alias = alias;
} else if column.name == Identifier::Unquoted("preferred_ip".to_string()) {
preferred_ip_alias = alias;
} else if column.name == Identifier::Unquoted("preferred_port".to_string()) {
Expand Down Expand Up @@ -559,13 +565,17 @@ impl CassandraSinkCluster {
|| colspec.name == rpc_address_alias
{
MessageValue::Null
} else if colspec.name == peer_alias {
} else if colspec.name == native_address_alias {
MessageValue::Inet(shotover_peer.address.ip())
} else if colspec.name == peer_port_alias {
} else if colspec.name == native_port_alias {
MessageValue::Integer(
shotover_peer.address.port() as i64,
IntSize::I32,
)
} else if colspec.name == peer_alias {
MessageValue::Inet(shotover_peer.address.ip())
} else if colspec.name == peer_port_alias {
MessageValue::Integer(7000, IntSize::I32)
} else if colspec.name == release_version_alias {
MessageValue::Varchar(release_version.clone())
} else if colspec.name == tokens_alias {
Expand Down

0 comments on commit dbc6da5

Please sign in to comment.