Skip to content

Commit 2c15ab4

Browse files
committed
lbp: construct CassHostFilter
In cpp-driver, the following rule is upheld: if a host is rejected by **all** policies, the connection to that host is not opened at all. We can achieve this by: - taking the union of all whitelists (per hosts and per dcs) - taking the intersection of all blacklists (per hosts and per dcs) Now, if a host is not in the union of whitelists, it is rejected. If a host is in the intersection of blacklists, it is rejected. Note: if the execution profile does not have a base LBP defined (one of: round-robin, dc-aware, rack-aware), all of the extensions are ignored - including the filtering rules. This is achieved by filtering out such profiles in CassCluster::build_host_filter.
1 parent 78dd3ee commit 2c15ab4

File tree

4 files changed

+80
-2
lines changed

4 files changed

+80
-2
lines changed

scylla-rust-wrapper/src/cluster.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use crate::cass_error::CassError;
33
use crate::cass_types::CassConsistency;
44
use crate::exec_profile::{CassExecProfile, ExecProfileName, exec_profile_builder_modify};
55
use crate::future::CassFuture;
6-
use crate::load_balancing::{LoadBalancingConfig, LoadBalancingKind, set_filtering};
6+
use crate::load_balancing::{
7+
CassHostFilter, LoadBalancingConfig, LoadBalancingKind, set_filtering,
8+
};
79
use crate::retry_policy::CassRetryPolicy;
810
use crate::retry_policy::RetryPolicy::*;
911
use crate::ssl::CassSsl;
@@ -15,6 +17,7 @@ use scylla::client::execution_profile::ExecutionProfileBuilder;
1517
use scylla::client::session_builder::SessionBuilder;
1618
use scylla::client::{PoolSize, SelfIdentity, WriteCoalescingDelay};
1719
use scylla::frame::Compression;
20+
use scylla::policies::host_filter::HostFilter;
1821
use scylla::policies::load_balancing::LatencyAwarenessBuilder;
1922
use scylla::policies::retry::RetryPolicy;
2023
use scylla::policies::speculative_execution::SimpleSpeculativeExecutionPolicy;
@@ -95,6 +98,25 @@ impl CassCluster {
9598
pub(crate) fn get_client_id(&self) -> Option<uuid::Uuid> {
9699
self.client_id
97100
}
101+
102+
pub(crate) fn build_host_filter(&self) -> Arc<dyn HostFilter> {
103+
CassHostFilter::new_from_lbp_configs(
104+
std::iter::once(&self.load_balancing_config).chain(
105+
self.execution_profile_map
106+
.values()
107+
// Filter out the profiles that do not have specified base LBP.
108+
// If base LBP is not specified, the extensions such as filtering
109+
// are simply ignored - default (cluster) LBP is used.
110+
.filter_map(|exec_profile| {
111+
exec_profile
112+
.load_balancing_config
113+
.load_balancing_kind
114+
.as_ref()
115+
.map(|_| &exec_profile.load_balancing_config)
116+
}),
117+
),
118+
)
119+
}
98120
}
99121

100122
// Utilities for integration testing

scylla-rust-wrapper/src/exec_profile.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::types::{
3737
#[derive(Clone, Debug)]
3838
pub struct CassExecProfile {
3939
inner: ExecutionProfileBuilder,
40-
load_balancing_config: LoadBalancingConfig,
40+
pub(crate) load_balancing_config: LoadBalancingConfig,
4141
}
4242

4343
impl FFI for CassExecProfile {

scylla-rust-wrapper/src/load_balancing.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,57 @@ where
331331
intersection.filter(|v| !v.is_empty())
332332
}
333333

334+
impl CassHostFilter {
335+
/// Computes the filtering rules from the load balancing policies, and
336+
/// returns a corresponding `HostFilter`.
337+
///
338+
/// In cpp-driver, the following rule is upheld: if a host is rejected
339+
/// by **all** policies, the connection to that host is not opened at all.
340+
///
341+
/// We can achieve this by:
342+
/// - taking the union of all whitelists (per hosts and per dcs)
343+
/// - taking the intersection of all blacklists (per hosts and per dcs)
344+
///
345+
/// Now, if a host is not in the union of whitelists, it is rejected.
346+
/// If a host is in the intersection of blacklists, it is rejected.
347+
pub(crate) fn new_from_lbp_configs<'a>(
348+
configs: impl Iterator<Item = &'a LoadBalancingConfig> + Clone,
349+
) -> Arc<dyn HostFilter> {
350+
let whitelist_hosts = nonempty_union(
351+
configs
352+
.clone()
353+
.map(|lbp_config| &lbp_config.filtering.whitelist_hosts),
354+
);
355+
356+
let blacklist_hosts = nonempty_intersection(
357+
configs
358+
.clone()
359+
.map(|lbp_config| &lbp_config.filtering.blacklist_hosts),
360+
);
361+
362+
let whitelist_dc = nonempty_union(
363+
configs
364+
.clone()
365+
.map(|lbp_config| &lbp_config.filtering.whitelist_dc),
366+
);
367+
368+
let blacklist_dc = nonempty_intersection(
369+
configs
370+
.clone()
371+
.map(|lbp_config| &lbp_config.filtering.blacklist_dc),
372+
);
373+
374+
Arc::new(Self {
375+
filtering: FilteringInfo {
376+
whitelist_hosts,
377+
blacklist_hosts,
378+
whitelist_dc,
379+
blacklist_dc,
380+
},
381+
})
382+
}
383+
}
384+
334385
#[cfg(test)]
335386
mod tests {
336387
#[test]

scylla-rust-wrapper/src/session.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use scylla::cluster::metadata::ColumnType;
2121
use scylla::errors::ExecutionError;
2222
use scylla::frame::types::Consistency;
2323
use scylla::observability::metrics::MetricsError;
24+
use scylla::policies::host_filter::HostFilter;
2425
use scylla::response::PagingStateResponse;
2526
use scylla::response::query_result::QueryResult;
2627
use scylla::statement::unprepared::Statement;
@@ -78,11 +79,13 @@ impl CassSessionInner {
7879
) -> CassOwnedSharedPtr<CassFuture, CMut> {
7980
let session_builder = build_session_builder(cluster);
8081
let exec_profile_map = cluster.execution_profile_map().clone();
82+
let host_filter = cluster.build_host_filter();
8183

8284
CassFuture::make_raw(Self::connect_fut(
8385
session_opt,
8486
session_builder,
8587
exec_profile_map,
88+
host_filter,
8689
cluster
8790
.get_client_id()
8891
// If user did not set a client id, generate a random uuid v4.
@@ -95,6 +98,7 @@ impl CassSessionInner {
9598
session_opt: Arc<RwLock<Option<CassSessionInner>>>,
9699
session_builder_fut: impl Future<Output = SessionBuilder>,
97100
exec_profile_builder_map: HashMap<ExecProfileName, CassExecProfile>,
101+
host_filter: Arc<dyn HostFilter>,
98102
client_id: uuid::Uuid,
99103
keyspace: Option<String>,
100104
) -> CassFutureResult {
@@ -124,6 +128,7 @@ impl CassSessionInner {
124128
}
125129

126130
let session = session_builder
131+
.host_filter(host_filter)
127132
.build()
128133
.await
129134
.map_err(|err| (err.to_cass_error(), err.msg()))?;

0 commit comments

Comments
 (0)