diff --git a/include/cassandra.h b/include/cassandra.h
index 257ad70e..f5882125 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -1883,8 +1883,18 @@ cass_cluster_set_exponential_reconnect(CassCluster* cluster,
* request's roundtrip time. Larger values should be used for throughput
* bound workloads and lower values should be used for latency bound
* workloads.
+ *
+ * Notice that underlying Rust tokio timer has a granularity of millisecond.
+ * Thus, the sub-millisecond delays are implemented in a non-deterministic way
+ * by yielding the current tokio task.
+ *
+ * The semantics of mapping the provided number microseconds to the delay
+ * on rust-driver side:
+ * - 0us -> no delay, i.e. the delay is disabled
+ * - 1us - 999us -> small, non-deterministic delay
+ * - N us where N >= 1000 -> delay of (N / 1000)ms
*
- * Default: 200 us
+ * Default: small, non-deterministic delay
*
* @public @memberof CassCluster
*
diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs
index 8650f267..6d49fd2b 100644
--- a/scylla-rust-wrapper/src/cluster.rs
+++ b/scylla-rust-wrapper/src/cluster.rs
@@ -10,10 +10,10 @@ use crate::types::*;
use crate::uuid::CassUuid;
use openssl::ssl::SslContextBuilder;
use openssl_sys::SSL_CTX_up_ref;
-use scylla::client::SelfIdentity;
use scylla::client::execution_profile::ExecutionProfileBuilder;
use scylla::client::session::SessionConfig;
use scylla::client::session_builder::SessionBuilder;
+use scylla::client::{SelfIdentity, WriteCoalescingDelay};
use scylla::frame::Compression;
use scylla::policies::load_balancing::{
DefaultPolicyBuilder, LatencyAwarenessBuilder, LoadBalancingPolicy,
@@ -24,6 +24,7 @@ use scylla::statement::{Consistency, SerialConsistency};
use std::collections::HashMap;
use std::convert::TryInto;
use std::future::Future;
+use std::num::NonZero;
use std::os::raw::{c_char, c_int, c_uint};
use std::sync::Arc;
use std::time::Duration;
@@ -44,6 +45,11 @@ const DEFAULT_MAX_SCHEMA_WAIT_TIME: Duration = Duration::from_millis(10000);
const DEFAULT_SCHEMA_AGREEMENT_INTERVAL: Duration = Duration::from_millis(200);
// - setting TCP_NODELAY is true
const DEFAULT_SET_TCP_NO_DELAY: bool = true;
+// - enabling write coalescing
+const DEFAULT_ENABLE_WRITE_COALESCING: bool = true;
+// - write coalescing delay
+const DEFAULT_WRITE_COALESCING_DELAY: WriteCoalescingDelay =
+ WriteCoalescingDelay::SmallNondeterministic;
// - connect timeout is 5000 millis
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
// - keepalive interval is 30 secs
@@ -217,6 +223,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr,
+ delay_us: cass_int64_t,
+) -> CassError {
+ let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else {
+ tracing::error!("Provided null cluster pointer to cass_cluster_set_coalesce_delay!");
+ return CassError::CASS_ERROR_LIB_BAD_PARAMS;
+ };
+
+ let delay = match delay_us.cmp(&0) {
+ std::cmp::Ordering::Less => {
+ tracing::error!("Provided negative delay to cass_cluster_set_coalesce_delay!");
+ return CassError::CASS_ERROR_LIB_BAD_PARAMS;
+ }
+ std::cmp::Ordering::Equal => None,
+ std::cmp::Ordering::Greater => match NonZero::new((delay_us / 1000) as u64) {
+ Some(non_zero_delay) => Some(WriteCoalescingDelay::Milliseconds(non_zero_delay)),
+ // This means that 0 < delay_us < 1000.
+ None => Some(WriteCoalescingDelay::SmallNondeterministic),
+ },
+ };
+
+ match delay {
+ Some(d) => {
+ cluster.session_builder.config.enable_write_coalescing = true;
+ cluster.session_builder.config.write_coalescing_delay = d;
+ }
+ None => cluster.session_builder.config.enable_write_coalescing = false,
+ }
+
+ CassError::CASS_OK
+}
+
#[unsafe(no_mangle)]
pub unsafe extern "C" fn cass_cluster_set_request_timeout(
cluster_raw: CassBorrowedExclusivePtr,
@@ -981,6 +1023,112 @@ mod tests {
os::raw::c_char,
};
+ #[test]
+ #[ntest::timeout(100)]
+ fn test_coalescing_delay() {
+ #[derive(Debug)]
+ struct DelayEqWrapper<'a>(&'a WriteCoalescingDelay);
+ impl PartialEq for DelayEqWrapper<'_> {
+ fn eq(&self, other: &Self) -> bool {
+ match (self.0, other.0) {
+ (
+ WriteCoalescingDelay::SmallNondeterministic,
+ WriteCoalescingDelay::SmallNondeterministic,
+ ) => true,
+ (
+ WriteCoalescingDelay::Milliseconds(delay_self),
+ WriteCoalescingDelay::Milliseconds(delay_other),
+ ) => delay_self == delay_other,
+ _ => false,
+ }
+ }
+ }
+
+ unsafe {
+ let mut cluster_raw = cass_cluster_new();
+
+ // Check the defaults
+ {
+ let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
+ assert!(cluster.session_builder.config.enable_write_coalescing);
+ assert_eq!(
+ DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
+ DelayEqWrapper(&WriteCoalescingDelay::SmallNondeterministic)
+ );
+ }
+
+ // Provide negative delay
+ {
+ assert_cass_error_eq!(
+ cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), -1),
+ CassError::CASS_ERROR_LIB_BAD_PARAMS
+ );
+ }
+
+ // Provide zero delay (disables write coalescing)
+ {
+ assert_cass_error_eq!(
+ cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 0),
+ CassError::CASS_OK,
+ );
+
+ let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
+ assert!(!cluster.session_builder.config.enable_write_coalescing);
+ }
+
+ // Provide sub-millisecond delay
+ {
+ assert_cass_error_eq!(
+ cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 420),
+ CassError::CASS_OK,
+ );
+
+ let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
+ assert!(cluster.session_builder.config.enable_write_coalescing);
+ assert_eq!(
+ DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
+ DelayEqWrapper(&WriteCoalescingDelay::SmallNondeterministic)
+ );
+ }
+
+ // Provide millisecond delay
+ {
+ assert_cass_error_eq!(
+ cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 1000),
+ CassError::CASS_OK,
+ );
+
+ let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
+ assert!(cluster.session_builder.config.enable_write_coalescing);
+ assert_eq!(
+ DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
+ DelayEqWrapper(&WriteCoalescingDelay::Milliseconds(
+ NonZero::new(1).unwrap()
+ ))
+ );
+ }
+
+ // Provide delay with some microseconds remainder - this should take the floor of (micros as f64 / 1000.0)
+ {
+ assert_cass_error_eq!(
+ cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 2137),
+ CassError::CASS_OK,
+ );
+
+ let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
+ assert!(cluster.session_builder.config.enable_write_coalescing);
+ assert_eq!(
+ DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
+ DelayEqWrapper(&WriteCoalescingDelay::Milliseconds(
+ NonZero::new(2).unwrap()
+ ))
+ );
+ }
+
+ cass_cluster_free(cluster_raw);
+ }
+ }
+
#[test]
#[ntest::timeout(100)]
fn test_load_balancing_config() {