Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: include partition key in RequestSpan in human readable form #766

Merged
merged 3 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions scylla/src/statement/prepared_statement.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use smallvec::{smallvec, SmallVec};
use std::convert::TryInto;
Expand All @@ -6,6 +7,9 @@ use std::time::Duration;
use thiserror::Error;
use uuid::Uuid;

use scylla_cql::frame::frame_errors::ParseError;
use scylla_cql::frame::response::result::{deser_cql_value, CqlValue};

use super::StatementConfig;
use crate::frame::response::result::PreparedMetadata;
use crate::frame::types::{Consistency, SerialConsistency};
Expand Down Expand Up @@ -337,3 +341,191 @@ pub enum PartitionKeyError {
#[error("Value bytes too long to create partition key, max 65 535 allowed! value.len(): {0}")]
ValueTooLong(usize),
}

// The PartitionKeyDecoder reverses the process of PreparedStatement::compute_partition_key:
// it yields, one by one, the values of the partition key columns that the function
// encoded into the Bytes object, additionally decoding each of them to a CqlValue.
//
// The format follows the description here:
// <https://github.com/scylladb/scylla/blob/40adf38915b6d8f5314c621a94d694d172360833/compound_compat.hh#L33-L47>
//
// TODO: Currently, if there is a null value specified for a partition column,
// it will be skipped when creating the serialized partition key. We should
// not create such partition keys in the future, i.e. fail the request or
// route it to a random replica instead and let the DB reject it. In the
// meantime, this struct will return some garbage data while it tries to
// decode the key, but nothing bad like a panic should happen otherwise.
// The struct is currently only used for printing the partition key, so that's
// completely fine.
/// Iterator state for decoding a serialized partition key, one partition key
/// column value at a time.
///
/// The struct only borrows its inputs and is `Copy`, so copies of it can be
/// iterated independently of each other.
#[derive(Clone, Copy)]
pub(crate) struct PartitionKeyDecoder<'pk> {
// Metadata of the prepared statement; its `pk_indexes` and `col_specs` are
// used to look up the type of each partition key column.
prepared_metadata: &'pk PreparedMetadata,
// Not-yet-consumed part of the serialized partition key. For multi-column
// keys, `parse_cell` advances this slice past every cell it reads.
partition_key: &'pk [u8],
// Position in `pk_indexes` of the next value to be yielded.
value_index: usize,
}

impl<'pk> PartitionKeyDecoder<'pk> {
pub(crate) fn new(prepared_metadata: &'pk PreparedMetadata, partition_key: &'pk [u8]) -> Self {
Self {
prepared_metadata,
partition_key,
value_index: 0,
}
}
}

impl<'pk> Iterator for PartitionKeyDecoder<'pk> {
type Item = Result<CqlValue, ParseError>;

fn next(&mut self) -> Option<Self::Item> {
if self.value_index >= self.prepared_metadata.pk_indexes.len() {
return None;
}

let col_idx = self.prepared_metadata.pk_indexes[self.value_index].index as usize;
let spec = &self.prepared_metadata.col_specs[col_idx];

let cell = if self.prepared_metadata.pk_indexes.len() == 1 {
Ok(self.partition_key)
} else {
self.parse_cell()
};
self.value_index += 1;
Some(cell.and_then(|mut cell| deser_cql_value(&spec.typ, &mut cell)))
}
}

impl<'pk> PartitionKeyDecoder<'pk> {
    /// Extracts the next cell of a multi-column partition key.
    ///
    /// A cell is read as a big-endian `u16` length, followed by that many
    /// bytes of the value, followed by one trailing byte which is read and
    /// discarded. On success the remaining key is advanced past the whole
    /// cell and the value bytes are returned.
    ///
    /// Returns an error if the length prefix or the trailing byte cannot be
    /// read, or if fewer value bytes remain than the length prefix declares.
    fn parse_cell(&mut self) -> Result<&'pk [u8], ParseError> {
        let remaining = &mut self.partition_key;

        let cell_len = usize::from(remaining.read_u16::<BigEndian>()?);
        if cell_len > remaining.len() {
            return Err(ParseError::IoError(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "value too short",
            )));
        }

        let (cell, rest) = remaining.split_at(cell_len);
        *remaining = rest;

        // Skip the byte that terminates the cell.
        remaining.read_u8()?;
        Ok(cell)
    }
}
piodul marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use scylla_cql::frame::response::result::{
        ColumnSpec, ColumnType, CqlValue, PartitionKeyIndex, PreparedMetadata, TableSpec,
    };

    use super::PartitionKeyDecoder;

    /// Builds prepared statement metadata for a table `ks.t` whose columns
    /// are named `col_0`, `col_1`, ... and have the types given in `cols`.
    /// `idx` lists the column indexes that form the partition key, in
    /// partition key order.
    fn make_meta(
        cols: impl IntoIterator<Item = ColumnType>,
        idx: impl IntoIterator<Item = usize>,
    ) -> PreparedMetadata {
        let table_spec = TableSpec {
            ks_name: "ks".to_owned(),
            table_name: "t".to_owned(),
        };

        let mut col_specs = Vec::new();
        for (position, typ) in cols.into_iter().enumerate() {
            col_specs.push(ColumnSpec {
                name: format!("col_{}", position),
                table_spec: table_spec.clone(),
                typ,
            });
        }

        PreparedMetadata {
            flags: 0,
            col_count: col_specs.len(),
            col_specs,
            pk_indexes: idx
                .into_iter()
                .enumerate()
                .map(|(sequence, index)| PartitionKeyIndex {
                    index: index as u16,
                    sequence: sequence as u16,
                })
                .collect(),
        }
    }

    /// Serializes already-encoded column values into a partition key.
    ///
    /// A single value is used verbatim; multiple values are each written as
    /// a 16-bit big-endian length, the value bytes and a zero byte.
    fn make_key<'pk>(cols: impl IntoIterator<Item = &'pk [u8]>) -> Bytes {
        let components: Vec<&[u8]> = cols.into_iter().collect();
        // TODO: Use compute_partition_key or one of the variants
        // after they are moved to a more sensible place
        // instead of constructing the PK manually
        let mut serialized = BytesMut::new();
        match components.as_slice() {
            [only] => {
                serialized.extend_from_slice(only);
            }
            many => {
                for component in many {
                    serialized.put_u16(component.len() as u16);
                    serialized.extend_from_slice(component);
                    serialized.put_u8(0);
                }
            }
        }
        serialized.freeze()
    }

    /// Runs the decoder to completion, panicking on the first decoding error.
    fn decode_all(meta: &PreparedMetadata, pk: &[u8]) -> Vec<CqlValue> {
        PartitionKeyDecoder::new(meta, pk)
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    #[test]
    fn test_pk_decoder_single_column() {
        let meta = make_meta([ColumnType::Int], [0]);
        let value = 0i32.to_be_bytes();
        let pk = make_key([&value[..]]);

        assert_eq!(decode_all(&meta, &pk), vec![CqlValue::Int(0)]);
    }

    #[test]
    fn test_pk_decoder_multiple_columns() {
        let meta = make_meta(
            [ColumnType::Int, ColumnType::Int, ColumnType::Int],
            [0, 1, 2],
        );
        let first = 12i32.to_be_bytes();
        let second = 34i32.to_be_bytes();
        let third = 56i32.to_be_bytes();
        let pk = make_key([&first[..], &second[..], &third[..]]);

        let expected = vec![CqlValue::Int(12), CqlValue::Int(34), CqlValue::Int(56)];
        assert_eq!(decode_all(&meta, &pk), expected);
    }

    #[test]
    fn test_pk_decoder_multiple_columns_shuffled() {
        // The partition key consists of columns 4, 0 and 3, in that order.
        let meta = make_meta(
            [
                ColumnType::TinyInt,
                ColumnType::SmallInt,
                ColumnType::Int,
                ColumnType::BigInt,
                ColumnType::Blob,
            ],
            [4, 0, 3],
        );
        let blob = [1u8, 2, 3, 4, 5];
        let tiny = 67i8.to_be_bytes();
        let big = 89i64.to_be_bytes();
        let pk = make_key([&blob[..], &tiny[..], &big[..]]);

        let expected = vec![
            CqlValue::Blob(vec![1, 2, 3, 4, 5]),
            CqlValue::TinyInt(67),
            CqlValue::BigInt(89),
        ];
        assert_eq!(decode_all(&meta, &pk), expected);
    }
}
1 change: 1 addition & 0 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl RowIterator {

let span_creator = move || {
let span = RequestSpan::new_prepared(
prepared_ref.get_prepared_metadata(),
partition_key.as_ref(),
token,
serialized_values_size,
Expand Down
32 changes: 27 additions & 5 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ use crate::cloud::CloudConfig;
use crate::frame::types::LegacyConsistency;
use crate::history;
use crate::history::HistoryListener;
use crate::prepared_statement::PartitionKeyDecoder;
use crate::retry_policy::RetryPolicy;
use crate::utils::pretty::{CommaSeparatedDisplayer, CqlValueDisplayer};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures::future::join_all;
use futures::future::try_join_all;
use itertools::Either;
pub use scylla_cql::errors::TranslationError;
use scylla_cql::frame::response::result::Rows;
use scylla_cql::frame::response::result::{PreparedMetadata, Rows};
use scylla_cql::frame::response::NonErrorResponse;
use std::borrow::Borrow;
use std::collections::HashMap;
Expand Down Expand Up @@ -952,8 +955,12 @@ impl Session {
is_confirmed_lwt: prepared.is_confirmed_lwt(),
};

let span =
RequestSpan::new_prepared(partition_key.as_ref(), token, serialized_values.size());
let span = RequestSpan::new_prepared(
prepared.get_prepared_metadata(),
partition_key.as_ref(),
token,
serialized_values.size(),
);

if !span.span().is_disabled() {
if let (Some(keyspace), Some(token)) = (statement_info.keyspace.as_ref(), token) {
Expand Down Expand Up @@ -2063,11 +2070,11 @@ impl RequestSpan {
}

pub(crate) fn new_prepared(
prepared_metadata: &PreparedMetadata,
partition_key: Option<&Bytes>,
token: Option<Token>,
request_size: usize,
) -> Self {
use crate::utils::pretty::HexBytes;
use tracing::field::Empty;

let span = trace_span!(
Expand All @@ -2087,7 +2094,10 @@ impl RequestSpan {
if let Some(partition_key) = partition_key {
span.record(
"partition_key",
tracing::field::display(format_args!("{:x}", HexBytes(partition_key))),
tracing::field::display(format_args!(
"{}",
partition_key_displayer(prepared_metadata, partition_key),
)),
);
}
if let Some(token) = token {
Expand Down Expand Up @@ -2185,3 +2195,15 @@ impl Drop for RequestSpan {
);
}
}

fn partition_key_displayer<'pk>(
prepared_metadata: &'pk PreparedMetadata,
partition_key: &'pk [u8],
) -> impl Display + 'pk {
CommaSeparatedDisplayer(
PartitionKeyDecoder::new(prepared_metadata, partition_key).map(|c| match c {
Ok(c) => Either::Left(CqlValueDisplayer(c)),
Err(_) => Either::Right("<decoding error>"),
}),
)
}
Loading