diff --git a/.github/workflows/semver_checks.yml b/.github/workflows/semver_checks.yml index 976057b125..bcad2f0b3c 100644 --- a/.github/workflows/semver_checks.yml +++ b/.github/workflows/semver_checks.yml @@ -60,8 +60,8 @@ jobs: run: rustup update - name: Install semver-checks # Official action uses binary releases fetched from GitHub - # If this pipeline becomes too slow, we should do this too - run: cargo install cargo-semver-checks --no-default-features + # If this pipeline becomes too slow, we should do this too. + run: cargo install cargo-semver-checks --no-default-features --features gix-reqwest - name: Verify the API compatibilty with PR base id: semver-pr-check run: | @@ -147,6 +147,6 @@ jobs: - name: Update rust toolchain run: rustup update - name: Install semver-checks - run: cargo install cargo-semver-checks --no-default-features + run: cargo install cargo-semver-checks --no-default-features --features gix-reqwest - name: Run semver-checks to see if it agrees with version updates run: make semver-version diff --git a/Cargo.lock.msrv b/Cargo.lock.msrv index 64b81b820a..0d4b29192e 100644 --- a/Cargo.lock.msrv +++ b/Cargo.lock.msrv @@ -1471,7 +1471,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" -version = "0.13.1" +version = "0.13.2" dependencies = [ "arc-swap", "assert_matches", @@ -1516,7 +1516,7 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.2.1" +version = "0.2.2" dependencies = [ "async-trait", "bigdecimal", @@ -1539,7 +1539,7 @@ dependencies = [ [[package]] name = "scylla-macros" -version = "0.5.1" +version = "0.5.2" dependencies = [ "darling", "proc-macro2", diff --git a/scylla-cql/Cargo.toml b/scylla-cql/Cargo.toml index 751459cee3..68f3240092 100644 --- a/scylla-cql/Cargo.toml +++ b/scylla-cql/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scylla-cql" -version = "0.2.1" +version = "0.2.2" edition = "2021" description = "CQL data types and primitives, for interacting 
with Scylla." repository = "https://github.com/scylladb/scylla-rust-driver" @@ -10,7 +10,7 @@ categories = ["database"] license = "MIT OR Apache-2.0" [dependencies] -scylla-macros = { version = "0.5.1", path = "../scylla-macros" } +scylla-macros = { version = "0.5.2", path = "../scylla-macros" } byteorder = "1.3.4" bytes = "1.0.1" tokio = { version = "1.12", features = ["io-util", "time"] } diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index 527d481eb2..526f1b8187 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -925,7 +925,11 @@ fn deser_rows( let server_metadata = deser_result_metadata(buf)?; let metadata = match cached_metadata { - Some(metadata) => metadata.clone(), + Some(cached) => ResultMetadata { + col_count: cached.col_count, + paging_state: server_metadata.paging_state, + col_specs: cached.col_specs.clone(), + }, None => { // No cached_metadata provided. Server is supposed to provide the result metadata. 
if server_metadata.col_count != server_metadata.col_specs.len() { diff --git a/scylla-macros/Cargo.toml b/scylla-macros/Cargo.toml index dc008b7d2c..4f14fb22ac 100644 --- a/scylla-macros/Cargo.toml +++ b/scylla-macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scylla-macros" -version = "0.5.1" +version = "0.5.2" edition = "2021" description = "proc macros for scylla async CQL driver" repository = "https://github.com/scylladb/scylla-rust-driver" diff --git a/scylla/Cargo.toml b/scylla/Cargo.toml index 301651b0e3..89931cfff3 100644 --- a/scylla/Cargo.toml +++ b/scylla/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "scylla" -version = "0.13.1" +version = "0.13.2" edition = "2021" description = "Async CQL driver for Rust, optimized for Scylla, fully compatible with Apache Cassandra™" repository = "https://github.com/scylladb/scylla-rust-driver" @@ -40,8 +40,8 @@ full-serialization = [ ] [dependencies] -scylla-macros = { version = "0.5.1", path = "../scylla-macros" } -scylla-cql = { version = "0.2.1", path = "../scylla-cql" } +scylla-macros = { version = "0.5.2", path = "../scylla-macros" } +scylla-cql = { version = "0.2.2", path = "../scylla-cql" } byteorder = "1.3.4" bytes = "1.0.1" futures = "0.3.6" @@ -96,3 +96,4 @@ harness = false [lints.rust] unreachable_pub = "warn" +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(scylla_cloud_tests)'] } diff --git a/scylla/src/macros.rs b/scylla/src/macros.rs index 5c01bff053..e5eadc7b4c 100644 --- a/scylla/src/macros.rs +++ b/scylla/src/macros.rs @@ -39,6 +39,7 @@ pub use scylla_cql::macros::IntoUserType; /// - serialization will succed if suffix of UDT fields is missing. If there are missing fields in the /// middle it will fail. Note that if "skip_name_checks" is enabled, and the types happen to match, /// it is possible for serialization to succeed with unexpected result. +/// /// This behavior is the default to support ALTERing UDTs by adding new fields. 
/// You can require exact match of fields using `force_exact_match` attribute. /// diff --git a/scylla/src/transport/cql_types_test.rs b/scylla/src/transport/cql_types_test.rs index 8429a6ce0e..35e7bd9554 100644 --- a/scylla/src/transport/cql_types_test.rs +++ b/scylla/src/transport/cql_types_test.rs @@ -3,7 +3,7 @@ use crate::cql_to_rust::FromCqlVal; use crate::frame::response::result::CqlValue; use crate::frame::value::{Counter, CqlDate, CqlTime, CqlTimestamp}; use crate::macros::FromUserType; -use crate::test_utils::{create_new_session_builder, setup_tracing}; +use crate::test_utils::{create_new_session_builder, scylla_supports_tablets, setup_tracing}; use crate::transport::session::Session; use crate::utils::test_utils::unique_keyspace_name; use itertools::Itertools; @@ -16,23 +16,27 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::str::FromStr; // Used to prepare a table for test -// Creates a new keyspace +// Creates a new keyspace, without tablets if requested and the ScyllaDB instance supports them. 
// Drops and creates table {table_name} (id int PRIMARY KEY, val {type_name}) -async fn init_test(table_name: &str, type_name: &str) -> Session { +async fn init_test_maybe_without_tablets( + table_name: &str, + type_name: &str, + supports_tablets: bool, +) -> Session { let session: Session = create_new_session_builder().build().await.unwrap(); let ks = unique_keyspace_name(); - session - .query( - format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ - {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", - ks - ), - &[], - ) - .await - .unwrap(); + let mut create_ks = format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = \ + {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", + ks + ); + + if !supports_tablets && scylla_supports_tablets(&session).await { + create_ks += " AND TABLETS = {'enabled': false}" + } + + session.query(create_ks, &[]).await.unwrap(); session.use_keyspace(ks, false).await.unwrap(); session @@ -54,6 +58,13 @@ async fn init_test(table_name: &str, type_name: &str) -> Session { session } +// Used to prepare a table for test +// Creates a new keyspace +// Drops and creates table {table_name} (id int PRIMARY KEY, val {type_name}) +async fn init_test(table_name: &str, type_name: &str) -> Session { + init_test_maybe_without_tablets(table_name, type_name, true).await +} + // This function tests serialization and deserialization mechanisms by sending insert and select // queries to running Scylla instance. // To do so, it: @@ -267,7 +278,7 @@ async fn test_counter() { // Can't use run_tests, because counters are special and can't be inserted let type_name = "counter"; - let session: Session = init_test(type_name, type_name).await; + let session: Session = init_test_maybe_without_tablets(type_name, type_name, false).await; for (i, test) in tests.iter().enumerate() { let update_bound_value = format!("UPDATE {} SET val = val + ? 
WHERE id = ?", type_name); diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 46aa282992..dd5a989d52 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -58,15 +58,28 @@ impl NodeLocationPreference { } } +/// An ordering requirement for replicas. #[derive(Clone, Copy)] enum ReplicaOrder { + /// No requirement. Replicas can be returned in arbitrary order. Arbitrary, - RingOrder, + + /// A requirement for the order to be deterministic, not only across statement executions + /// but also across drivers. This is used for LWT optimisation, to avoid Paxos conflicts. + Deterministic, } +/// Statement kind, used to enable specific load balancing patterns for certain cases. +/// +/// Currently, there is a distinguished case of LWT statements, which should always be routed +/// to replicas in a deterministic order to avoid Paxos conflicts. Other statements +/// are routed to random replicas to balance the load. #[derive(Clone, Copy)] enum StatementType { + /// The statement is a confirmed LWT. It's to be routed specifically. Lwt, + + /// The statement is not a confirmed LWT. It's to be routed in a default way. NonLwt, } @@ -82,16 +95,52 @@ enum PickedReplica<'a> { /// The default load balancing policy. /// -/// It can be configured to be datacenter-aware and token-aware. -/// Datacenter failover for queries with non local consistency mode is also supported. -/// Latency awareness is available, althrough not recommended. +/// It can be configured to be datacenter-aware, rack-aware and token-aware. +/// Datacenter failover (sending query to a node from a remote datacenter) +/// for queries with non local consistency mode is also supported. +/// +/// Latency awareness is available, although **not recommended**: +/// the penalisation mechanism it involves may interact badly with other +/// mechanisms, such as LWT optimisation. 
Also, the very tactic of penalising +/// nodes for recently measured latencies is believed to not be very stable +/// and beneficial. The number of in-flight requests, for instance, seems +/// to be a better metric showing how (over)loaded a target node/shard is. +/// For now, however, we don't have an implementation of the +/// in-flight-requests-aware policy. #[allow(clippy::type_complexity)] pub struct DefaultPolicy { + /// Preferences regarding node location. One of: rack and DC, DC, or no preference. preferences: NodeLocationPreference, + + /// Configures whether the policy takes token into consideration when creating plans. + /// If this is set to `true` AND token, keyspace and table are available, + /// then policy prefers replicas and puts them earlier in the query plan. is_token_aware: bool, + + /// Whether to permit remote nodes (those not located in the preferred DC) in plans. + /// If no preferred DC is set, this has no effect. permit_dc_failover: bool, + + /// A predicate that a target (node + shard) must satisfy in order to be picked. + /// This was introduced to make latency awareness cleaner. + /// - if latency awareness is disabled, then `pick_predicate` is just `Self::is_alive()`; + /// - if latency awareness is enabled, then it is `Self::is_alive() && latency_predicate()`, + /// which checks that the target is not penalised due to high latencies. pick_predicate: Box, Option) -> bool + Send + Sync>, + + /// Additional layer that penalises targets that are too slow compared to others + /// in terms of latency. It works in the following way: + /// - for `pick`, it uses `latency_predicate` to filter out penalised nodes, + /// so that a penalised node will never be `pick`ed; + /// - for `fallback`, it wraps the returned iterator, moving all penalised nodes + /// to the end, in a stable way. + /// + /// Penalisation is done based on collected and updated latencies. 
latency_awareness: Option, + + /// The policy chooses (in `pick`) and shuffles (in `fallback`) replicas and nodes + /// based on random number generator. For sake of deterministic testing, + /// a fixed seed can be used. fixed_seed: Option, } @@ -102,7 +151,7 @@ impl fmt::Debug for DefaultPolicy { .field("is_token_aware", &self.is_token_aware) .field("permit_dc_failover", &self.permit_dc_failover) .field("latency_awareness", &self.latency_awareness) - .field("fixed_shuffle_seed", &self.fixed_seed) + .field("fixed_seed", &self.fixed_seed) .finish_non_exhaustive() } } @@ -113,7 +162,10 @@ impl LoadBalancingPolicy for DefaultPolicy { query: &'a RoutingInfo, cluster: &'a ClusterData, ) -> Option<(NodeRef<'a>, Option)> { + /* For prepared statements, token-aware logic is available, we know what are the replicas + * for the statement, so that we can pick one of them. */ let routing_info = self.routing_info(query, cluster); + if let Some(ref token_with_strategy) = routing_info.token_with_strategy { if self.preferences.datacenter().is_some() && !self.permit_dc_failover @@ -130,11 +182,16 @@ or refrain from preferring datacenters (which may ban all other datacenters, if ); } } + + /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */ let statement_type = if query.is_confirmed_lwt { StatementType::Lwt } else { StatementType::NonLwt }; + + /* Token-aware logic - if routing info is available, we know what are the replicas + * for the statement. Try to pick one of them. */ if let (Some(ts), Some(table_spec)) = (&routing_info.token_with_strategy, query.table) { if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { // Try to pick some alive local rack random replica. @@ -207,55 +264,70 @@ or refrain from preferring datacenters (which may ban all other datacenters, if } }; - // If no token was available (or all the replicas for that token are down), try to pick - // some alive local node. 
+ /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements), + * or no replica was suitable for targeting it (e.g. disabled or down), try to choose + * a random node, not necessarily a replica. */ + + /* We start having not alive nodes filtered out. This is done by `pick_predicate`, + * which always contains `Self::is_alive()`. */ + + // Let's start with local nodes, i.e. those in the preferred datacenter. // If there was no preferred datacenter specified, all nodes are treated as local. - let nodes = self.preferred_node_set(cluster); + let local_nodes = self.preferred_node_set(cluster); if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { - // Try to pick some alive local rack random node. + // Try to pick some alive random local rack node. let rack_predicate = Self::make_rack_predicate( |node| (self.pick_predicate)(node, None), NodeLocationCriteria::DatacenterAndRack(dc, rack), ); - let local_rack_picked = self.pick_node(nodes, rack_predicate); + let local_rack_node_picked = self.pick_node(local_nodes, rack_predicate); - if let Some(alive_local_rack) = local_rack_picked { - return Some((alive_local_rack, None)); + if let Some(alive_local_rack_node) = local_rack_node_picked { + return Some((alive_local_rack_node, None)); } } - // Try to pick some alive local random node. - if let Some(alive_local) = self.pick_node(nodes, |node| (self.pick_predicate)(node, None)) { - return Some((alive_local, None)); + // Try to pick some alive random local node. + let local_node_picked = + self.pick_node(local_nodes, |node| (self.pick_predicate)(node, None)); + if let Some(alive_local_node) = local_node_picked { + return Some((alive_local_node, None)); } let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring(); // If a datacenter failover is possible, loosen restriction about locality. 
if self.is_datacenter_failover_possible(&routing_info) { - let picked = self.pick_node(all_nodes, |node| (self.pick_predicate)(node, None)); - if let Some(alive_maybe_remote) = picked { - return Some((alive_maybe_remote, None)); + let maybe_remote_node_picked = + self.pick_node(all_nodes, |node| (self.pick_predicate)(node, None)); + if let Some(alive_maybe_remote_node) = maybe_remote_node_picked { + return Some((alive_maybe_remote_node, None)); } } + /* As we are here, we failed to pick any alive node. Now let's consider even down nodes. */ + // Previous checks imply that every node we could have selected is down. // Let's try to return a down node that wasn't disabled. - let picked = self.pick_node(nodes, |node| node.is_enabled()); - if let Some(down_but_enabled_local_node) = picked { + let maybe_down_local_node_picked = self.pick_node(local_nodes, |node| node.is_enabled()); + if let Some(down_but_enabled_local_node) = maybe_down_local_node_picked { return Some((down_but_enabled_local_node, None)); } // If a datacenter failover is possible, loosen restriction about locality. if self.is_datacenter_failover_possible(&routing_info) { - let picked = self.pick_node(all_nodes, |node| node.is_enabled()); - if let Some(down_but_enabled_maybe_remote_node) = picked { + let maybe_down_maybe_remote_node_picked = + self.pick_node(all_nodes, |node| node.is_enabled()); + if let Some(down_but_enabled_maybe_remote_node) = maybe_down_maybe_remote_node_picked { return Some((down_but_enabled_maybe_remote_node, None)); } } // Every node is disabled. This could be due to a bad host filter - configuration error. - nodes.first().map(|node| (node, None)) + // It makes no sense to return disabled nodes (there are no open connections to them anyway), + // so let's return None. `fallback()` will return empty iterator, and so the whole plan + // will be empty. 
+ None } fn fallback<'a>( @@ -263,20 +335,29 @@ or refrain from preferring datacenters (which may ban all other datacenters, if query: &'a RoutingInfo, cluster: &'a ClusterData, ) -> FallbackPlan<'a> { + /* For prepared statements, token-aware logic is available, we know what are the replicas + * for the statement, so that we can pick one of them. */ let routing_info = self.routing_info(query, cluster); + + /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */ let statement_type = if query.is_confirmed_lwt { StatementType::Lwt } else { StatementType::NonLwt }; - // If token is available, get a shuffled list of alive replicas. + /* Token-aware logic - if routing info is available, we know what are the replicas for the statement. + * Get a list of alive replicas: + * - shuffled list in case of non-LWTs, + * - deterministically ordered in case of LWTs. */ let maybe_replicas = if let (Some(ts), Some(table_spec)) = (&routing_info.token_with_strategy, query.table) { + // Iterator over alive local rack replicas (shuffled or deterministically ordered, + // depending on the statement being LWT or not). let maybe_local_rack_replicas = if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { - let local_rack_replicas = self.fallback_replicas( + let local_rack_replicas = self.maybe_shuffled_replicas( ts, NodeLocationCriteria::DatacenterAndRack(dc, rack), |node, shard| Self::is_alive(node, Some(shard)), @@ -289,11 +370,13 @@ or refrain from preferring datacenters (which may ban all other datacenters, if Either::Right(std::iter::empty()) }; + // Iterator over alive local datacenter replicas (shuffled or deterministically ordered, + // depending on the statement being LWT or not). 
let maybe_local_replicas = if let NodeLocationPreference::DatacenterAndRack(dc, _) | NodeLocationPreference::Datacenter(dc) = &self.preferences { - let local_replicas = self.fallback_replicas( + let local_replicas = self.maybe_shuffled_replicas( ts, NodeLocationCriteria::Datacenter(dc), |node, shard| Self::is_alive(node, Some(shard)), @@ -310,7 +393,9 @@ or refrain from preferring datacenters (which may ban all other datacenters, if let maybe_remote_replicas = if self.preferences.datacenter().is_none() || self.is_datacenter_failover_possible(&routing_info) { - let remote_replicas = self.fallback_replicas( + // Iterator over alive replicas (shuffled or deterministically ordered, + // depending on the statement being LWT or not). + let remote_replicas = self.maybe_shuffled_replicas( ts, NodeLocationCriteria::Any, |node, shard| Self::is_alive(node, Some(shard)), @@ -335,10 +420,16 @@ or refrain from preferring datacenters (which may ban all other datacenters, if Either::Right(std::iter::empty::<(NodeRef<'a>, Option)>()) }; - // Get a list of all local alive nodes, and apply a round robin to it + /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements), + * or no replica is suitable for targeting it (e.g. disabled or down), try targeting nodes + * that are not necessarily replicas. */ + + /* We start having not alive nodes filtered out. */ + + // All nodes in the local datacenter (if one is given). 
let local_nodes = self.preferred_node_set(cluster); - let maybe_local_rack_nodes = + let robinned_local_rack_nodes = if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences { let rack_predicate = Self::make_rack_predicate( |node| Self::is_alive(node, None), @@ -351,10 +442,12 @@ or refrain from preferring datacenters (which may ban all other datacenters, if } else { Either::Right(std::iter::empty::<(NodeRef<'a>, Option)>()) }; + let robinned_local_nodes = self .round_robin_nodes(local_nodes, |node| Self::is_alive(node, None)) .map(|node| (node, None)); + // All nodes in the cluster. let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring(); // If a datacenter failover is possible, loosen restriction about locality. @@ -385,6 +478,27 @@ or refrain from preferring datacenters (which may ban all other datacenters, if Either::Right(std::iter::empty()) }; + /// *Plan* should return unique elements. It is not however obvious what it means, + /// because some nodes in the plan may have shards and some may not. + /// + /// This helper structure defines equality of plan elements. + /// How the comparison works: + /// - If at least one of elements is shard-less, then compare just nodes. + /// - If both elements have shards, then compare both nodes and shards. + /// + /// Why is it implemented this way? + /// Driver should not attempt to send a request to the same destination twice. + /// If a plan element doesn't have shard specified, then a random shard will be + /// chosen by the driver. If the plan also contains the same node but with + /// a shard present, and we randomly choose the same shard for the shard-less element, + /// then we have duplication. + /// + /// Example: plan is `[(Node1, Some(1)), (Node1, None)]` - if the driver uses + /// the second element and randomly chooses shard 1, then we have duplication. 
+ /// + /// On the other hand, if a plan has a duplicate node, but with different shards, + /// then we want to use both elements - so we can't just make the list unique by node, + /// and so this struct was created. struct DefaultPolicyTargetComparator { host_id: Uuid, shard: Option, @@ -409,9 +523,17 @@ or refrain from preferring datacenters (which may ban all other datacenters, if } } - // Construct a fallback plan as a composition of replicas, local nodes and remote nodes. + // Construct a fallback plan as a composition of: + // - local rack alive replicas, + // - local datacenter alive replicas (or all alive replicas if no DC is preferred), + // - remote alive replicas (if DC failover is enabled), + // - local rack alive nodes, + // - local datacenter alive nodes (or all alive nodes if no DC is preferred), + // - remote alive nodes (if DC failover is enabled), + // - local datacenter nodes, + // - remote nodes (if DC failover is enabled). let plan = maybe_replicas - .chain(maybe_local_rack_nodes) + .chain(robinned_local_rack_nodes) .chain(robinned_local_nodes) .chain(maybe_remote_nodes) .chain(maybe_down_local_nodes) @@ -421,6 +543,8 @@ or refrain from preferring datacenters (which may ban all other datacenters, if shard: *shard, }); + // If latency awareness is enabled, wrap the plan by applying latency penalisation: + // all penalised nodes are moved behind non-penalised nodes, in a stable fashion. if let Some(latency_awareness) = self.latency_awareness.as_ref() { Box::new(latency_awareness.wrap(plan)) } else { @@ -459,6 +583,7 @@ impl DefaultPolicy { DefaultPolicyBuilder::new() } + /// Returns the given routing info processed based on given cluster data. fn routing_info<'a>( &'a self, query: &'a RoutingInfo, @@ -473,6 +598,8 @@ impl DefaultPolicy { routing_info } + /// Returns all nodes in the local datacenter if one is given, + /// or else all nodes in the cluster. 
fn preferred_node_set<'a>(&'a self, cluster: &'a ClusterData) -> &'a [Arc] { if let Some(preferred_datacenter) = self.preferences.datacenter() { if let Some(nodes) = cluster @@ -493,6 +620,8 @@ impl DefaultPolicy { } } + /// Returns a full replica set for given datacenter (if given, else for all DCs), + /// cluster data and table spec. fn nonfiltered_replica_set<'a>( &'a self, ts: &TokenWithStrategy<'a>, @@ -535,7 +664,12 @@ impl DefaultPolicy { } } - fn replicas<'a>( + /// Returns iterator over replicas for given token and table spec, filtered + /// by provided location criteria and predicate. + /// Respects requested replica order, i.e. if requested, returns replicas ordered + /// deterministically (i.e. by token ring order or by tablet definition order), + /// else returns replicas in arbitrary order. + fn filtered_replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, replica_location: NodeLocationCriteria<'a>, @@ -551,7 +685,7 @@ impl DefaultPolicy { self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec) .into_iter(), ), - ReplicaOrder::RingOrder => Either::Right( + ReplicaOrder::Deterministic => Either::Right( self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec) .into_replicas_ordered() .into_iter(), @@ -560,6 +694,11 @@ impl DefaultPolicy { replica_iter.filter(move |(node, shard): &(NodeRef<'a>, Shard)| predicate(node, *shard)) } + /// Picks a replica for given token and table spec which meets the provided location criteria + /// and the predicate. + /// The replica is chosen randomly over all candidates that meet the criteria + /// unless the query is LWT; if so, the first replica meeting the criteria is chosen + /// to avoid Paxos contention. 
fn pick_replica<'a>( &'a self, ts: &TokenWithStrategy<'a>, @@ -579,11 +718,17 @@ impl DefaultPolicy { } } + /// Picks the first (wrt the deterministic order imposed on the keyspace, see comment below) + /// replica for given token and table spec which meets the provided location criteria + /// and the predicate. // This is to be used for LWT optimisation: in order to reduce contention - // caused by Paxos conflicts, we always try to query replicas in the same, ring order. + // caused by Paxos conflicts, we always try to query replicas in the same, + // deterministic order: + // - ring order for token ring keyspaces, + // - tablet definition order for tablet keyspaces. // // If preferred rack and DC are set, then the first (encountered on the ring) replica - // that resides in that rack in that DC **and** satisfies the `predicate` is returned. + // that resides in that rack in that DC **and** satisfies the `predicate` is returned. // // If preferred DC is set, then the first (encountered on the ring) replica // that resides in that DC **and** satisfies the `predicate` is returned. @@ -630,12 +775,12 @@ impl DefaultPolicy { // 2) returns replicas in the ring order (this is not true for the case // when multiple DCs are allowed, because ReplicaSet chains replicas sequences // from different DCs, thus not preserving the global ring order) - self.replicas( + self.filtered_replicas( ts, replica_location, predicate, cluster, - ReplicaOrder::RingOrder, + ReplicaOrder::Deterministic, table_spec, ) .next() @@ -644,6 +789,8 @@ impl DefaultPolicy { } } + /// Picks a random replica for given token and table spec which meets the provided + /// location criteria and the predicate. fn pick_random_replica<'a>( &'a self, ts: &TokenWithStrategy<'a>, @@ -664,7 +811,11 @@ impl DefaultPolicy { } } - fn fallback_replicas<'a>( + /// Returns iterator over replicas for given token and table spec, filtered + /// by provided location criteria and predicate. 
+ /// By default, the replicas are shuffled. + /// For LWTs, though, the replicas are instead returned in a deterministic order. + fn maybe_shuffled_replicas<'a>( &'a self, ts: &TokenWithStrategy<'a>, replica_location: NodeLocationCriteria<'a>, @@ -674,20 +825,22 @@ impl DefaultPolicy { table_spec: &TableSpec, ) -> impl Iterator, Shard)> { let order = match statement_type { - StatementType::Lwt => ReplicaOrder::RingOrder, + StatementType::Lwt => ReplicaOrder::Deterministic, StatementType::NonLwt => ReplicaOrder::Arbitrary, }; - let replicas = self.replicas(ts, replica_location, predicate, cluster, order, table_spec); + let replicas = + self.filtered_replicas(ts, replica_location, predicate, cluster, order, table_spec); match statement_type { // As an LWT optimisation: in order to reduce contention caused by Paxos conflicts, - // we always try to query replicas in the same order. + // we always try to query replicas in the same order. StatementType::Lwt => Either::Left(replicas), StatementType::NonLwt => Either::Right(self.shuffle(replicas)), } } + /// Returns an iterator over the given slice of nodes, rotated by a random shift. fn randomly_rotated_nodes(nodes: &[Arc]) -> impl Iterator> { // Create a randomly rotated slice view let nodes_len = nodes.len(); @@ -704,6 +857,7 @@ impl DefaultPolicy { } } + /// Picks a random node from the slice of nodes. The node must satisfy the given predicate. fn pick_node<'a>( &'a self, nodes: &'a [Arc], @@ -713,6 +867,8 @@ impl DefaultPolicy { Self::randomly_rotated_nodes(nodes).find(|&node| predicate(node)) } + /// Returns an iterator over the given slice of nodes, rotated by a random shift + /// and filtered by given predicate. fn round_robin_nodes<'a>( &'a self, nodes: &'a [Arc], @@ -721,6 +877,7 @@ impl DefaultPolicy { Self::randomly_rotated_nodes(nodes).filter(move |node| predicate(node)) } + /// Wraps a given iterator by shuffling its contents. 
fn shuffle<'a>( &self, iter: impl Iterator, Shard)>, @@ -737,12 +894,14 @@ impl DefaultPolicy { vec.into_iter() } + /// Returns true iff the node should be considered to be alive. fn is_alive(node: NodeRef, _shard: Option) -> bool { // For now, we leave this as stub, until we have time to improve node events. // node.is_enabled() && !node.is_down() node.is_enabled() } + /// Returns true iff the datacenter failover is permitted for the statement being executed. fn is_datacenter_failover_possible(&self, routing_info: &ProcessedRoutingInfo) -> bool { self.preferences.datacenter().is_some() && self.permit_dc_failover @@ -2464,13 +2623,13 @@ mod latency_awareness { Err(e) => { warn!( "Error while calculating average: {e}. \ - prev_avg_secs: {prev_avg_secs}, \ - last_latency_secs: {last_latency_secs}, \ - prev_weight: {prev_weight}, \ - scaled_delay: {scaled_delay}, \ - delay: {delay}, \ - prev_avg.timestamp: {:?}, \ - now: {now:?}", + prev_avg_secs: {prev_avg_secs}, \ + last_latency_secs: {last_latency_secs}, \ + prev_weight: {prev_weight}, \ + scaled_delay: {scaled_delay}, \ + delay: {delay}, \ + prev_avg.timestamp: {:?}, \ + now: {now:?}", prev_avg.timestamp ); diff --git a/scylla/src/transport/locator/mod.rs b/scylla/src/transport/locator/mod.rs index e0f06e8ba2..2ae46856d1 100644 --- a/scylla/src/transport/locator/mod.rs +++ b/scylla/src/transport/locator/mod.rs @@ -25,12 +25,12 @@ use std::{ }; use tracing::debug; -/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, replication -/// strategy) pair. It does so by either using the precomputed token ranges, or doing the -/// computation on the fly. +/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token, +/// replication strategy, table) tuple. It does so by either using the precomputed +/// token ranges, or doing the computation on the fly (precomputation is configurable). 
#[derive(Debug, Clone)] pub struct ReplicaLocator { - /// the data based on which `ReplicaLocator` computes replica sets. + /// The data based on which `ReplicaLocator` computes replica sets. replication_data: ReplicationInfo, precomputed_replicas: PrecomputedReplicas, @@ -69,7 +69,7 @@ impl ReplicaLocator { } } - /// Returns a set of nodes that are considered to be replicas for a given token and strategy. + /// Returns a set of nodes that are considered to be replicas for a given token, strategy and table. /// If the `datacenter` parameter is set, the returned `ReplicaSet` is limited only to replicas /// from that datacenter. If a specified datacenter name does not correspond to a valid /// datacenter, an empty set will be returned. diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 7da4c3f54f..b06dced5bb 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -453,7 +453,7 @@ impl Session { /// instead of calling `Session::connect` directly, because it's more convenient. /// # Arguments /// * `config` - Connection configuration - known nodes, Compression, etc. - /// Must contain at least one known node. + /// Must contain at least one known node. 
/// /// # Example /// ```rust @@ -1334,7 +1334,7 @@ impl Session { /// # Arguments /// /// * `keyspace_name` - keyspace name to use, - /// keyspace names can have up to 48 alphanumeric characters and contain underscores + /// keyspace names can have up to 48 alphanumeric characters and contain underscores /// * `case_sensitive` - if set to true the generated query will put keyspace name in quotes /// # Example /// ```rust diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index f4a49daf85..40c1efb065 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -363,7 +363,14 @@ async fn test_counter_batch() { let session = Arc::new(create_new_session_builder().build().await.unwrap()); let ks = unique_keyspace_name(); - session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + // Need to disable tablets in this test because they don't support counters yet. + // (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e). 
+ let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks); + if scylla_supports_tablets(&session).await { + create_ks += " AND TABLETS = {'enabled': false}" + } + + session.query(create_ks, &[]).await.unwrap(); session .query( format!( diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index b468050c0b..fafa8afdca 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -819,10 +819,16 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result create_peer_from_row(source, row, local_address).await, + Err(err) => { + warn!( + "system.peers or system.local has an invalid row, skipping it: {}", + err + ); + Ok(None) + } + } }); let peers = translated_peers_futures diff --git a/scylla/tests/integration/skip_metadata_optimization.rs b/scylla/tests/integration/skip_metadata_optimization.rs index 66c431d9de..6ee9ea2319 100644 --- a/scylla/tests/integration/skip_metadata_optimization.rs +++ b/scylla/tests/integration/skip_metadata_optimization.rs @@ -14,6 +14,7 @@ use std::sync::Arc; #[cfg(not(scylla_cloud_tests))] async fn test_skip_result_metadata() { setup_tracing(); + use bytes::Bytes; use scylla_proxy::{ResponseOpcode, ResponseRule}; const NO_METADATA_FLAG: i32 = 0x0004; @@ -79,6 +80,70 @@ async fn test_skip_result_metadata() { prepared.set_use_cached_result_metadata(true); test_with_flags_predicate(&session, &prepared, &mut rx, |flags| flags & NO_METADATA_FLAG != 0).await; + // Verify that the optimisation does not break paging + { + let ks = unique_keyspace_name(); + + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session.use_keyspace(ks, true).await.unwrap(); + + type RowT = (i32, i32, String); + session + .query( + "CREATE TABLE IF NOT EXISTS t2 (a int, b int, c text, primary key 
(a, b))", + &[], + ) + .await + .unwrap(); + + let insert_stmt = session + .prepare("INSERT INTO t2 (a, b, c) VALUES (?, ?, ?)") + .await + .unwrap(); + + for idx in 0..10 { + session + .execute(&insert_stmt, (idx, idx + 1, "Some text")) + .await + .unwrap(); + } + + { + let select_query = "SELECT a, b, c FROM t2"; + + let rs = session + .query(select_query, ()) + .await + .unwrap() + .rows_typed::() + .unwrap() + .collect::, _>>() + .unwrap(); + + let mut results_from_manual_paging: Vec = vec![]; + let mut prepared_paged = session.prepare(select_query).await.unwrap(); + prepared_paged.set_use_cached_result_metadata(true); + prepared_paged.set_page_size(1); + let mut paging_state: Option = None; + let mut watchdog = 0; + loop { + let mut rs_manual = session + .execute_paged(&prepared_paged, &[], paging_state) + .await + .unwrap(); + eprintln!("Paging state: {:?}", rs_manual.paging_state); + paging_state = rs_manual.paging_state.take(); + results_from_manual_paging + .extend(rs_manual.rows_typed::().unwrap().map(Result::unwrap)); + if watchdog > 30 || paging_state.is_none() { + break; + } + watchdog += 1; + } + assert_eq!(results_from_manual_paging, rs); + } + } + running_proxy }).await;