Skip to content

Commit

Permalink
Merge pull request #766 from piodul/pretty-print-routing-key-in-tracing
Browse files Browse the repository at this point in the history
session: include partition key in RequestSpan in human readable form
  • Loading branch information
wprzytula authored Jul 26, 2023
2 parents e4a8802 + a95ed83 commit e73d417
Show file tree
Hide file tree
Showing 4 changed files with 535 additions and 6 deletions.
192 changes: 192 additions & 0 deletions scylla/src/statement/prepared_statement.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use smallvec::{smallvec, SmallVec};
use std::convert::TryInto;
Expand All @@ -6,6 +7,9 @@ use std::time::Duration;
use thiserror::Error;
use uuid::Uuid;

use scylla_cql::frame::frame_errors::ParseError;
use scylla_cql::frame::response::result::{deser_cql_value, CqlValue};

use super::StatementConfig;
use crate::frame::response::result::PreparedMetadata;
use crate::frame::types::{Consistency, SerialConsistency};
Expand Down Expand Up @@ -337,3 +341,191 @@ pub enum PartitionKeyError {
#[error("Value bytes too long to create partition key, max 65 535 allowed! value.len(): {0}")]
ValueTooLong(usize),
}

// The PartitionKeyDecoder reverses the process of PreparedStatement::compute_partition_key:
// it returns the consecutive values of partition key column that were encoded
// by the function into the Bytes object, additionally decoding them to CqlValue.
//
// The format follows the description here:
// <https://github.com/scylladb/scylla/blob/40adf38915b6d8f5314c621a94d694d172360833/compound_compat.hh#L33-L47>
//
// TODO: Currently, if there is a null value specified for a partition column,
// it will be skipped when creating the serialized partition key. We should
// not create such partition keys in the future, i.e. fail the request or
// route it to a random replica instead and let the DB reject it. In the
// meantime, this struct will return some garbage data while it tries to
// decode the key, but nothing bad like a panic should happen otherwise.
// The struct is currently only used for printing the partition key, so that's
// completely fine.
#[derive(Clone, Copy)]
pub(crate) struct PartitionKeyDecoder<'pk> {
prepared_metadata: &'pk PreparedMetadata,
partition_key: &'pk [u8],
value_index: usize,
}

impl<'pk> PartitionKeyDecoder<'pk> {
pub(crate) fn new(prepared_metadata: &'pk PreparedMetadata, partition_key: &'pk [u8]) -> Self {
Self {
prepared_metadata,
partition_key,
value_index: 0,
}
}
}

impl<'pk> Iterator for PartitionKeyDecoder<'pk> {
type Item = Result<CqlValue, ParseError>;

fn next(&mut self) -> Option<Self::Item> {
if self.value_index >= self.prepared_metadata.pk_indexes.len() {
return None;
}

let col_idx = self.prepared_metadata.pk_indexes[self.value_index].index as usize;
let spec = &self.prepared_metadata.col_specs[col_idx];

let cell = if self.prepared_metadata.pk_indexes.len() == 1 {
Ok(self.partition_key)
} else {
self.parse_cell()
};
self.value_index += 1;
Some(cell.and_then(|mut cell| deser_cql_value(&spec.typ, &mut cell)))
}
}

impl<'pk> PartitionKeyDecoder<'pk> {
fn parse_cell(&mut self) -> Result<&'pk [u8], ParseError> {
let buf = &mut self.partition_key;
let len = buf.read_u16::<BigEndian>()? as usize;
if buf.len() < len {
return Err(ParseError::IoError(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"value too short",
)));
}
let col = &buf[..len];
*buf = &buf[len..];
buf.read_u8()?;
Ok(col)
}
}

#[cfg(test)]
mod tests {
use bytes::{BufMut, Bytes, BytesMut};
use scylla_cql::frame::response::result::{
ColumnSpec, ColumnType, CqlValue, PartitionKeyIndex, PreparedMetadata, TableSpec,
};

use super::PartitionKeyDecoder;

fn make_meta(
cols: impl IntoIterator<Item = ColumnType>,
idx: impl IntoIterator<Item = usize>,
) -> PreparedMetadata {
let table_spec = TableSpec {
ks_name: "ks".to_owned(),
table_name: "t".to_owned(),
};
let col_specs: Vec<_> = cols
.into_iter()
.enumerate()
.map(|(i, typ)| ColumnSpec {
name: format!("col_{}", i),
table_spec: table_spec.clone(),
typ,
})
.collect();
let pk_indexes = idx
.into_iter()
.enumerate()
.map(|(sequence, index)| PartitionKeyIndex {
index: index as u16,
sequence: sequence as u16,
})
.collect();
PreparedMetadata {
flags: 0,
col_count: col_specs.len(),
col_specs,
pk_indexes,
}
}

fn make_key<'pk>(cols: impl IntoIterator<Item = &'pk [u8]>) -> Bytes {
let cols: Vec<_> = cols.into_iter().collect();
// TODO: Use compute_partition_key or one of the variants
// after they are moved to a more sensible place
// instead of constructing the PK manually
let mut b = BytesMut::new();
if cols.len() == 1 {
b.extend_from_slice(cols[0]);
} else {
for c in cols {
b.put_i16(c.len() as i16);
b.extend_from_slice(c);
b.put_u8(0);
}
}
b.freeze()
}

#[test]
fn test_pk_decoder_single_column() {
let meta = make_meta([ColumnType::Int], [0]);
let pk = make_key([0i32.to_be_bytes().as_ref()]);
let cells = PartitionKeyDecoder::new(&meta, &pk)
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(cells, vec![CqlValue::Int(0)]);
}

#[test]
fn test_pk_decoder_multiple_columns() {
let meta = make_meta(std::iter::repeat(ColumnType::Int).take(3), [0, 1, 2]);
let pk = make_key([
12i32.to_be_bytes().as_ref(),
34i32.to_be_bytes().as_ref(),
56i32.to_be_bytes().as_ref(),
]);
let cells = PartitionKeyDecoder::new(&meta, &pk)
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(
cells,
vec![CqlValue::Int(12), CqlValue::Int(34), CqlValue::Int(56)],
);
}

#[test]
fn test_pk_decoder_multiple_columns_shuffled() {
let meta = make_meta(
[
ColumnType::TinyInt,
ColumnType::SmallInt,
ColumnType::Int,
ColumnType::BigInt,
ColumnType::Blob,
],
[4, 0, 3],
);
let pk = make_key([
&[1, 2, 3, 4, 5],
67i8.to_be_bytes().as_ref(),
89i64.to_be_bytes().as_ref(),
]);
let cells = PartitionKeyDecoder::new(&meta, &pk)
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(
cells,
vec![
CqlValue::Blob(vec![1, 2, 3, 4, 5]),
CqlValue::TinyInt(67),
CqlValue::BigInt(89),
],
);
}
}
1 change: 1 addition & 0 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl RowIterator {

let span_creator = move || {
let span = RequestSpan::new_prepared(
prepared_ref.get_prepared_metadata(),
partition_key.as_ref(),
token,
serialized_values_size,
Expand Down
32 changes: 27 additions & 5 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ use crate::cloud::CloudConfig;
use crate::frame::types::LegacyConsistency;
use crate::history;
use crate::history::HistoryListener;
use crate::prepared_statement::PartitionKeyDecoder;
use crate::retry_policy::RetryPolicy;
use crate::utils::pretty::{CommaSeparatedDisplayer, CqlValueDisplayer};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures::future::join_all;
use futures::future::try_join_all;
use itertools::Either;
pub use scylla_cql::errors::TranslationError;
use scylla_cql::frame::response::result::Rows;
use scylla_cql::frame::response::result::{PreparedMetadata, Rows};
use scylla_cql::frame::response::NonErrorResponse;
use std::borrow::Borrow;
use std::collections::HashMap;
Expand Down Expand Up @@ -952,8 +955,12 @@ impl Session {
is_confirmed_lwt: prepared.is_confirmed_lwt(),
};

let span =
RequestSpan::new_prepared(partition_key.as_ref(), token, serialized_values.size());
let span = RequestSpan::new_prepared(
prepared.get_prepared_metadata(),
partition_key.as_ref(),
token,
serialized_values.size(),
);

if !span.span().is_disabled() {
if let (Some(keyspace), Some(token)) = (statement_info.keyspace.as_ref(), token) {
Expand Down Expand Up @@ -2063,11 +2070,11 @@ impl RequestSpan {
}

pub(crate) fn new_prepared(
prepared_metadata: &PreparedMetadata,
partition_key: Option<&Bytes>,
token: Option<Token>,
request_size: usize,
) -> Self {
use crate::utils::pretty::HexBytes;
use tracing::field::Empty;

let span = trace_span!(
Expand All @@ -2087,7 +2094,10 @@ impl RequestSpan {
if let Some(partition_key) = partition_key {
span.record(
"partition_key",
tracing::field::display(format_args!("{:x}", HexBytes(partition_key))),
tracing::field::display(format_args!(
"{}",
partition_key_displayer(prepared_metadata, partition_key),
)),
);
}
if let Some(token) = token {
Expand Down Expand Up @@ -2185,3 +2195,15 @@ impl Drop for RequestSpan {
);
}
}

fn partition_key_displayer<'pk>(
prepared_metadata: &'pk PreparedMetadata,
partition_key: &'pk [u8],
) -> impl Display + 'pk {
CommaSeparatedDisplayer(
PartitionKeyDecoder::new(prepared_metadata, partition_key).map(|c| match c {
Ok(c) => Either::Left(CqlValueDisplayer(c)),
Err(_) => Either::Right("<decoding error>"),
}),
)
}
Loading

0 comments on commit e73d417

Please sign in to comment.