From dbc6da55a7974bea1415606e11e0c9ea2ba667cf Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 1 Sep 2022 18:33:46 +1000 Subject: [PATCH] Fix system.peers generation --- shotover-proxy/src/codec/cassandra.rs | 4 +- shotover-proxy/src/frame/cassandra.rs | 10 +- shotover-proxy/src/message/mod.rs | 131 ++++++++++++------ .../transforms/cassandra/sink_cluster/mod.rs | 14 +- 4 files changed, 102 insertions(+), 57 deletions(-) diff --git a/shotover-proxy/src/codec/cassandra.rs b/shotover-proxy/src/codec/cassandra.rs index 3f542edd9..ea0394ffd 100644 --- a/shotover-proxy/src/codec/cassandra.rs +++ b/shotover-proxy/src/codec/cassandra.rs @@ -51,7 +51,7 @@ impl Decoder for CassandraCodec { Ok(frame_len) => { // Clear the read bytes from the FramedReader let bytes = src.split_to(frame_len); - tracing::debug!( + tracing::error!( "incoming cassandra message:\n{}", pretty_hex::pretty_hex(&bytes) ); @@ -127,7 +127,7 @@ impl Encoder for CassandraCodec { Encodable::Bytes(bytes) => dst.extend_from_slice(&bytes), Encodable::Frame(frame) => self.encode_raw(frame.into_cassandra().unwrap(), dst), } - tracing::debug!( + tracing::error!( "outgoing cassandra message:\n{}", pretty_hex::pretty_hex(&&dst[start..]) ); diff --git a/shotover-proxy/src/frame/cassandra.rs b/shotover-proxy/src/frame/cassandra.rs index 6d827094a..6780ab6d2 100644 --- a/shotover-proxy/src/frame/cassandra.rs +++ b/shotover-proxy/src/frame/cassandra.rs @@ -482,15 +482,7 @@ impl CassandraOperation { let rows_count = rows.len() as CInt; let rows_content = rows .into_iter() - .map(|row| { - row.into_iter() - .map(|value| { - CBytes::new( - cassandra_protocol::types::value::Bytes::from(value).into_inner(), - ) - }) - .collect() - }) + .map(|row| row.into_iter().map(CBytes::from).collect()) .collect(); return ResResultBody::Rows(BodyResResultRows { diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index d5d1234ac..f9b04d7c8 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -8,6 +8,8 @@ use anyhow::{anyhow, Result}; use bigdecimal::BigDecimal; use bytes::{Buf, Bytes}; use bytes_utils::Str; +use cassandra_protocol::frame::Serialize as FrameSerialize; +use cassandra_protocol::types::CInt; use cassandra_protocol::{ frame::{ message_error::{AdditionalErrorInfo, ErrorBody}, @@ -26,6 +28,7 @@ use num::BigInt; use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; +use std::io::{Cursor, Write}; use std::net::IpAddr; use std::num::NonZeroU32; use uuid::Uuid; @@ -632,62 +635,102 @@ impl MessageValue { CassandraType::Null => MessageValue::Null, } } -} -impl From for cassandra_protocol::types::value::Bytes { - fn from(value: MessageValue) -> cassandra_protocol::types::value::Bytes { - match value { - MessageValue::Null => (-1_i32).into(), - MessageValue::Bytes(b) => cassandra_protocol::types::value::Bytes::new(b.to_vec()), - MessageValue::Strings(s) => s.into(), - MessageValue::Integer(x, size) => { - cassandra_protocol::types::value::Bytes::new(match size { - IntSize::I64 => (x as i64).to_be_bytes().to_vec(), - IntSize::I32 => (x as i32).to_be_bytes().to_vec(), - IntSize::I16 => (x as i16).to_be_bytes().to_vec(), - IntSize::I8 => (x as i8).to_be_bytes().to_vec(), - }) + fn into_cbytes(self) -> CBytes { + match self { + MessageValue::Null => { + // Despite the name of the function this actually creates a cassandra NULL value instead of a cassandra empty value + CBytes::new_empty() } - MessageValue::Float(f) => f.into_inner().into(), - MessageValue::Boolean(b) => b.into(), - MessageValue::List(l) => l.into(), - MessageValue::Rows(r) => cassandra_protocol::types::value::Bytes::from(r), - MessageValue::NamedRows(n) => cassandra_protocol::types::value::Bytes::from(n), - MessageValue::Document(d) => cassandra_protocol::types::value::Bytes::from(d), - MessageValue::Inet(i) => i.into(), - MessageValue::FragmentedResponse(l) => cassandra_protocol::types::value::Bytes::from(l), - MessageValue::Ascii(a) => a.into(), - MessageValue::Double(d) => d.into_inner().into(), - MessageValue::Set(s) => s.into_iter().collect_vec().into(), - MessageValue::Map(m) => m.into(), - MessageValue::Varint(v) => v.into(), + MessageValue::Bytes(b) => CBytes::new(b.to_vec()), + MessageValue::Strings(s) => CBytes::new(s.into_bytes()), + MessageValue::Integer(x, size) => CBytes::new(match size { + IntSize::I64 => (x as i64).to_be_bytes().to_vec(), + IntSize::I32 => (x as i32).to_be_bytes().to_vec(), + IntSize::I16 => (x as i16).to_be_bytes().to_vec(), + IntSize::I8 => (x as i8).to_be_bytes().to_vec(), + }), + MessageValue::Float(f) => CBytes::new(f.into_inner().to_be_bytes().to_vec()), + MessageValue::Boolean(b) => CBytes::new(vec![if b { 1 } else { 0 }]), + MessageValue::List(l) => vec_into_cbytes(l), + //MessageValue::Rows(r) => cassandra_protocol::types::value::Bytes::from(r), + //MessageValue::NamedRows(n) => cassandra_protocol::types::value::Bytes::from(n), + //MessageValue::Document(d) => cassandra_protocol::types::value::Bytes::from(d), + MessageValue::Inet(i) => CBytes::new(match i { + IpAddr::V4(ip) => ip.octets().to_vec(), + IpAddr::V6(ip) => ip.octets().to_vec(), + }), + MessageValue::FragmentedResponse(l) => vec_into_cbytes(l), + MessageValue::Ascii(a) => CBytes::new(a.into_bytes()), + MessageValue::Double(d) => CBytes::new(d.into_inner().to_be_bytes().into()), + MessageValue::Set(s) => vec_into_cbytes(s.into_iter().collect_vec()), + //MessageValue::Map(m) => m.into(), + //MessageValue::Varint(v) => v.into(), MessageValue::Decimal(d) => { let (unscaled, scale) = d.into_bigint_and_exponent(); - cassandra_protocol::types::decimal::Decimal { - unscaled, - scale: scale as i32, - } - .into() + CBytes::new( + cassandra_protocol::types::decimal::Decimal { + unscaled, + scale: scale as i32, + } + .serialize_to_vec(Version::V4), + ) } - MessageValue::Date(d) => d.into(), - MessageValue::Timestamp(t) => t.into(), + MessageValue::Date(d) => CBytes::new(d.to_be_bytes().to_vec()), + MessageValue::Timestamp(t) => CBytes::new(t.to_be_bytes().to_vec()), MessageValue::Duration(d) => { - // TODO: Either this function should be made fallible or we Duration should have validated setters - cassandra_protocol::types::duration::Duration::new(d.months, d.days, d.nanoseconds) + // TODO: Either this function should be made fallible or Duration should have validated setters + CBytes::new( + cassandra_protocol::types::duration::Duration::new( + d.months, + d.days, + d.nanoseconds, + ) .unwrap() - .into() + .serialize_to_vec(Version::V4), + ) } - MessageValue::Timeuuid(t) => t.into(), - MessageValue::Varchar(v) => v.into(), - MessageValue::Uuid(u) => u.into(), - MessageValue::Time(t) => t.into(), - MessageValue::Counter(c) => c.into(), - MessageValue::Tuple(t) => t.into(), - MessageValue::Udt(u) => u.into(), + //MessageValue::Timeuuid(t) => t.into(), + MessageValue::Varchar(v) => CBytes::new(v.into_bytes()), + //MessageValue::Uuid(u) => u.into(), + MessageValue::Time(t) => CBytes::new(t.to_be_bytes().to_vec()), + MessageValue::Counter(c) => CBytes::new(c.to_be_bytes().to_vec()), + //MessageValue::Tuple(t) => t.into(), + //MessageValue::Udt(u) => u.into(), + _ => todo!(), } } } +// TODO: just call into_cbytes directly +impl From for CBytes { + fn from(value: MessageValue) -> CBytes { + // cassandra-protocol handles null values incredibly poorly. + // so we need to rewrite their logic to operate at the CBytes level in order to have the null value expressible + // + // Additionally reimplementing this logic allows us to allocate a lot less + // and its also way easier to understand than the .into soup their API exposes. + // + // TODO: This should be upstreamable but will require rewriting their entire CBytes/Bytes/Value API + // and so will take a long time to both write and review + value.into_cbytes() + } +} + +fn vec_into_cbytes(values: Vec) -> CBytes { + let mut bytes = vec![]; + let mut cursor = Cursor::new(&mut bytes); + let len = values.len() as CInt; + cursor.write_all(&len.to_be_bytes()).unwrap(); + + for value in values { + let value_bytes = value.into_cbytes(); + value_bytes.serialize(&mut cursor, Version::V4); + } + + CBytes::new(bytes) +} + mod my_bytes { use bytes::Bytes; use serde::{Deserialize, Deserializer, Serializer}; diff --git a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs index 632be6133..32a88aa64 100644 --- a/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs @@ -462,6 +462,8 @@ impl CassandraSinkCluster { let mut data_center_alias = "data_center"; let mut rack_alias = "rack"; let mut host_id_alias = "host_id"; + let mut native_address_alias = "native_address"; + let mut native_port_alias = "native_port"; let mut preferred_ip_alias = "preferred_ip"; let mut preferred_port_alias = "preferred_port"; let mut rpc_address_alias = "rpc_address"; @@ -483,6 +485,10 @@ impl CassandraSinkCluster { rack_alias = alias; } else if column.name == Identifier::Unquoted("host_id".to_string()) { host_id_alias = alias; + } else if column.name == Identifier::Unquoted("native_address".to_string()) { + native_address_alias = alias; + } else if column.name == Identifier::Unquoted("native_port".to_string()) { + native_port_alias = alias; } else if column.name == Identifier::Unquoted("preferred_ip".to_string()) { preferred_ip_alias = alias; } else if column.name == Identifier::Unquoted("preferred_port".to_string()) { @@ -559,13 +565,17 @@ impl CassandraSinkCluster { || colspec.name == rpc_address_alias { MessageValue::Null - } else if colspec.name == peer_alias { + } else if colspec.name == native_address_alias { MessageValue::Inet(shotover_peer.address.ip()) - } else if colspec.name == peer_port_alias { + } else if colspec.name == native_port_alias { MessageValue::Integer( shotover_peer.address.port() as i64, IntSize::I32, ) + } else if colspec.name == peer_alias { + MessageValue::Inet(shotover_peer.address.ip()) + } else if colspec.name == peer_port_alias { + MessageValue::Integer(7000, IntSize::I32) } else if colspec.name == release_version_alias { MessageValue::Varchar(release_version.clone()) } else if colspec.name == tokens_alias {