From 3ed278566296d3ba4c62252cf70509f54afe021f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Tue, 18 Mar 2025 12:46:10 +0100 Subject: [PATCH 1/7] cargo/cmake: compile with cpp_rust_unstable cfg Just introducing the config file is unfortunately not enough. CMakeRust does not seem to pick up the config file. Even if it did (after some adjustments), it defines the RUSTFLAGS env variable which takes precedence over the config file. The value of RUSTFLAGS changes depending on the platform (so we can't just put them in the config file). --- scylla-rust-wrapper/.cargo/config.toml | 3 +++ scylla-rust-wrapper/CMakeLists.txt | 6 ++++++ 2 files changed, 9 insertions(+) create mode 100644 scylla-rust-wrapper/.cargo/config.toml diff --git a/scylla-rust-wrapper/.cargo/config.toml b/scylla-rust-wrapper/.cargo/config.toml new file mode 100644 index 00000000..6da9418f --- /dev/null +++ b/scylla-rust-wrapper/.cargo/config.toml @@ -0,0 +1,3 @@ +[build] +# To enable cpp-rust only features from Rust driver. +rustflags = ["--cfg", "cpp_rust_unstable"] diff --git a/scylla-rust-wrapper/CMakeLists.txt b/scylla-rust-wrapper/CMakeLists.txt index 760b6a64..2b0552da 100644 --- a/scylla-rust-wrapper/CMakeLists.txt +++ b/scylla-rust-wrapper/CMakeLists.txt @@ -21,6 +21,12 @@ macro(create_copy source_file dest_name) add_custom_target(${dest_name}_copy ALL DEPENDS ${CMAKE_BINARY_DIR}/${dest_name}) endmacro() +if(DEFINED CMAKE_Rust_FLAGS) + set(CMAKE_Rust_FLAGS "${CMAKE_Rust_FLAGS} --cfg cpp_rust_unstable") +else() + set(CMAKE_Rust_FLAGS "--cfg cpp_rust_unstable") +endif() + if(APPLE) set(INSTALL_NAME_SHARED "libscylla-cpp-driver.${PROJECT_VERSION_STRING}.dylib") set(INSTALL_NAME_SHARED_SYMLINK_VERSION "libscylla-cpp-driver.${PROJECT_VERSION_MAJOR}.dylib") From ceca06a405858f4c36cdc3c84b88918b4d5a1e40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Tue, 18 Mar 2025 14:29:19 +0100 Subject: [PATCH 2/7] iterator: make CassResultIterator an enum It is to distinguish between the two types of iterators that we have: - iterator over non-rows result (always returning null) - iterator over rows result In one of the next commits, we will modify `CassRowsResultIterator`, so it holds `TypedRowIterator` instead of reference to CassRowsResult. This change would be necessary then anyway, so it's better to do it now and reduce the noise during review in later commit. --- scylla-rust-wrapper/src/iterator.rs | 63 ++++++++++++++++------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/scylla-rust-wrapper/src/iterator.rs b/scylla-rust-wrapper/src/iterator.rs index e591fcf8..c19c6f4c 100644 --- a/scylla-rust-wrapper/src/iterator.rs +++ b/scylla-rust-wrapper/src/iterator.rs @@ -17,11 +17,16 @@ pub use crate::cass_iterator_types::CassIteratorType; use std::os::raw::c_char; -pub struct CassResultIterator<'result> { - result: &'result CassResult, +pub struct CassRowsResultIterator<'result> { + result: &'result CassRowsResult, position: Option, } +pub enum CassResultIterator<'result> { + NonRows, + Rows(CassRowsResultIterator<'result>), +} + pub struct CassRowIterator<'result> { row: &'result CassRow, position: Option, @@ -158,16 +163,17 @@ pub unsafe extern "C" fn cass_iterator_next( match &mut iter { CassIterator::Result(result_iterator) => { - let new_pos: usize = result_iterator.position.map_or(0, |prev_pos| prev_pos + 1); + let CassResultIterator::Rows(rows_result_iterator) = result_iterator else { + return false as cass_bool_t; + }; - result_iterator.position = Some(new_pos); + let new_pos: usize = rows_result_iterator + .position + .map_or(0, |prev_pos| prev_pos + 1); - match &result_iterator.result.kind { - CassResultKind::Rows(rows_result) => { - (new_pos < rows_result.rows.len()) as cass_bool_t - } - CassResultKind::NonRows => false as cass_bool_t, - } + rows_result_iterator.position = Some(new_pos); + + (new_pos < rows_result_iterator.result.rows.len()) as cass_bool_t } CassIterator::Row(row_iterator) => { let new_pos: usize = row_iterator.position.map_or(0, |prev_pos| prev_pos + 1); @@ -265,25 +271,21 @@ pub unsafe extern "C" fn cass_iterator_get_row<'result>( let iter = BoxFFI::as_ref(iterator).unwrap(); // Defined only for result iterator, for other types should return null - if let CassIterator::Result(result_iterator) = iter { - let iter_position = match result_iterator.position { - Some(pos) => pos, - None => return RefFFI::null(), - }; - - let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result_iterator.result.kind else { - return RefFFI::null(); - }; + let CassIterator::Result(CassResultIterator::Rows(rows_result_iterator)) = iter else { + return RefFFI::null(); + }; - let row: &CassRow = match rows.get(iter_position) { - Some(row) => row, - None => return RefFFI::null(), - }; + let iter_position = match rows_result_iterator.position { + Some(pos) => pos, + None => return RefFFI::null(), + }; - return RefFFI::as_ptr(row); - } + let row: &CassRow = match rows_result_iterator.result.rows.get(iter_position) { + Some(row) => row, + None => return RefFFI::null(), + }; - RefFFI::null() + RefFFI::as_ptr(row) } #[no_mangle] @@ -644,9 +646,12 @@ pub unsafe extern "C" fn cass_iterator_from_result<'result>( ) -> CassOwnedExclusivePtr, CMut> { let result_from_raw = ArcFFI::as_ref(result).unwrap(); - let iterator = CassResultIterator { - result: result_from_raw, - position: None, + let iterator = match &result_from_raw.kind { + CassResultKind::NonRows => CassResultIterator::NonRows, + CassResultKind::Rows(rows_result) => CassResultIterator::Rows(CassRowsResultIterator { + result: rows_result, + position: None, + }), }; BoxFFI::into_ptr(Box::new(CassIterator::Result(iterator))) From 90f22fda5a29392da67f8bb8615060050eb90cca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Tue, 3 Dec 2024 03:46:40 +0100 Subject: [PATCH 3/7] result: deserialize rows lazily If we look at the CassResult methods, there is no method that requests a CassRow at given index. It means, that we can lazily deserialize rows during iteration (`cass_iterator_from_result`). The only exception is `cass_result_first_row`. The result should hold first deserialized row, and simply return it. We should not deserialize the row each time this method is called. This is exactly what this commit does. It adjusts both `CassRowsResult` and `CassRowsResultIterator`: - CassRowsResult now holds only first row, and all deserialization payload (i.e. metadata and serialized rows). - CassRowsResultIterator from now on holds a TypedRowIterator, instead of reference to CassRowsResult. This semantic is more efficient, and is employed by cpp-driver as well. --- scylla-rust-wrapper/src/iterator.rs | 67 ++++++++++++++--------- scylla-rust-wrapper/src/query_error.rs | 2 +- scylla-rust-wrapper/src/query_result.rs | 72 +++++++++++++------------ 3 files changed, 81 insertions(+), 60 deletions(-) diff --git a/scylla-rust-wrapper/src/iterator.rs b/scylla-rust-wrapper/src/iterator.rs index c19c6f4c..61fbc750 100644 --- a/scylla-rust-wrapper/src/iterator.rs +++ b/scylla-rust-wrapper/src/iterator.rs @@ -1,3 +1,6 @@ +use scylla::deserialize::result::TypedRowIterator; +use scylla::value::Row; + use crate::argconv::{ write_str_to_c, ArcFFI, BoxFFI, CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr, CassOwnedExclusivePtr, FromBox, RefFFI, FFI, @@ -9,17 +12,19 @@ use crate::metadata::{ }; use crate::query_result::{ cass_value_is_collection, cass_value_item_count, cass_value_type, CassResult, CassResultKind, - CassRow, CassRowsResult, CassValue, Collection, Value, + CassResultMetadata, CassRow, CassValue, Collection, Value, }; use crate::types::{cass_bool_t, size_t}; pub use crate::cass_iterator_types::CassIteratorType; use std::os::raw::c_char; +use std::sync::Arc; pub struct CassRowsResultIterator<'result> { - result: &'result CassRowsResult, - position: Option, + iterator: TypedRowIterator<'result, 'result, Row>, + result_metadata: Arc, + current_row: Option, } pub enum CassResultIterator<'result> { @@ -167,13 +172,25 @@ pub unsafe extern "C" fn cass_iterator_next( return false as cass_bool_t; }; - let new_pos: usize = rows_result_iterator - .position - .map_or(0, |prev_pos| prev_pos + 1); - - rows_result_iterator.position = Some(new_pos); - - (new_pos < rows_result_iterator.result.rows.len()) as cass_bool_t + let new_row = rows_result_iterator + .iterator + .next() + .and_then(|res| match res { + Ok(row) => Some(row), + Err(e) => { + // We have no way to propagate the error (return type is bool). + // Let's at least log the deserialization error. + tracing::error!("Failed to deserialize next row: {e}"); + None + } + }) + .map(|row| { + CassRow::from_row_and_metadata(row, &rows_result_iterator.result_metadata) + }); + + rows_result_iterator.current_row = new_row; + + rows_result_iterator.current_row.is_some() as cass_bool_t } CassIterator::Row(row_iterator) => { let new_pos: usize = row_iterator.position.map_or(0, |prev_pos| prev_pos + 1); @@ -266,7 +283,7 @@ pub unsafe extern "C" fn cass_iterator_next( #[no_mangle] pub unsafe extern "C" fn cass_iterator_get_row<'result>( - iterator: CassBorrowedSharedPtr, CConst>, + iterator: CassBorrowedSharedPtr<'result, CassIterator<'result>, CConst>, ) -> CassBorrowedSharedPtr<'result, CassRow, CConst> { let iter = BoxFFI::as_ref(iterator).unwrap(); @@ -275,17 +292,11 @@ pub unsafe extern "C" fn cass_iterator_get_row<'result>( return RefFFI::null(); }; - let iter_position = match rows_result_iterator.position { - Some(pos) => pos, - None => return RefFFI::null(), - }; - - let row: &CassRow = match rows_result_iterator.result.rows.get(iter_position) { - Some(row) => row, - None => return RefFFI::null(), - }; - - RefFFI::as_ptr(row) + rows_result_iterator + .current_row + .as_ref() + .map(RefFFI::as_ptr) + .unwrap_or(RefFFI::null()) } #[no_mangle] @@ -648,10 +659,14 @@ pub unsafe extern "C" fn cass_iterator_from_result<'result>( let iterator = match &result_from_raw.kind { CassResultKind::NonRows => CassResultIterator::NonRows, - CassResultKind::Rows(rows_result) => CassResultIterator::Rows(CassRowsResultIterator { - result: rows_result, - position: None, - }), + CassResultKind::Rows(cass_rows_result) => { + CassResultIterator::Rows(CassRowsResultIterator { + // unwrap: Row always passes the typecheck. + iterator: cass_rows_result.raw_rows.rows_iter::().unwrap(), + result_metadata: Arc::clone(&cass_rows_result.metadata), + current_row: None, + }) + } }; BoxFFI::into_ptr(Box::new(CassIterator::Result(iterator))) diff --git a/scylla-rust-wrapper/src/query_error.rs b/scylla-rust-wrapper/src/query_error.rs index a44d69a3..5a08d06e 100644 --- a/scylla-rust-wrapper/src/query_error.rs +++ b/scylla-rust-wrapper/src/query_error.rs @@ -15,7 +15,7 @@ pub enum CassErrorResult { Query(#[from] ExecutionError), #[error(transparent)] ResultMetadataLazyDeserialization(#[from] ResultMetadataAndRowsCountParseError), - #[error("Failed to deserialize rows: {0}")] + #[error("Failed to deserialize first row: {0}")] Deserialization(#[from] DeserializationError), } diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index 8018dc44..9368d1d1 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -10,6 +10,7 @@ use crate::query_result::Value::{CollectionValue, RegularValue}; use crate::types::*; use crate::uuid::CassUuid; use scylla::errors::IntoRowsResultError; +use scylla::frame::response::result::DeserializedMetadataAndRawRows; use scylla::response::query_result::{ColumnSpecs, QueryResult}; use scylla::response::PagingStateResponse; use scylla::value::{CqlValue, Row}; @@ -24,7 +25,8 @@ pub enum CassResultKind { } pub struct CassRowsResult { - pub rows: Vec, + pub raw_rows: DeserializedMetadataAndRawRows, + pub first_row: Option, pub metadata: Arc, } @@ -55,21 +57,21 @@ impl CassResult { )) }); - // For now, let's eagerly deserialize rows into type-erased CqlValues. - // Lazy deserialization requires a non-trivial refactor that needs to be discussed. - let rows: Vec = rows_result - .rows::() - // SAFETY: this unwrap is safe, because `Row` always - // passes the typecheck, no matter the type of the columns. + let (raw_rows, tracing_id, _) = rows_result.into_inner(); + let first_row = raw_rows + .rows_iter::() + // unwrap: Row always passes the typecheck. .unwrap() - .collect::>()?; - let cass_rows = create_cass_rows_from_rows(rows, &metadata); + .next() + .transpose()? + .map(|row| CassRow::from_row_and_metadata(row, &metadata)); let cass_result = CassResult { - tracing_id: rows_result.tracing_id(), + tracing_id, paging_state_response, kind: CassResultKind::Rows(CassRowsResult { - rows: cass_rows, + raw_rows, + first_row, metadata, }), }; @@ -128,16 +130,13 @@ impl FFI for CassRow { type Origin = FromRef; } -pub fn create_cass_rows_from_rows( - rows: Vec, - metadata: &Arc, -) -> Vec { - rows.into_iter() - .map(|r| CassRow { - columns: create_cass_row_columns(r, metadata), - result_metadata: metadata.clone(), - }) - .collect() +impl CassRow { + pub fn from_row_and_metadata(row: Row, metadata: &Arc) -> Self { + Self { + columns: create_cass_row_columns(row, metadata), + result_metadata: Arc::clone(metadata), + } + } } pub enum Value { @@ -799,11 +798,11 @@ pub unsafe extern "C" fn cass_result_row_count( ) -> size_t { let result = ArcFFI::as_ref(result_raw).unwrap(); - let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result.kind else { + let CassResultKind::Rows(CassRowsResult { raw_rows, .. }) = &result.kind else { return 0; }; - rows.len() as size_t + raw_rows.rows_count() as size_t } #[no_mangle] @@ -825,11 +824,14 @@ pub unsafe extern "C" fn cass_result_first_row( ) -> CassBorrowedSharedPtr { let result = ArcFFI::as_ref(result_raw).unwrap(); - let CassResultKind::Rows(CassRowsResult { rows, .. }) = &result.kind else { + let CassResultKind::Rows(CassRowsResult { first_row, .. }) = &result.kind else { return RefFFI::null(); }; - rows.first().map(RefFFI::as_ptr).unwrap_or(RefFFI::null()) + first_row + .as_ref() + .map(RefFFI::as_ptr) + .unwrap_or(RefFFI::null()) } #[no_mangle] @@ -867,7 +869,7 @@ pub unsafe extern "C" fn cass_result_paging_state_token( #[cfg(test)] mod tests { use scylla::cluster::metadata::{CollectionType, ColumnType, NativeType}; - use scylla::frame::response::result::{ColumnSpec, TableSpec}; + use scylla::frame::response::result::{ColumnSpec, DeserializedMetadataAndRawRows, TableSpec}; use scylla::response::query_result::ColumnSpecs; use scylla::response::PagingStateResponse; use scylla::value::{CqlValue, Row}; @@ -885,8 +887,8 @@ mod tests { use std::{ffi::c_char, ptr::addr_of_mut, sync::Arc}; use super::{ - cass_result_column_count, cass_result_column_type, create_cass_rows_from_rows, - CassBorrowedSharedPtr, CassResult, CassResultKind, CassResultMetadata, CassRowsResult, + cass_result_column_count, cass_result_column_type, CassBorrowedSharedPtr, CassResult, + CassResultKind, CassResultMetadata, CassRow, CassRowsResult, }; fn col_spec(name: &'static str, typ: ColumnType<'static>) -> ColumnSpec<'static> { @@ -909,8 +911,8 @@ mod tests { ), ]))); - let rows = create_cass_rows_from_rows( - vec![Row { + let first_row = Some(CassRow::from_row_and_metadata( + Row { columns: vec![ Some(CqlValue::BigInt(42)), None, @@ -920,14 +922,18 @@ mod tests { CqlValue::Float(9999.9999), ])), ], - }], + }, &metadata, - ); + )); CassResult { tracing_id: None, paging_state_response: PagingStateResponse::NoMorePages, - kind: CassResultKind::Rows(CassRowsResult { rows, metadata }), + kind: CassResultKind::Rows(CassRowsResult { + raw_rows: DeserializedMetadataAndRawRows::mock_empty(), + first_row, + metadata, + }), } } From a33b42fc0db132f7c2dd8ede064a6acd7d3ed839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Wed, 2 Apr 2025 04:37:30 +0200 Subject: [PATCH 4/7] cargo: add yoke dependency In the later commit, this PR introduces self-borrowing to the codebase. As of now, it's not possible to express it using safe Rust. Luckily, there is a yoke crate that will allow us to do that. --- scylla-rust-wrapper/Cargo.lock | 29 +++++++++++++++++++++++++++-- scylla-rust-wrapper/Cargo.toml | 1 + 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/scylla-rust-wrapper/Cargo.lock b/scylla-rust-wrapper/Cargo.lock index 1d2d99c6..94e2c542 100644 --- a/scylla-rust-wrapper/Cargo.lock +++ b/scylla-rust-wrapper/Cargo.lock @@ -1148,6 +1148,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid", + "yoke 0.8.0", ] [[package]] @@ -1167,7 +1168,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "uuid", - "yoke", + "yoke 0.7.5", ] [[package]] @@ -1799,7 +1800,19 @@ checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", - "yoke-derive", + "yoke-derive 0.7.5", + "zerofrom", +] + +[[package]] +name = "yoke" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive 0.8.0", "zerofrom", ] @@ -1815,6 +1828,18 @@ dependencies = [ "synstructure", ] +[[package]] +name = "yoke-derive" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.99", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/scylla-rust-wrapper/Cargo.toml b/scylla-rust-wrapper/Cargo.toml index ed850b85..26b3f783 100644 --- a/scylla-rust-wrapper/Cargo.toml +++ b/scylla-rust-wrapper/Cargo.toml @@ -26,6 +26,7 @@ tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } tracing = "0.1.37" futures = "0.3" thiserror = "1.0" +yoke = { version = "0.8.0", features = ["derive"] } [build-dependencies] bindgen = "0.65" From 734cb41433c424bc36d84fc8a1dd50c3adb8cca1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Tue, 18 Mar 2025 14:10:21 +0100 Subject: [PATCH 5/7] row: borrow metadata from result, instead of cloning an Arc Because of that, we need to introduce a lifetime parameter to CassRow. CassRow will from now on borrow metadata from CassResult (CassRowsResult). OTOH, notice that CassRowsResult need to hold the deserialized first row. This is because `cass_result_first_row` method exists. We don't want to deserialize the row each time this method is called. Because of that, the self-borrowing appears in our codebase. We have a CassRowsResult, which holds a CassRow that borrows CassResultMetadata from the CassRowsResult. It's not possible to express it in safe Rust, thus we make use of yoke crate which wraps unsafe code and exposes safe abstractions on top of it. --- scylla-rust-wrapper/src/execution_error.rs | 299 +++++++++++++++++++++ scylla-rust-wrapper/src/iterator.rs | 15 +- scylla-rust-wrapper/src/query_result.rs | 122 +++++++-- 3 files changed, 400 insertions(+), 36 deletions(-) create mode 100644 scylla-rust-wrapper/src/execution_error.rs diff --git a/scylla-rust-wrapper/src/execution_error.rs b/scylla-rust-wrapper/src/execution_error.rs new file mode 100644 index 00000000..a7ddf5d2 --- /dev/null +++ b/scylla-rust-wrapper/src/execution_error.rs @@ -0,0 +1,299 @@ +use crate::argconv::*; +use crate::cass_error::*; +use crate::cass_error_types::CassWriteType; +use crate::cass_types::CassConsistency; +use crate::types::*; +use scylla::deserialize::DeserializationError; +use scylla::errors::{DbError, ExecutionError, RequestAttemptError, WriteType}; +use scylla::frame::frame_errors::ResultMetadataAndRowsCountParseError; +use scylla::statement::Consistency; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum CassErrorResult { + #[error(transparent)] + Execution(#[from] ExecutionError), + #[error(transparent)] + ResultMetadataLazyDeserialization(#[from] ResultMetadataAndRowsCountParseError), + #[error("Failed to deserialize first row: {0}")] + Deserialization(#[from] DeserializationError), +} + +impl FFI for CassErrorResult { + type Origin = FromArc; +} + +impl From for CassConsistency { + fn from(c: Consistency) -> CassConsistency { + match c { + Consistency::Any => CassConsistency::CASS_CONSISTENCY_ANY, + Consistency::One => CassConsistency::CASS_CONSISTENCY_ONE, + Consistency::Two => CassConsistency::CASS_CONSISTENCY_TWO, + Consistency::Three => CassConsistency::CASS_CONSISTENCY_THREE, + Consistency::Quorum => CassConsistency::CASS_CONSISTENCY_QUORUM, + Consistency::All => CassConsistency::CASS_CONSISTENCY_ALL, + Consistency::LocalQuorum => CassConsistency::CASS_CONSISTENCY_LOCAL_QUORUM, + Consistency::EachQuorum => CassConsistency::CASS_CONSISTENCY_EACH_QUORUM, + Consistency::LocalOne => CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, + Consistency::Serial => CassConsistency::CASS_CONSISTENCY_SERIAL, + Consistency::LocalSerial => CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL, + } + } +} + +impl From<&WriteType> for CassWriteType { + fn from(c: &WriteType) -> CassWriteType { + match c { + WriteType::Simple => CassWriteType::CASS_WRITE_TYPE_SIMPLE, + WriteType::Batch => CassWriteType::CASS_WRITE_TYPE_BATCH, + WriteType::UnloggedBatch => CassWriteType::CASS_WRITE_TYPE_UNLOGGED_BATCH, + WriteType::Counter => CassWriteType::CASS_WRITE_TYPE_COUNTER, + WriteType::BatchLog => CassWriteType::CASS_WRITE_TYPE_BATCH_LOG, + WriteType::Cas => CassWriteType::CASS_WRITE_TYPE_CAS, + WriteType::View => CassWriteType::CASS_WRITE_TYPE_VIEW, + WriteType::Cdc => CassWriteType::CASS_WRITE_TYPE_CDC, + WriteType::Other(_) => CassWriteType::CASS_WRITE_TYPE_UNKNOWN, + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_free( + error_result: CassOwnedSharedPtr, +) { + ArcFFI::free(error_result); +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_code( + error_result: CassBorrowedSharedPtr, +) -> CassError { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + error_result.to_cass_error() +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_consistency( + error_result: CassBorrowedSharedPtr, +) -> CassConsistency { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::Unavailable { consistency, .. }, _) + | RequestAttemptError::DbError(DbError::ReadTimeout { consistency, .. }, _) + | RequestAttemptError::DbError(DbError::WriteTimeout { consistency, .. }, _) + | RequestAttemptError::DbError(DbError::ReadFailure { consistency, .. }, _) + | RequestAttemptError::DbError(DbError::WriteFailure { consistency, .. }, _), + )) => CassConsistency::from(*consistency), + _ => CassConsistency::CASS_CONSISTENCY_UNKNOWN, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_responses_received( + error_result: CassBorrowedSharedPtr, +) -> cass_int32_t { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError(attempt_error)) => { + match attempt_error { + RequestAttemptError::DbError(DbError::Unavailable { alive, .. }, _) => *alive, + RequestAttemptError::DbError(DbError::ReadTimeout { received, .. }, _) => *received, + RequestAttemptError::DbError(DbError::WriteTimeout { received, .. }, _) => { + *received + } + RequestAttemptError::DbError(DbError::ReadFailure { received, .. }, _) => *received, + RequestAttemptError::DbError(DbError::WriteFailure { received, .. }, _) => { + *received + } + _ => -1, + } + } + _ => -1, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_responses_required( + error_result: CassBorrowedSharedPtr, +) -> cass_int32_t { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError(attempt_error)) => { + match attempt_error { + RequestAttemptError::DbError(DbError::Unavailable { required, .. }, _) => *required, + RequestAttemptError::DbError(DbError::ReadTimeout { required, .. }, _) => *required, + RequestAttemptError::DbError(DbError::WriteTimeout { required, .. }, _) => { + *required + } + RequestAttemptError::DbError(DbError::ReadFailure { required, .. }, _) => *required, + RequestAttemptError::DbError(DbError::WriteFailure { required, .. }, _) => { + *required + } + _ => -1, + } + } + _ => -1, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_num_failures( + error_result: CassBorrowedSharedPtr, +) -> cass_int32_t { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::ReadFailure { numfailures, .. }, _), + )) => *numfailures, + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::WriteFailure { numfailures, .. }, _), + )) => *numfailures, + _ => -1, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_data_present( + error_result: CassBorrowedSharedPtr, +) -> cass_bool_t { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::ReadTimeout { data_present, .. }, _), + )) => { + if *data_present { + cass_true + } else { + cass_false + } + } + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::ReadFailure { data_present, .. }, _), + )) => { + if *data_present { + cass_true + } else { + cass_false + } + } + _ => cass_false, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_write_type( + error_result: CassBorrowedSharedPtr, +) -> CassWriteType { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::WriteTimeout { write_type, .. }, _), + )) => CassWriteType::from(write_type), + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::WriteFailure { write_type, .. }, _), + )) => CassWriteType::from(write_type), + _ => CassWriteType::CASS_WRITE_TYPE_UNKNOWN, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_keyspace( + error_result: CassBorrowedSharedPtr, + c_keyspace: *mut *const ::std::os::raw::c_char, + c_keyspace_len: *mut size_t, +) -> CassError { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::AlreadyExists { keyspace, .. }, _), + )) => { + unsafe { write_str_to_c(keyspace.as_str(), c_keyspace, c_keyspace_len) }; + CassError::CASS_OK + } + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::FunctionFailure { keyspace, .. }, _), + )) => { + unsafe { write_str_to_c(keyspace.as_str(), c_keyspace, c_keyspace_len) }; + CassError::CASS_OK + } + _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_table( + error_result: CassBorrowedSharedPtr, + c_table: *mut *const ::std::os::raw::c_char, + c_table_len: *mut size_t, +) -> CassError { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::AlreadyExists { table, .. }, _), + )) => { + unsafe { write_str_to_c(table.as_str(), c_table, c_table_len) }; + CassError::CASS_OK + } + _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_function( + error_result: CassBorrowedSharedPtr, + c_function: *mut *const ::std::os::raw::c_char, + c_function_len: *mut size_t, +) -> CassError { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::FunctionFailure { function, .. }, _), + )) => { + unsafe { write_str_to_c(function.as_str(), c_function, c_function_len) }; + CassError::CASS_OK + } + _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_num_arg_types( + error_result: CassBorrowedSharedPtr, +) -> size_t { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::FunctionFailure { arg_types, .. }, _), + )) => arg_types.len() as size_t, + _ => 0, + } +} + +#[no_mangle] +pub unsafe extern "C" fn cass_error_result_arg_type( + error_result: CassBorrowedSharedPtr, + index: size_t, + arg_type: *mut *const ::std::os::raw::c_char, + arg_type_length: *mut size_t, +) -> CassError { + let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); + match error_result { + CassErrorResult::Execution(ExecutionError::LastAttemptError( + RequestAttemptError::DbError(DbError::FunctionFailure { arg_types, .. }, _), + )) => { + if index >= arg_types.len() as size_t { + return CassError::CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS; + } + unsafe { + write_str_to_c( + arg_types[index as usize].as_str(), + arg_type, + arg_type_length, + ) + }; + CassError::CASS_OK + } + _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, + } +} diff --git a/scylla-rust-wrapper/src/iterator.rs b/scylla-rust-wrapper/src/iterator.rs index 61fbc750..039c2c74 100644 --- a/scylla-rust-wrapper/src/iterator.rs +++ b/scylla-rust-wrapper/src/iterator.rs @@ -19,12 +19,11 @@ use crate::types::{cass_bool_t, size_t}; pub use crate::cass_iterator_types::CassIteratorType; use std::os::raw::c_char; -use std::sync::Arc; pub struct CassRowsResultIterator<'result> { iterator: TypedRowIterator<'result, 'result, Row>, - result_metadata: Arc, - current_row: Option, + result_metadata: &'result CassResultMetadata, + current_row: Option>, } pub enum CassResultIterator<'result> { @@ -33,7 +32,7 @@ pub enum CassResultIterator<'result> { } pub struct CassRowIterator<'result> { - row: &'result CassRow, + row: &'result CassRow<'result>, position: Option, } @@ -185,7 +184,7 @@ pub unsafe extern "C" fn cass_iterator_next( } }) .map(|row| { - CassRow::from_row_and_metadata(row, &rows_result_iterator.result_metadata) + CassRow::from_row_and_metadata(row, rows_result_iterator.result_metadata) }); rows_result_iterator.current_row = new_row; @@ -284,7 +283,7 @@ pub unsafe extern "C" fn cass_iterator_next( #[no_mangle] pub unsafe extern "C" fn cass_iterator_get_row<'result>( iterator: CassBorrowedSharedPtr<'result, CassIterator<'result>, CConst>, -) -> CassBorrowedSharedPtr<'result, CassRow, CConst> { +) -> CassBorrowedSharedPtr<'result, CassRow<'result>, CConst> { let iter = BoxFFI::as_ref(iterator).unwrap(); // Defined only for result iterator, for other types should return null @@ -663,7 +662,7 @@ pub unsafe extern "C" fn cass_iterator_from_result<'result>( CassResultIterator::Rows(CassRowsResultIterator { // unwrap: Row always passes the typecheck. iterator: cass_rows_result.raw_rows.rows_iter::().unwrap(), - result_metadata: Arc::clone(&cass_rows_result.metadata), + result_metadata: &cass_rows_result.metadata, current_row: None, }) } @@ -675,7 +674,7 @@ pub unsafe extern "C" fn cass_iterator_from_result<'result>( #[no_mangle] #[allow(clippy::needless_lifetimes)] pub unsafe extern "C" fn cass_iterator_from_row<'result>( - row: CassBorrowedSharedPtr<'result, CassRow, CConst>, + row: CassBorrowedSharedPtr<'result, CassRow<'result>, CConst>, ) -> CassOwnedExclusivePtr, CMut> { let row_from_raw = RefFFI::as_ref(row).unwrap(); diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index 9368d1d1..7b12b67b 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -9,6 +9,7 @@ use crate::query_error::CassErrorResult; use crate::query_result::Value::{CollectionValue, RegularValue}; use crate::types::*; use crate::uuid::CassUuid; +use row_with_self_borrowed_metadata::RowWithSelfBorrowedMetadata; use scylla::errors::IntoRowsResultError; use scylla::frame::response::result::DeserializedMetadataAndRawRows; use scylla::response::query_result::{ColumnSpecs, QueryResult}; @@ -26,7 +27,9 @@ pub enum CassResultKind { pub struct CassRowsResult { pub raw_rows: DeserializedMetadataAndRawRows, - pub first_row: Option, + // 'static, because of self-borrowing. + // CassRow borrows `metadata` field. + pub first_row: Option, pub metadata: Arc, } @@ -58,13 +61,10 @@ impl CassResult { }); let (raw_rows, tracing_id, _) = rows_result.into_inner(); - let first_row = raw_rows - .rows_iter::() - // unwrap: Row always passes the typecheck. - .unwrap() - .next() - .transpose()? - .map(|row| CassRow::from_row_and_metadata(row, &metadata)); + let first_row = RowWithSelfBorrowedMetadata::first_from_raw_rows_and_metadata( + &raw_rows, + Arc::clone(&metadata), + )?; let cass_result = CassResult { tracing_id, @@ -121,20 +121,84 @@ impl CassResultMetadata { /// The lifetime of CassRow is bound to CassResult. /// It will be freed, when CassResult is freed.(see #[cass_result_free]) -pub struct CassRow { +pub struct CassRow<'result> { pub columns: Vec, - pub result_metadata: Arc, + pub result_metadata: &'result CassResultMetadata, } -impl FFI for CassRow { +impl FFI for CassRow<'_> { type Origin = FromRef; } -impl CassRow { - pub fn from_row_and_metadata(row: Row, metadata: &Arc) -> Self { +impl<'result> CassRow<'result> { + pub fn from_row_and_metadata(row: Row, result_metadata: &'result CassResultMetadata) -> Self { Self { - columns: create_cass_row_columns(row, metadata), - result_metadata: Arc::clone(metadata), + columns: create_cass_row_columns(row, result_metadata), + result_metadata, + } + } +} + +/// Module defining [`RowWithSelfBorrowedMetadata`] struct. +/// The purpose of this module is so the `query_result` module does not directly depend on `yoke`. +mod row_with_self_borrowed_metadata { + use std::sync::Arc; + + use scylla::frame::response::result::DeserializedMetadataAndRawRows; + use scylla::value::Row; + use yoke::{Yoke, Yokeable}; + + use crate::query_error::CassErrorResult; + + use super::{CassResultMetadata, CassRow}; + + /// A simple wrapper over CassRow. + /// Needed, so we can implement Yokeable for it, instead of implementing it for CassRow. + #[derive(Yokeable)] + struct CassRowWrapper<'result>(CassRow<'result>); + + /// A wrapper over struct which self-borrows the metadata allocated using Arc. + /// + /// It's needed to safely express the relationship between [`CassRowsResult`][super::CassRowsResult] + /// and its `first_row` field. The relationship is as follows: + /// 1. `CassRowsResult` owns `metadata` field, which is an `Arc`. + /// 2. `CassRowsResult` owns the row (`first_row`) + /// 3. `CassRow` borrows `metadata` field (as a reference) + /// + /// This struct is a shared owner of the metadata, and self-borrows the metadata + /// to the `CassRow` it contains. + pub struct RowWithSelfBorrowedMetadata(Yoke, Arc>); + + impl RowWithSelfBorrowedMetadata { + /// Constructs [`RowWithSelfBorrowedMetadata`] based on the first row from `raw_rows`. + pub(super) fn first_from_raw_rows_and_metadata( + raw_rows: &DeserializedMetadataAndRawRows, + metadata: Arc, + ) -> Result, CassErrorResult> { + let row = raw_rows + .rows_iter::() + // unwrap: Row always passes the typecheck. + .unwrap() + .next() + .transpose()? + .map(|row: Row| Self::new_from_row_and_metadata(row, metadata)); + + Ok(row) + } + + pub(super) fn row(&self) -> &CassRow<'_> { + &self.0.get().0 + } + + pub(super) fn new_from_row_and_metadata( + row: Row, + metadata: Arc, + ) -> Self { + let yoke = Yoke::attach_to_cart(metadata, |metadata_ref| { + CassRowWrapper(CassRow::from_row_and_metadata(row, metadata_ref)) + }); + + Self(yoke) } } } @@ -165,7 +229,7 @@ impl FFI for CassValue { type Origin = FromRef; } -fn create_cass_row_columns(row: Row, metadata: &Arc) -> Vec { +fn create_cass_row_columns(row: Row, metadata: &CassResultMetadata) -> Vec { row.columns .into_iter() .zip(metadata.col_specs.iter()) @@ -297,10 +361,10 @@ unsafe fn result_has_more_pages(result: &CassBorrowedSharedPtr, +pub unsafe extern "C" fn cass_row_get_column<'result>( + row_raw: CassBorrowedSharedPtr<'result, CassRow<'result>, CConst>, index: size_t, -) -> CassBorrowedSharedPtr { +) -> CassBorrowedSharedPtr<'result, CassValue, CConst> { let row: &CassRow = RefFFI::as_ref(row_raw).unwrap(); let index_usize: usize = index.try_into().unwrap(); @@ -313,10 +377,10 @@ pub unsafe extern "C" fn cass_row_get_column( } #[no_mangle] -pub unsafe extern "C" fn cass_row_get_column_by_name( - row: CassBorrowedSharedPtr, +pub unsafe extern "C" fn cass_row_get_column_by_name<'result>( + row: CassBorrowedSharedPtr<'result, CassRow<'result>, CConst>, name: *const c_char, -) -> CassBorrowedSharedPtr { +) -> CassBorrowedSharedPtr<'result, CassValue, CConst> { let name_str = unsafe { ptr_to_cstr(name) }.unwrap(); let name_length = name_str.len(); @@ -324,11 +388,11 @@ pub unsafe extern "C" fn cass_row_get_column_by_name( } #[no_mangle] -pub unsafe extern "C" fn cass_row_get_column_by_name_n( - row: CassBorrowedSharedPtr, +pub unsafe extern "C" fn cass_row_get_column_by_name_n<'result>( + row: CassBorrowedSharedPtr<'result, CassRow<'result>, CConst>, name: *const c_char, name_length: size_t, -) -> CassBorrowedSharedPtr { +) -> CassBorrowedSharedPtr<'result, CassValue, CConst> { let row_from_raw = RefFFI::as_ref(row).unwrap(); let mut name_str = unsafe { ptr_to_cstr_n(name, name_length).unwrap() }; let mut is_case_sensitive = false; @@ -830,6 +894,7 @@ pub unsafe extern "C" fn cass_result_first_row( first_row .as_ref() + .map(RowWithSelfBorrowedMetadata::row) .map(RefFFI::as_ptr) .unwrap_or(RefFFI::null()) } @@ -886,9 +951,10 @@ mod tests { }; use std::{ffi::c_char, ptr::addr_of_mut, sync::Arc}; + use super::row_with_self_borrowed_metadata::RowWithSelfBorrowedMetadata; use super::{ cass_result_column_count, cass_result_column_type, CassBorrowedSharedPtr, CassResult, - CassResultKind, CassResultMetadata, CassRow, CassRowsResult, + CassResultKind, CassResultMetadata, CassRowsResult, }; fn col_spec(name: &'static str, typ: ColumnType<'static>) -> ColumnSpec<'static> { @@ -911,7 +977,7 @@ mod tests { ), ]))); - let first_row = Some(CassRow::from_row_and_metadata( + let first_row = Some(RowWithSelfBorrowedMetadata::new_from_row_and_metadata( Row { columns: vec![ Some(CqlValue::BigInt(42)), @@ -923,7 +989,7 @@ mod tests { ])), ], }, - &metadata, + Arc::clone(&metadata), )); CassResult { From 95b3bc79fa82162bb375281e1040ba0e7f2e81a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Wed, 2 Apr 2025 04:55:28 +0200 Subject: [PATCH 6/7] iterator: cleanup cass_iterator_next method I extracted the logic for each iterator to corresponding methods. --- scylla-rust-wrapper/src/iterator.rs | 226 +++++++++++++++------------- 1 file changed, 122 insertions(+), 104 deletions(-) diff --git a/scylla-rust-wrapper/src/iterator.rs b/scylla-rust-wrapper/src/iterator.rs index 039c2c74..7c3f1de6 100644 --- a/scylla-rust-wrapper/src/iterator.rs +++ b/scylla-rust-wrapper/src/iterator.rs @@ -31,53 +31,159 @@ pub enum CassResultIterator<'result> { Rows(CassRowsResultIterator<'result>), } +impl CassResultIterator<'_> { + fn next(&mut self) -> bool { + let CassResultIterator::Rows(rows_result_iterator) = self else { + return false; + }; + + let new_row = rows_result_iterator + .iterator + .next() + .and_then(|res| match res { + Ok(row) => Some(row), + Err(e) => { + // We have no way to propagate the error (return type is bool). + // Let's at least log the deserialization error. + tracing::error!("Failed to deserialize next row: {e}"); + None + } + }) + .map(|row| CassRow::from_row_and_metadata(row, rows_result_iterator.result_metadata)); + + rows_result_iterator.current_row = new_row; + + rows_result_iterator.current_row.is_some() + } +} + pub struct CassRowIterator<'result> { row: &'result CassRow<'result>, position: Option, } +impl CassRowIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.row.columns.len() + } +} + pub struct CassCollectionIterator<'result> { value: &'result CassValue, count: u64, position: Option, } +impl CassCollectionIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.count.try_into().unwrap() + } +} + pub struct CassMapIterator<'result> { value: &'result CassValue, count: u64, position: Option, } +impl CassMapIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.count.try_into().unwrap() + } +} + pub struct CassUdtIterator<'result> { value: &'result CassValue, count: u64, position: Option, } +impl CassUdtIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.count.try_into().unwrap() + } +} + pub struct CassSchemaMetaIterator<'schema> { value: &'schema CassSchemaMeta, count: usize, position: Option, } +impl CassSchemaMetaIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.count + } +} + pub struct CassKeyspaceMetaIterator<'schema> { value: &'schema CassKeyspaceMeta, count: usize, position: Option, } +impl CassKeyspaceMetaIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.count + } +} + pub struct CassTableMetaIterator<'schema> { value: &'schema CassTableMeta, count: usize, position: Option, } +impl CassTableMetaIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.count + } +} + pub struct CassViewMetaIterator<'schema> { value: &'schema CassMaterializedViewMeta, count: usize, position: Option, } +impl CassViewMetaIterator<'_> { + fn next(&mut self) -> bool { + let new_pos: usize = self.position.map_or(0, |prev_pos| prev_pos + 1); + + self.position = Some(new_pos); + + new_pos < self.count + } +} + /// An iterator over columns metadata. /// Can be constructed from either table ([`cass_iterator_columns_from_table_meta()`]) /// or view metadata ([`cass_iterator_columns_from_materialized_view_meta()`]). @@ -165,119 +271,31 @@ pub unsafe extern "C" fn cass_iterator_next( ) -> cass_bool_t { let mut iter = BoxFFI::as_mut_ref(iterator).unwrap(); - match &mut iter { - CassIterator::Result(result_iterator) => { - let CassResultIterator::Rows(rows_result_iterator) = result_iterator else { - return false as cass_bool_t; - }; - - let new_row = rows_result_iterator - .iterator - .next() - .and_then(|res| match res { - Ok(row) => Some(row), - Err(e) => { - // We have no way to propagate the error (return type is bool). - // Let's at least log the deserialization error. - tracing::error!("Failed to deserialize next row: {e}"); - None - } - }) - .map(|row| { - CassRow::from_row_and_metadata(row, rows_result_iterator.result_metadata) - }); - - rows_result_iterator.current_row = new_row; - - rows_result_iterator.current_row.is_some() as cass_bool_t - } - CassIterator::Row(row_iterator) => { - let new_pos: usize = row_iterator.position.map_or(0, |prev_pos| prev_pos + 1); - - row_iterator.position = Some(new_pos); - - (new_pos < row_iterator.row.columns.len()) as cass_bool_t - } + let result = match &mut iter { + CassIterator::Result(result_iterator) => result_iterator.next(), + CassIterator::Row(row_iterator) => row_iterator.next(), CassIterator::Collection(collection_iterator) - | CassIterator::Tuple(collection_iterator) => { - let new_pos: usize = collection_iterator - .position - .map_or(0, |prev_pos| prev_pos + 1); - - collection_iterator.position = Some(new_pos); - - (new_pos < collection_iterator.count.try_into().unwrap()) as cass_bool_t - } - CassIterator::Map(map_iterator) => { - let new_pos: usize = map_iterator.position.map_or(0, |prev_pos| prev_pos + 1); - - map_iterator.position = Some(new_pos); - - (new_pos < map_iterator.count.try_into().unwrap()) as cass_bool_t - } - CassIterator::UdtFields(udt_iterator) => { - let new_pos: usize = udt_iterator.position.map_or(0, |prev_pos| prev_pos + 1); - - udt_iterator.position = Some(new_pos); - - (new_pos < udt_iterator.count.try_into().unwrap()) as cass_bool_t - } - CassIterator::KeyspacesMeta(schema_meta_iterator) => { - let new_pos: usize = schema_meta_iterator - .position - .map_or(0, |prev_pos| prev_pos + 1); - - schema_meta_iterator.position = Some(new_pos); - - (new_pos < schema_meta_iterator.count) as cass_bool_t - } - CassIterator::TablesMeta(keyspace_meta_iterator) => { - let new_pos: usize = keyspace_meta_iterator - .position - .map_or(0, |prev_pos| prev_pos + 1); - - keyspace_meta_iterator.position = Some(new_pos); - - (new_pos < keyspace_meta_iterator.count) as cass_bool_t - } - CassIterator::UserTypes(keyspace_meta_iterator) => { - let new_pos: usize = keyspace_meta_iterator - .position - .map_or(0, |prev_pos| prev_pos + 1); - - keyspace_meta_iterator.position = Some(new_pos); - - (new_pos < keyspace_meta_iterator.count) as cass_bool_t - } - CassIterator::MaterializedViewsMeta(CassMaterializedViewsMetaIterator::FromKeyspace( + | CassIterator::Tuple(collection_iterator) => collection_iterator.next(), + CassIterator::Map(map_iterator) => map_iterator.next(), + CassIterator::UdtFields(udt_iterator) => udt_iterator.next(), + CassIterator::KeyspacesMeta(schema_meta_iterator) => schema_meta_iterator.next(), + CassIterator::TablesMeta(keyspace_meta_iterator) + | CassIterator::UserTypes(keyspace_meta_iterator) + | CassIterator::MaterializedViewsMeta(CassMaterializedViewsMetaIterator::FromKeyspace( keyspace_meta_iterator, - )) => { - let new_pos: usize = keyspace_meta_iterator - .position - .map_or(0, |prev_pos| prev_pos + 1); - - keyspace_meta_iterator.position = Some(new_pos); - - (new_pos < keyspace_meta_iterator.count) as cass_bool_t - } + )) => keyspace_meta_iterator.next(), CassIterator::MaterializedViewsMeta(CassMaterializedViewsMetaIterator::FromTable( table_iterator, )) | CassIterator::ColumnsMeta(CassColumnsMetaIterator::FromTable(table_iterator)) => { - let new_pos: usize = table_iterator.position.map_or(0, |prev_pos| prev_pos + 1); - - table_iterator.position = Some(new_pos); - - (new_pos < table_iterator.count) as cass_bool_t + table_iterator.next() } CassIterator::ColumnsMeta(CassColumnsMetaIterator::FromView(view_iterator)) => { - let new_pos: usize = view_iterator.position.map_or(0, |prev_pos| prev_pos + 1); - - view_iterator.position = Some(new_pos); - - (new_pos < view_iterator.count) as cass_bool_t + view_iterator.next() } - } + }; + + result as cass_bool_t } #[no_mangle] From b6e1f8571c4cad07c5ac36398d27b4a012beff65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Wed, 2 Apr 2025 06:29:17 +0200 Subject: [PATCH 7/7] treewide: rename query_error.rs to execution_error.rs QueryError was replaced with ExecutionError in Rust driver 1.0. --- scylla-rust-wrapper/src/cass_error.rs | 4 +- scylla-rust-wrapper/src/future.rs | 2 +- scylla-rust-wrapper/src/lib.rs | 2 +- scylla-rust-wrapper/src/query_error.rs | 311 ------------------------ scylla-rust-wrapper/src/query_result.rs | 4 +- 5 files changed, 6 insertions(+), 317 deletions(-) delete mode 100644 scylla-rust-wrapper/src/query_error.rs diff --git a/scylla-rust-wrapper/src/cass_error.rs b/scylla-rust-wrapper/src/cass_error.rs index 511cedae..c7d8d3e1 100644 --- a/scylla-rust-wrapper/src/cass_error.rs +++ b/scylla-rust-wrapper/src/cass_error.rs @@ -2,7 +2,7 @@ use scylla::errors::*; // Re-export error types. pub(crate) use crate::cass_error_types::{CassError, CassErrorSource}; -use crate::query_error::CassErrorResult; +use crate::execution_error::CassErrorResult; use crate::statement::UnknownNamedParameterError; pub trait ToCassError { @@ -12,7 +12,7 @@ pub trait ToCassError { impl ToCassError for CassErrorResult { fn to_cass_error(&self) -> CassError { match self { - CassErrorResult::Query(query_error) => query_error.to_cass_error(), + CassErrorResult::Execution(execution_error) => execution_error.to_cass_error(), // TODO: // For now let's leave these as LIB_INVALID_DATA. diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 59f2a925..ddbff9ad 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -2,8 +2,8 @@ use crate::argconv::*; use crate::cass_error::CassError; use crate::cass_error::CassErrorMessage; use crate::cass_error::ToCassError; +use crate::execution_error::CassErrorResult; use crate::prepared::CassPrepared; -use crate::query_error::CassErrorResult; use crate::query_result::CassResult; use crate::types::*; use crate::uuid::CassUuid; diff --git a/scylla-rust-wrapper/src/lib.rs b/scylla-rust-wrapper/src/lib.rs index f4dfa31b..014749f5 100644 --- a/scylla-rust-wrapper/src/lib.rs +++ b/scylla-rust-wrapper/src/lib.rs @@ -17,6 +17,7 @@ pub mod cluster; pub mod collection; pub mod date_time; pub mod exec_profile; +pub mod execution_error; mod external; pub mod future; pub mod inet; @@ -26,7 +27,6 @@ mod logging; pub mod metadata; pub mod misc; pub mod prepared; -pub mod query_error; pub mod query_result; pub mod retry_policy; pub mod session; diff --git a/scylla-rust-wrapper/src/query_error.rs b/scylla-rust-wrapper/src/query_error.rs deleted file mode 100644 index 5a08d06e..00000000 --- a/scylla-rust-wrapper/src/query_error.rs +++ /dev/null @@ -1,311 +0,0 @@ -use crate::argconv::*; -use crate::cass_error::*; -use crate::cass_error_types::CassWriteType; -use crate::cass_types::CassConsistency; -use crate::types::*; -use scylla::deserialize::DeserializationError; -use scylla::errors::{DbError, ExecutionError, RequestAttemptError, WriteType}; -use scylla::frame::frame_errors::ResultMetadataAndRowsCountParseError; -use scylla::statement::Consistency; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum CassErrorResult { - #[error(transparent)] - Query(#[from] ExecutionError), - #[error(transparent)] - ResultMetadataLazyDeserialization(#[from] ResultMetadataAndRowsCountParseError), - #[error("Failed to deserialize first row: {0}")] - Deserialization(#[from] DeserializationError), -} - -impl FFI for CassErrorResult { - type Origin = FromArc; -} - -impl From for CassConsistency { - fn from(c: Consistency) -> CassConsistency { - match c { - Consistency::Any => CassConsistency::CASS_CONSISTENCY_ANY, - Consistency::One => CassConsistency::CASS_CONSISTENCY_ONE, - Consistency::Two => CassConsistency::CASS_CONSISTENCY_TWO, - Consistency::Three => CassConsistency::CASS_CONSISTENCY_THREE, - Consistency::Quorum => CassConsistency::CASS_CONSISTENCY_QUORUM, - Consistency::All => CassConsistency::CASS_CONSISTENCY_ALL, - Consistency::LocalQuorum => CassConsistency::CASS_CONSISTENCY_LOCAL_QUORUM, - Consistency::EachQuorum => CassConsistency::CASS_CONSISTENCY_EACH_QUORUM, - Consistency::LocalOne => CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, - Consistency::Serial => CassConsistency::CASS_CONSISTENCY_SERIAL, - Consistency::LocalSerial => CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL, - } - } -} - -impl From<&WriteType> for CassWriteType { - fn from(c: &WriteType) -> CassWriteType { - match c { - WriteType::Simple => CassWriteType::CASS_WRITE_TYPE_SIMPLE, - WriteType::Batch => CassWriteType::CASS_WRITE_TYPE_BATCH, - WriteType::UnloggedBatch => CassWriteType::CASS_WRITE_TYPE_UNLOGGED_BATCH, - WriteType::Counter => CassWriteType::CASS_WRITE_TYPE_COUNTER, - WriteType::BatchLog => CassWriteType::CASS_WRITE_TYPE_BATCH_LOG, - WriteType::Cas => CassWriteType::CASS_WRITE_TYPE_CAS, - WriteType::View => CassWriteType::CASS_WRITE_TYPE_VIEW, - WriteType::Cdc => CassWriteType::CASS_WRITE_TYPE_CDC, - WriteType::Other(_) => CassWriteType::CASS_WRITE_TYPE_UNKNOWN, - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_free( - error_result: CassOwnedSharedPtr, -) { - ArcFFI::free(error_result); -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_code( - error_result: CassBorrowedSharedPtr, -) -> CassError { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - error_result.to_cass_error() -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_consistency( - error_result: CassBorrowedSharedPtr, -) -> CassConsistency { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError( - RequestAttemptError::DbError(DbError::Unavailable { consistency, .. }, _) - | RequestAttemptError::DbError(DbError::ReadTimeout { consistency, .. }, _) - | RequestAttemptError::DbError(DbError::WriteTimeout { consistency, .. }, _) - | RequestAttemptError::DbError(DbError::ReadFailure { consistency, .. }, _) - | RequestAttemptError::DbError(DbError::WriteFailure { consistency, .. }, _), - )) => CassConsistency::from(*consistency), - _ => CassConsistency::CASS_CONSISTENCY_UNKNOWN, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_responses_received( - error_result: CassBorrowedSharedPtr, -) -> cass_int32_t { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(attempt_error)) => { - match attempt_error { - RequestAttemptError::DbError(DbError::Unavailable { alive, .. }, _) => *alive, - RequestAttemptError::DbError(DbError::ReadTimeout { received, .. }, _) => *received, - RequestAttemptError::DbError(DbError::WriteTimeout { received, .. }, _) => { - *received - } - RequestAttemptError::DbError(DbError::ReadFailure { received, .. }, _) => *received, - RequestAttemptError::DbError(DbError::WriteFailure { received, .. }, _) => { - *received - } - _ => -1, - } - } - _ => -1, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_responses_required( - error_result: CassBorrowedSharedPtr, -) -> cass_int32_t { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(attempt_error)) => { - match attempt_error { - RequestAttemptError::DbError(DbError::Unavailable { required, .. }, _) => *required, - RequestAttemptError::DbError(DbError::ReadTimeout { required, .. }, _) => *required, - RequestAttemptError::DbError(DbError::WriteTimeout { required, .. }, _) => { - *required - } - RequestAttemptError::DbError(DbError::ReadFailure { required, .. }, _) => *required, - RequestAttemptError::DbError(DbError::WriteFailure { required, .. }, _) => { - *required - } - _ => -1, - } - } - _ => -1, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_num_failures( - error_result: CassBorrowedSharedPtr, -) -> cass_int32_t { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::ReadFailure { numfailures, .. }, - _, - ))) => *numfailures, - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::WriteFailure { numfailures, .. }, - _, - ))) => *numfailures, - _ => -1, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_data_present( - error_result: CassBorrowedSharedPtr, -) -> cass_bool_t { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::ReadTimeout { data_present, .. }, - _, - ))) => { - if *data_present { - cass_true - } else { - cass_false - } - } - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::ReadFailure { data_present, .. }, - _, - ))) => { - if *data_present { - cass_true - } else { - cass_false - } - } - _ => cass_false, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_write_type( - error_result: CassBorrowedSharedPtr, -) -> CassWriteType { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::WriteTimeout { write_type, .. }, - _, - ))) => CassWriteType::from(write_type), - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::WriteFailure { write_type, .. }, - _, - ))) => CassWriteType::from(write_type), - _ => CassWriteType::CASS_WRITE_TYPE_UNKNOWN, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_keyspace( - error_result: CassBorrowedSharedPtr, - c_keyspace: *mut *const ::std::os::raw::c_char, - c_keyspace_len: *mut size_t, -) -> CassError { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::AlreadyExists { keyspace, .. }, - _, - ))) => { - unsafe { write_str_to_c(keyspace.as_str(), c_keyspace, c_keyspace_len) }; - CassError::CASS_OK - } - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::FunctionFailure { keyspace, .. }, - _, - ))) => { - unsafe { write_str_to_c(keyspace.as_str(), c_keyspace, c_keyspace_len) }; - CassError::CASS_OK - } - _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_table( - error_result: CassBorrowedSharedPtr, - c_table: *mut *const ::std::os::raw::c_char, - c_table_len: *mut size_t, -) -> CassError { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::AlreadyExists { table, .. }, - _, - ))) => { - unsafe { write_str_to_c(table.as_str(), c_table, c_table_len) }; - CassError::CASS_OK - } - _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_function( - error_result: CassBorrowedSharedPtr, - c_function: *mut *const ::std::os::raw::c_char, - c_function_len: *mut size_t, -) -> CassError { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::FunctionFailure { function, .. }, - _, - ))) => { - unsafe { write_str_to_c(function.as_str(), c_function, c_function_len) }; - CassError::CASS_OK - } - _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_num_arg_types( - error_result: CassBorrowedSharedPtr, -) -> size_t { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::FunctionFailure { arg_types, .. }, - _, - ))) => arg_types.len() as size_t, - _ => 0, - } -} - -#[no_mangle] -pub unsafe extern "C" fn cass_error_result_arg_type( - error_result: CassBorrowedSharedPtr, - index: size_t, - arg_type: *mut *const ::std::os::raw::c_char, - arg_type_length: *mut size_t, -) -> CassError { - let error_result: &CassErrorResult = ArcFFI::as_ref(error_result).unwrap(); - match error_result { - CassErrorResult::Query(ExecutionError::LastAttemptError(RequestAttemptError::DbError( - DbError::FunctionFailure { arg_types, .. }, - _, - ))) => { - if index >= arg_types.len() as size_t { - return CassError::CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS; - } - unsafe { - write_str_to_c( - arg_types[index as usize].as_str(), - arg_type, - arg_type_length, - ) - }; - CassError::CASS_OK - } - _ => CassError::CASS_ERROR_LIB_INVALID_ERROR_RESULT_TYPE, - } -} diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index 7b12b67b..64011e94 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -4,8 +4,8 @@ use crate::cass_types::{ cass_data_type_type, get_column_type, CassColumnSpec, CassDataType, CassDataTypeInner, CassValueType, MapDataType, }; +use crate::execution_error::CassErrorResult; use crate::inet::CassInet; -use crate::query_error::CassErrorResult; use crate::query_result::Value::{CollectionValue, RegularValue}; use crate::types::*; use crate::uuid::CassUuid; @@ -148,7 +148,7 @@ mod row_with_self_borrowed_metadata { use scylla::value::Row; use yoke::{Yoke, Yokeable}; - use crate::query_error::CassErrorResult; + use crate::execution_error::CassErrorResult; use super::{CassResultMetadata, CassRow};