Skip to content

Commit b3f8dd1

Browse files
authored
Merge pull request #252 from muzarski/coalesce_delay
Configure coalescing delay
2 parents 716de88 + 796a675 commit b3f8dd1

File tree

2 files changed

+160
-2
lines changed

2 files changed

+160
-2
lines changed

include/cassandra.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1883,8 +1883,18 @@ cass_cluster_set_exponential_reconnect(CassCluster* cluster,
18831883
* request's roundtrip time. Larger values should be used for throughput
18841884
* bound workloads and lower values should be used for latency bound
18851885
* workloads.
1886+
*
1887+
* Notice that underlying Rust tokio timer has a granularity of millisecond.
1888+
* Thus, the sub-millisecond delays are implemented in a non-deterministic way
1889+
* by yielding the current tokio task.
1890+
*
1891+
* The semantics of mapping the provided number microseconds to the delay
1892+
* on rust-driver side:
1893+
* - 0us -> no delay, i.e. the delay is disabled
1894+
* - 1us - 999us -> small, non-deterministic delay
1895+
* - N us where N >= 1000 -> delay of (N / 1000)ms
18861896
*
1887-
* <b>Default:</b> 200 us
1897+
* <b>Default:</b> small, non-deterministic delay
18881898
*
18891899
* @public @memberof CassCluster
18901900
*

scylla-rust-wrapper/src/cluster.rs

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ use crate::types::*;
1010
use crate::uuid::CassUuid;
1111
use openssl::ssl::SslContextBuilder;
1212
use openssl_sys::SSL_CTX_up_ref;
13-
use scylla::client::SelfIdentity;
1413
use scylla::client::execution_profile::ExecutionProfileBuilder;
1514
use scylla::client::session::SessionConfig;
1615
use scylla::client::session_builder::SessionBuilder;
16+
use scylla::client::{SelfIdentity, WriteCoalescingDelay};
1717
use scylla::frame::Compression;
1818
use scylla::policies::load_balancing::{
1919
DefaultPolicyBuilder, LatencyAwarenessBuilder, LoadBalancingPolicy,
@@ -24,6 +24,7 @@ use scylla::statement::{Consistency, SerialConsistency};
2424
use std::collections::HashMap;
2525
use std::convert::TryInto;
2626
use std::future::Future;
27+
use std::num::NonZero;
2728
use std::os::raw::{c_char, c_int, c_uint};
2829
use std::sync::Arc;
2930
use std::time::Duration;
@@ -44,6 +45,11 @@ const DEFAULT_MAX_SCHEMA_WAIT_TIME: Duration = Duration::from_millis(10000);
4445
const DEFAULT_SCHEMA_AGREEMENT_INTERVAL: Duration = Duration::from_millis(200);
4546
// - setting TCP_NODELAY is true
4647
const DEFAULT_SET_TCP_NO_DELAY: bool = true;
48+
// - enabling write coalescing
49+
const DEFAULT_ENABLE_WRITE_COALESCING: bool = true;
50+
// - write coalescing delay
51+
const DEFAULT_WRITE_COALESCING_DELAY: WriteCoalescingDelay =
52+
WriteCoalescingDelay::SmallNondeterministic;
4753
// - connect timeout is 5000 millis
4854
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
4955
// - keepalive interval is 30 secs
@@ -217,6 +223,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster
217223
.schema_agreement_interval(DEFAULT_SCHEMA_AGREEMENT_INTERVAL)
218224
.tcp_nodelay(DEFAULT_SET_TCP_NO_DELAY)
219225
.connection_timeout(DEFAULT_CONNECT_TIMEOUT)
226+
.write_coalescing(DEFAULT_ENABLE_WRITE_COALESCING)
227+
.write_coalescing_delay(DEFAULT_WRITE_COALESCING_DELAY)
220228
.keepalive_interval(DEFAULT_KEEPALIVE_INTERVAL)
221229
.keepalive_timeout(DEFAULT_KEEPALIVE_TIMEOUT)
222230
};
@@ -433,6 +441,40 @@ pub unsafe extern "C" fn cass_cluster_set_connect_timeout(
433441
cluster.session_builder.config.connect_timeout = Duration::from_millis(timeout_ms.into());
434442
}
435443

444+
#[unsafe(no_mangle)]
445+
pub unsafe extern "C" fn cass_cluster_set_coalesce_delay(
446+
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
447+
delay_us: cass_int64_t,
448+
) -> CassError {
449+
let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else {
450+
tracing::error!("Provided null cluster pointer to cass_cluster_set_coalesce_delay!");
451+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
452+
};
453+
454+
let delay = match delay_us.cmp(&0) {
455+
std::cmp::Ordering::Less => {
456+
tracing::error!("Provided negative delay to cass_cluster_set_coalesce_delay!");
457+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
458+
}
459+
std::cmp::Ordering::Equal => None,
460+
std::cmp::Ordering::Greater => match NonZero::new((delay_us / 1000) as u64) {
461+
Some(non_zero_delay) => Some(WriteCoalescingDelay::Milliseconds(non_zero_delay)),
462+
// This means that 0 < delay_us < 1000.
463+
None => Some(WriteCoalescingDelay::SmallNondeterministic),
464+
},
465+
};
466+
467+
match delay {
468+
Some(d) => {
469+
cluster.session_builder.config.enable_write_coalescing = true;
470+
cluster.session_builder.config.write_coalescing_delay = d;
471+
}
472+
None => cluster.session_builder.config.enable_write_coalescing = false,
473+
}
474+
475+
CassError::CASS_OK
476+
}
477+
436478
#[unsafe(no_mangle)]
437479
pub unsafe extern "C" fn cass_cluster_set_request_timeout(
438480
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
@@ -981,6 +1023,112 @@ mod tests {
9811023
os::raw::c_char,
9821024
};
9831025

1026+
#[test]
1027+
#[ntest::timeout(100)]
1028+
fn test_coalescing_delay() {
1029+
#[derive(Debug)]
1030+
struct DelayEqWrapper<'a>(&'a WriteCoalescingDelay);
1031+
impl PartialEq for DelayEqWrapper<'_> {
1032+
fn eq(&self, other: &Self) -> bool {
1033+
match (self.0, other.0) {
1034+
(
1035+
WriteCoalescingDelay::SmallNondeterministic,
1036+
WriteCoalescingDelay::SmallNondeterministic,
1037+
) => true,
1038+
(
1039+
WriteCoalescingDelay::Milliseconds(delay_self),
1040+
WriteCoalescingDelay::Milliseconds(delay_other),
1041+
) => delay_self == delay_other,
1042+
_ => false,
1043+
}
1044+
}
1045+
}
1046+
1047+
unsafe {
1048+
let mut cluster_raw = cass_cluster_new();
1049+
1050+
// Check the defaults
1051+
{
1052+
let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
1053+
assert!(cluster.session_builder.config.enable_write_coalescing);
1054+
assert_eq!(
1055+
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
1056+
DelayEqWrapper(&WriteCoalescingDelay::SmallNondeterministic)
1057+
);
1058+
}
1059+
1060+
// Provide negative delay
1061+
{
1062+
assert_cass_error_eq!(
1063+
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), -1),
1064+
CassError::CASS_ERROR_LIB_BAD_PARAMS
1065+
);
1066+
}
1067+
1068+
// Provide zero delay (disables write coalescing)
1069+
{
1070+
assert_cass_error_eq!(
1071+
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 0),
1072+
CassError::CASS_OK,
1073+
);
1074+
1075+
let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
1076+
assert!(!cluster.session_builder.config.enable_write_coalescing);
1077+
}
1078+
1079+
// Provide sub-millisecond delay
1080+
{
1081+
assert_cass_error_eq!(
1082+
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 420),
1083+
CassError::CASS_OK,
1084+
);
1085+
1086+
let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
1087+
assert!(cluster.session_builder.config.enable_write_coalescing);
1088+
assert_eq!(
1089+
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
1090+
DelayEqWrapper(&WriteCoalescingDelay::SmallNondeterministic)
1091+
);
1092+
}
1093+
1094+
// Provide millisecond delay
1095+
{
1096+
assert_cass_error_eq!(
1097+
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 1000),
1098+
CassError::CASS_OK,
1099+
);
1100+
1101+
let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
1102+
assert!(cluster.session_builder.config.enable_write_coalescing);
1103+
assert_eq!(
1104+
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
1105+
DelayEqWrapper(&WriteCoalescingDelay::Milliseconds(
1106+
NonZero::new(1).unwrap()
1107+
))
1108+
);
1109+
}
1110+
1111+
// Provide delay with some microseconds remainder - this should take the floor of (micros as f64 / 1000.0)
1112+
{
1113+
assert_cass_error_eq!(
1114+
cass_cluster_set_coalesce_delay(cluster_raw.borrow_mut(), 2137),
1115+
CassError::CASS_OK,
1116+
);
1117+
1118+
let cluster = BoxFFI::as_ref(cluster_raw.borrow()).unwrap();
1119+
assert!(cluster.session_builder.config.enable_write_coalescing);
1120+
assert_eq!(
1121+
DelayEqWrapper(&cluster.session_builder.config.write_coalescing_delay),
1122+
DelayEqWrapper(&WriteCoalescingDelay::Milliseconds(
1123+
NonZero::new(2).unwrap()
1124+
))
1125+
);
1126+
}
1127+
1128+
cass_cluster_free(cluster_raw);
1129+
}
1130+
}
1131+
9841132
#[test]
9851133
#[ntest::timeout(100)]
9861134
fn test_load_balancing_config() {

0 commit comments

Comments
 (0)