diff --git a/shotover-proxy/src/frame/cassandra.rs b/shotover-proxy/src/frame/cassandra.rs index 6efb27521..ba02805ba 100644 --- a/shotover-proxy/src/frame/cassandra.rs +++ b/shotover-proxy/src/frame/cassandra.rs @@ -16,8 +16,8 @@ use cassandra_protocol::frame::message_register::BodyReqRegister; use cassandra_protocol::frame::message_request::RequestBody; use cassandra_protocol::frame::message_response::ResponseBody; use cassandra_protocol::frame::message_result::{ - BodyResResultPrepared, BodyResResultSetKeyspace, ResResultBody, ResultKind, RowsMetadata, - RowsMetadataFlags, + BodyResResultPrepared, BodyResResultSetKeyspace, ColSpec, ColTypeOption, ResResultBody, + ResultKind, RowsMetadata, RowsMetadataFlags, }; use cassandra_protocol::frame::{ Direction, Envelope as RawCassandraFrame, Flags, Opcode, Serialize, StreamId, Version, @@ -31,6 +31,7 @@ use cql3_parser::cassandra_ast::CassandraAST; use cql3_parser::cassandra_statement::CassandraStatement; use cql3_parser::common::Operand; use nonzero_ext::nonzero; +use std::fmt::{Display, Formatter, Result as FmtResult}; use std::io::Cursor; use std::net::IpAddr; use std::num::NonZeroU32; @@ -687,6 +688,139 @@ pub struct CassandraBatch { timestamp: Option, } +impl Display for CassandraFrame { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{} stream:{}", self.version, self.stream_id)?; + if let Some(tracing_id) = self.tracing_id { + write!(f, " tracing_id:{}", tracing_id)?; + } + if !self.warnings.is_empty() { + write!(f, " warnings:{:?}", self.warnings)?; + } + match &self.operation { + CassandraOperation::Query { query, params } => { + let QueryParams { + consistency, + with_names, + values, + page_size, + paging_state, + serial_consistency, + timestamp, + keyspace, + now_in_seconds, + } = params.as_ref(); + + write!( + f, + " Query consistency:{} with_names:{:?}", + consistency, with_names, + )?; + + if let Some(values) = values { + write!(f, " values:{:?}", values)?; + } + if let Some(page_size) = page_size { + write!(f, " page_size:{:?}", page_size)?; + } + if let Some(paging_state) = paging_state { + write!(f, " paging_state:{:?}", paging_state)?; + } + if let Some(serial_consistency) = serial_consistency { + write!(f, " serial_consistency:{:?}", serial_consistency)?; + } + if let Some(timestamp) = timestamp { + write!(f, " timestamp:{:?}", timestamp)?; + } + if let Some(keyspace) = keyspace { + write!(f, " keyspace:{:?}", keyspace)?; + } + if let Some(now_in_seconds) = now_in_seconds { + write!(f, " now_in_seconds:{:?}", now_in_seconds)?; + } + write!(f, " {}", query) + } + CassandraOperation::Register(BodyReqRegister { events }) => { + write!(f, " Register {:?}", events) + } + CassandraOperation::Error(ErrorBody { + error_code, + message, + additional_info, + }) => { + write!( + f, + " Error 0x{:x} {:?} {:?}", + error_code, additional_info, message + ) + } + CassandraOperation::Result(result) => match result { + CassandraResult::Rows { rows, metadata } => { + let RowsMetadata { + flags, + columns_count, + paging_state, + new_metadata_id, + global_table_spec, + col_specs, + } = metadata.as_ref(); + + write!( + f, + " Result Rows {:?} columns_count:{}", + flags, columns_count, + )?; + if let Some(paging_state) = paging_state { + write!(f, " paging_state:{:?}", paging_state)?; + } + if let Some(new_metadata_id) = new_metadata_id { + write!(f, " new_metadata_id:{:?}", new_metadata_id)?; + } + if let Some(global_table_spec) = global_table_spec { + write!( + f, + " global_name:{}.{}", + global_table_spec.ks_name, global_table_spec.table_name + )?; + } + write!(f, " cols:[")?; + let mut need_comma = false; + for col_spec in col_specs { + let ColSpec { + table_spec, + name, + col_type, + } = col_spec; + + let ColTypeOption { id, value } = col_type; + + if need_comma { + write!(f, ", ")?; + } + need_comma = true; + write!(f, "{}:{:?}", name, id)?; + if let Some(value) = value { + write!(f, " of {:?}", value)?; + } + if let Some(table_spec) = table_spec { + write!(f, " table_spec:{:?}", table_spec)?; + } + } + write!(f, "]")?; + for row in rows { + write!(f, "\n {:?}", row)?; + } + Ok(()) + } + CassandraResult::Void => write!(f, "Result Void"), + _ => write!(f, "Result {:?}", result), + }, + CassandraOperation::Ready(_) => write!(f, " Ready"), + _ => write!(f, " {:?}", self.operation), + } + } +} + #[cfg(test)] mod test { use crate::frame::cassandra::{parse_statement_single, to_cassandra_type}; diff --git a/shotover-proxy/src/frame/mod.rs b/shotover-proxy/src/frame/mod.rs index a88f027b9..3488e6850 100644 --- a/shotover-proxy/src/frame/mod.rs +++ b/shotover-proxy/src/frame/mod.rs @@ -5,6 +5,7 @@ pub use redis_protocol::resp2::types::Frame as RedisFrame; use anyhow::{anyhow, Result}; use bytes::Bytes; +use std::fmt::{Display, Formatter, Result as FmtResult}; #[derive(PartialEq, Debug, Clone, Copy)] pub enum MessageType { @@ -78,3 +79,13 @@ impl Frame { } } } + +impl Display for Frame { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + match self { + Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame), + Frame::Redis(frame) => write!(f, "Redis {:?})", frame), + Frame::None => write!(f, "None"), + } + } +} diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index 907ebee53..48caec26f 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -347,6 +347,20 @@ impl Message { None => None, } } + + pub fn to_high_level_string(&mut self) -> String { + if let Some(response) = self.frame() { + format!("{}", response) + } else if let Some(MessageInner::RawBytes { + bytes, + message_type, + }) = &self.inner + { + format!("Unparseable {:?} message {:?}", message_type, bytes) + } else { + unreachable!("self.frame() failed so MessageInner must still be RawBytes") + } + } } /// There are 3 levels of processing the message can be in. diff --git a/shotover-proxy/src/transforms/debug/printer.rs b/shotover-proxy/src/transforms/debug/printer.rs index fc6022b4e..7c379380f 100644 --- a/shotover-proxy/src/transforms/debug/printer.rs +++ b/shotover-proxy/src/transforms/debug/printer.rs @@ -23,11 +23,17 @@ impl DebugPrinter { #[async_trait] impl Transform for DebugPrinter { - async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse { - info!("Request content: {:?}", message_wrapper.messages); + async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> ChainResponse { + for request in &mut message_wrapper.messages { + info!("Request: {}", request.to_high_level_string()); + } + self.counter += 1; - let response = message_wrapper.call_next_transform().await; - info!("Response content: {:?}", response); - response + let mut responses = message_wrapper.call_next_transform().await?; + + for response in &mut responses { + info!("Response: {}", response.to_high_level_string()); + } + Ok(responses) } } diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index 44ee55918..8c93eb72f 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -39,7 +39,6 @@ use anyhow::Result; use async_recursion::async_recursion; use async_trait::async_trait; use core::fmt; -use core::fmt::Display; use futures::Future; use metrics::{counter, histogram}; use serde::Deserialize; @@ -401,12 +400,6 @@ impl<'a> Clone for Wrapper<'a> { } } -impl<'a> Display for Wrapper<'a> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { - f.write_fmt(format_args!("{:#?}", self.messages)) - } -} - tokio::task_local! { #[allow(clippy::declare_interior_mutable_const)] pub static CONTEXT_CHAIN_NAME: String;