Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -1883,8 +1883,18 @@ cass_cluster_set_exponential_reconnect(CassCluster* cluster,
* request's roundtrip time. Larger values should be used for throughput
* bound workloads and lower values should be used for latency bound
* workloads.
*
* Notice that underlying Rust tokio timer has a granularity of millisecond.
* Thus, the sub-millisecond delays are implemented in a non-deterministic way
* by yielding the current tokio task.
*
* The semantics of mapping the provided number microseconds to the delay
* on rust-driver side:
* - 0us -> no delay, i.e. the delay is disabled
* - 1us - 999us -> small, non-deterministic delay
* - N us where N >= 1000 -> delay of (N / 1000)ms
*
* <b>Default:</b> 200 us
* <b>Default:</b> small, non-deterministic delay
*
* @public @memberof CassCluster
*
Expand Down
150 changes: 149 additions & 1 deletion scylla-rust-wrapper/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use crate::types::*;
use crate::uuid::CassUuid;
use openssl::ssl::SslContextBuilder;
use openssl_sys::SSL_CTX_up_ref;
use scylla::client::SelfIdentity;
use scylla::client::execution_profile::ExecutionProfileBuilder;
use scylla::client::session::SessionConfig;
use scylla::client::session_builder::SessionBuilder;
use scylla::client::{SelfIdentity, WriteCoalescingDelay};
use scylla::frame::Compression;
use scylla::policies::load_balancing::{
DefaultPolicyBuilder, LatencyAwarenessBuilder, LoadBalancingPolicy,
Expand All @@ -24,6 +24,7 @@ use scylla::statement::{Consistency, SerialConsistency};
use std::collections::HashMap;
use std::convert::TryInto;
use std::future::Future;
use std::num::NonZero;
use std::os::raw::{c_char, c_int, c_uint};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -44,6 +45,11 @@ const DEFAULT_MAX_SCHEMA_WAIT_TIME: Duration = Duration::from_millis(10000);
const DEFAULT_SCHEMA_AGREEMENT_INTERVAL: Duration = Duration::from_millis(200);
// - setting TCP_NODELAY is true
const DEFAULT_SET_TCP_NO_DELAY: bool = true;
// - enabling write coalescing
const DEFAULT_ENABLE_WRITE_COALESCING: bool = true;
// - write coalescing delay
const DEFAULT_WRITE_COALESCING_DELAY: WriteCoalescingDelay =
WriteCoalescingDelay::SmallNondeterministic;
// - connect timeout is 5000 millis
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
// - keepalive interval is 30 secs
Expand Down Expand Up @@ -217,6 +223,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster
.schema_agreement_interval(DEFAULT_SCHEMA_AGREEMENT_INTERVAL)
.tcp_nodelay(DEFAULT_SET_TCP_NO_DELAY)
.connection_timeout(DEFAULT_CONNECT_TIMEOUT)
.write_coalescing(DEFAULT_ENABLE_WRITE_COALESCING)
.write_coalescing_delay(DEFAULT_WRITE_COALESCING_DELAY)
.keepalive_interval(DEFAULT_KEEPALIVE_INTERVAL)
.keepalive_timeout(DEFAULT_KEEPALIVE_TIMEOUT)
};
Expand Down Expand Up @@ -433,6 +441,40 @@ pub unsafe extern "C" fn cass_cluster_set_connect_timeout(
cluster.session_builder.config.connect_timeout = Duration::from_millis(timeout_ms.into());
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_cluster_set_coalesce_delay(
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
delay_us: cass_int64_t,
) -> CassError {
let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else {
tracing::error!("Provided null cluster pointer to cass_cluster_set_coalesce_delay!");
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
};

let delay = match delay_us.cmp(&0) {
std::cmp::Ordering::Less => {
tracing::error!("Provided negative delay to cass_cluster_set_coalesce_delay!");
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
}
std::cmp::Ordering::Equal => None,
std::cmp::Ordering::Greater => match NonZero::new((delay_us / 1000) as u64) {
Some(non_zero_delay) => Some(WriteCoalescingDelay::Milliseconds(non_zero_delay)),
// This means that 0 < delay_us < 1000.
None => Some(WriteCoalescingDelay::SmallNondeterministic),
},
};

match delay {
Some(d) => {
cluster.session_builder.config.enable_write_coalescing = true;
cluster.session_builder.config.write_coalescing_delay = d;
}
None => cluster.session_builder.config.enable_write_coalescing = false,
}

CassError::CASS_OK
}

#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_cluster_set_request_timeout(
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
Expand Down Expand Up @@ -981,6 +1023,112 @@ mod tests {
os::raw::c_char,
};

#[test]
#[ntest::timeout(100)]
fn test_coalescing_delay() {
#[derive(Debug)]
struct DelayEqWrapper<'a>(&'a WriteCoalescingDelay);
impl PartialEq for DelayEqWrapper<'_> {
fn eq(&self, other: &Self) -> bool {
match (self.0, other.0) {
(
WriteCoalescingDelay::SmallNondeterministic,
WriteCoalescingDelay::SmallNondeterministic,
) => true,
(
WriteCoalescingDelay::Milliseconds(delay_self),
WriteCoalescingDelay::Milliseconds(delay_other),
) => delay_self == delay_other,
_ => false,
}
}
}

unsafe {
let mut cluster_raw = cass_cluster_new();

// Check the defaults
{
let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
assert!(cluster.session_builder.config.enable_write_coalescing);
assert_eq!(
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
DelayEqWrapper(&WriteCoalescingDelay::SmallNondeterministic)
);
}

// Provide negative delay
{
assert_cass_error_eq!(
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), -1),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);
}

// Provide zero delay (disables write coalescing)
{
assert_cass_error_eq!(
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 0),
CassError::CASS_OK,
);

let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
assert!(!cluster.session_builder.config.enable_write_coalescing);
}

// Provide sub-millisecond delay
{
assert_cass_error_eq!(
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 420),
CassError::CASS_OK,
);

let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
assert!(cluster.session_builder.config.enable_write_coalescing);
assert_eq!(
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
DelayEqWrapper(&WriteCoalescingDelay::SmallNondeterministic)
);
}

// Provide millisecond delay
{
assert_cass_error_eq!(
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 1000),
CassError::CASS_OK,
);

let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
assert!(cluster.session_builder.config.enable_write_coalescing);
assert_eq!(
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
DelayEqWrapper(&WriteCoalescingDelay::Milliseconds(
NonZero::new(1).unwrap()
))
);
}

// Provide delay with some microseconds remainder - this should take the floor of (micros as f64 / 1000.0)
{
assert_cass_error_eq!(
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 2137),
CassError::CASS_OK,
);

let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
assert!(cluster.session_builder.config.enable_write_coalescing);
assert_eq!(
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
DelayEqWrapper(&WriteCoalescingDelay::Milliseconds(
NonZero::new(2).unwrap()
))
);
}

cass_cluster_free(cluster_raw);
}
}

#[test]
#[ntest::timeout(100)]
fn test_load_balancing_config() {
Expand Down