From 70589d472eed2977a2625422319dbb1e8565f8c7 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sat, 12 Sep 2020 19:14:07 -0700 Subject: [PATCH 01/11] Implement static node list connection pool * This commit adds a connection pool that takes a static list of nodes and distributes the load. * trait for setting connection distribution. Defaults to RoundRobin. --- elasticsearch/src/http/transport.rs | 121 ++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 880c1256..9457b47f 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -43,6 +43,10 @@ use std::{ error, fmt, fmt::Debug, io::{self, Write}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::Duration, }; use url::Url; @@ -401,6 +405,15 @@ impl Transport { Ok(transport) } + /// Creates a new instance of a [Transport] configured with a + /// [StaticNodeListConnectionPool] + pub fn static_node_list(urls: Vec<&str>) -> Result { + let urls = urls.iter().map(|url| Url::parse(url).unwrap()).collect(); + let conn_pool = StaticNodeListConnectionPool::round_robin(urls); + let transport = TransportBuilder::new(conn_pool).build()?; + Ok(transport) + } + /// Creates a new instance of a [Transport] configured for use with /// [Elasticsearch service in Elastic Cloud](https://www.elastic.co/cloud/). /// @@ -672,11 +685,76 @@ impl ConnectionPool for CloudConnectionPool { } } +/// A Connection Pool that manages a static connection of nodes +#[derive(Debug, Clone)] +pub struct StaticNodeListConnectionPool { + connections: Vec, + strategy: TStrategy, +} + +impl ConnectionPool for StaticNodeListConnectionPool +where + TStrategy: Strategy + Clone, +{ + fn next(&self) -> &Connection { + self.strategy.try_next(&self.connections).unwrap() + } +} + +impl StaticNodeListConnectionPool { + /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ + pub fn round_robin(urls: Vec) -> Self { + let connections: Vec<_> = urls.into_iter().map(Connection::new).collect(); + + let strategy = RoundRobin::default(); + + Self { + connections, + strategy, + } + } +} + +/** The strategy selects an address from a given collection. */ +pub trait Strategy: Send + Sync + Debug { + /** Try get the next connection. */ + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error>; +} + +/** A round-robin strategy cycles through nodes sequentially. */ +#[derive(Clone, Debug)] +pub struct RoundRobin { + index: Arc, +} + +impl Default for RoundRobin { + fn default() -> Self { + RoundRobin { + index: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl Strategy for RoundRobin { + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error> { + if connections.is_empty() { + Err(crate::error::lib("Connection list empty")) + } else { + let i = self.index.fetch_add(1, Ordering::Relaxed) % connections.len(); + Ok(&connections[i]) + } + } +} + #[cfg(test)] pub mod tests { use super::*; #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] use crate::auth::ClientCertificate; + use crate::http::transport::{ + CloudId, Connection, ConnectionPool, RoundRobin, SingleNodeConnectionPool, + StaticNodeListConnectionPool, TransportBuilder, + }; use regex::Regex; use url::Url; @@ -825,4 +903,47 @@ pub mod tests { println!("{}", x); assert!(re.is_match(x)); } + + fn round_robin(addresses: Vec) -> StaticNodeListConnectionPool { + StaticNodeListConnectionPool::round_robin(addresses) + } + + fn expected_addresses() -> Vec { + vec!["http://a:9200/", "http://b:9200/", "http://c:9200/"] + .iter() + .map(|addr| Url::parse(addr).unwrap()) + .collect() + } + + #[test] + fn round_robin_next_multi() { + let connections = round_robin(expected_addresses()); + + for _ in 0..10 { + for expected in expected_addresses() { + let actual = connections.next(); + + assert_eq!(expected.as_str(), actual.url.as_str()); + } + } + } + + #[test] + fn round_robin_next_single() { + let expected = Url::parse("http://a:9200/").unwrap(); + let connections = round_robin(vec![expected.clone()]); + + for _ in 0..10 { + let actual = connections.next(); + + assert_eq!(expected.as_str(), actual.url.as_str()); + } + } + + #[test] + #[should_panic] + fn round_robin_next_empty_fails() { + let connections = round_robin(vec![]); + connections.next(); + } } From 5862ae1c30ef4960afa80a8ae2c9b544eea73445 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Tue, 15 Sep 2020 18:05:49 -0700 Subject: [PATCH 02/11] Fail fast if iterations fail for url parsing on static nodes --- elasticsearch/src/http/transport.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 9457b47f..656a70b0 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -408,7 +408,10 @@ impl Transport { /// Creates a new instance of a [Transport] configured with a /// [StaticNodeListConnectionPool] pub fn static_node_list(urls: Vec<&str>) -> Result { - let urls = urls.iter().map(|url| Url::parse(url).unwrap()).collect(); + let urls: Vec = urls + .iter() + .map(|url| Url::parse(url)) + .collect::, _>>()?; let conn_pool = StaticNodeListConnectionPool::round_robin(urls); let transport = TransportBuilder::new(conn_pool).build()?; Ok(transport) From 8908481f15183e0f4fcd3dc4abbd924cde0abee2 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sun, 20 Sep 2020 22:06:56 -0700 Subject: [PATCH 03/11] Rename connection and return cloned object * The connection should be owned by the current user of said connection --- elasticsearch/src/http/transport.rs | 44 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 656a70b0..f8514208 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -45,7 +45,7 @@ use std::{ io::{self, Write}, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, RwLock, }, time::Duration, }; @@ -412,7 +412,7 @@ impl Transport { .iter() .map(|url| Url::parse(url)) .collect::, _>>()?; - let conn_pool = StaticNodeListConnectionPool::round_robin(urls); + let conn_pool = MultiNodeConnectionPool::round_robin(urls); let transport = TransportBuilder::new(conn_pool).build()?; Ok(transport) } @@ -532,7 +532,7 @@ impl Default for Transport { /// dynamically at runtime, based upon the response to API calls. pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { /// Gets a reference to the next [Connection] - fn next(&self) -> &Connection; + fn next(&self) -> Connection; } clone_trait_object!(ConnectionPool); @@ -561,8 +561,8 @@ impl Default for SingleNodeConnectionPool { impl ConnectionPool for SingleNodeConnectionPool { /// Gets a reference to the next [Connection] - fn next(&self) -> &Connection { - &self.connection + fn next(&self) -> Connection { + self.connection.clone() } } @@ -683,31 +683,33 @@ impl CloudConnectionPool { impl ConnectionPool for CloudConnectionPool { /// Gets a reference to the next [Connection] - fn next(&self) -> &Connection { - &self.connection + fn next(&self) -> Connection { + self.connection.clone() } } /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] -pub struct StaticNodeListConnectionPool { - connections: Vec, +pub struct MultiNodeConnectionPool { + connections: Arc>>, strategy: TStrategy, } -impl ConnectionPool for StaticNodeListConnectionPool +impl ConnectionPool for MultiNodeConnectionPool where TStrategy: Strategy + Clone, { - fn next(&self) -> &Connection { - self.strategy.try_next(&self.connections).unwrap() + fn next(&self) -> Connection { + let inner = self.connections.read().expect("lock poisoned"); + self.strategy.try_next(&inner).unwrap() } } -impl StaticNodeListConnectionPool { +impl MultiNodeConnectionPool { /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ pub fn round_robin(urls: Vec) -> Self { - let connections: Vec<_> = urls.into_iter().map(Connection::new).collect(); + let connections: Arc>> = + Arc::new(RwLock::new(urls.into_iter().map(Connection::new).collect())); let strategy = RoundRobin::default(); @@ -721,7 +723,7 @@ impl StaticNodeListConnectionPool { /** The strategy selects an address from a given collection. */ pub trait Strategy: Send + Sync + Debug { /** Try get the next connection. */ - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error>; + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result; } /** A round-robin strategy cycles through nodes sequentially. */ @@ -739,12 +741,12 @@ impl Default for RoundRobin { } impl Strategy for RoundRobin { - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error> { + fn try_next<'a>(&self, connections: &'a [Connection]) -> Result { if connections.is_empty() { Err(crate::error::lib("Connection list empty")) } else { let i = self.index.fetch_add(1, Ordering::Relaxed) % connections.len(); - Ok(&connections[i]) + Ok(connections[i].clone()) } } } @@ -755,8 +757,8 @@ pub mod tests { #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] use crate::auth::ClientCertificate; use crate::http::transport::{ - CloudId, Connection, ConnectionPool, RoundRobin, SingleNodeConnectionPool, - StaticNodeListConnectionPool, TransportBuilder, + CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, RoundRobin, + SingleNodeConnectionPool, TransportBuilder, }; use regex::Regex; use url::Url; @@ -907,8 +909,8 @@ pub mod tests { assert!(re.is_match(x)); } - fn round_robin(addresses: Vec) -> StaticNodeListConnectionPool { - StaticNodeListConnectionPool::round_robin(addresses) + fn round_robin(addresses: Vec) -> MultiNodeConnectionPool { + MultiNodeConnectionPool::round_robin(addresses) } fn expected_addresses() -> Vec { From 05ddf4fc0abb494ca96debe057384358224a55b5 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sun, 20 Sep 2020 22:52:15 -0700 Subject: [PATCH 04/11] Allow reseeding of nodes on MultiNodeConnection --- elasticsearch/src/http/transport.rs | 125 +++++++++++++++++++++++----- 1 file changed, 106 insertions(+), 19 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index f8514208..c3f6045a 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -47,7 +47,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, RwLock, }, - time::Duration, + time::{Duration, Instant}, }; use url::Url; @@ -412,7 +412,7 @@ impl Transport { .iter() .map(|url| Url::parse(url)) .collect::, _>>()?; - let conn_pool = MultiNodeConnectionPool::round_robin(urls); + let conn_pool = MultiNodeConnectionPool::round_robin(urls, None); let transport = TransportBuilder::new(conn_pool).build()?; Ok(transport) } @@ -443,6 +443,10 @@ impl Transport { B: Body, Q: Serialize + ?Sized, { + if self.conn_pool.reseedable() { + // Reseed nodes + println!("Reseeding!"); + } let connection = self.conn_pool.next(); let url = connection.url.join(path.trim_start_matches('/'))?; let reqwest_method = self.method(method); @@ -533,6 +537,13 @@ impl Default for Transport { pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { /// Gets a reference to the next [Connection] fn next(&self) -> Connection; + + fn reseedable(&self) -> bool { + false + } + + // NOOP by default + fn reseed(&self, _connection: Vec) {} } clone_trait_object!(ConnectionPool); @@ -691,31 +702,63 @@ impl ConnectionPool for CloudConnectionPool { /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] pub struct MultiNodeConnectionPool { - connections: Arc>>, + inner: Arc>, + wait: Option, strategy: TStrategy, } +#[derive(Debug, Clone)] +pub struct MultiNodeConnectionPoolInner { + last_update: Option, + connections: Vec, +} + impl ConnectionPool for MultiNodeConnectionPool where TStrategy: Strategy + Clone, { fn next(&self) -> Connection { - let inner = self.connections.read().expect("lock poisoned"); - self.strategy.try_next(&inner).unwrap() + let inner = self.inner.read().expect("lock poisoned"); + self.strategy.try_next(&inner.connections).unwrap() + } + + fn reseedable(&self) -> bool { + let inner = self.inner.read().expect("lock poisoned"); + let wait = match self.wait { + Some(wait) => wait, + None => return false, + }; + let last_update_is_stale = inner + .last_update + .as_ref() + .map(|last_update| last_update.elapsed() > wait); + last_update_is_stale.unwrap_or(true) + } + + fn reseed(&self, mut connection: Vec) { + let mut inner = self.inner.write().expect("lock poisoned"); + inner.last_update = Some(Instant::now()); + inner.connections.clear(); + inner.connections.append(&mut connection); } } impl MultiNodeConnectionPool { /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ - pub fn round_robin(urls: Vec) -> Self { - let connections: Arc>> = - Arc::new(RwLock::new(urls.into_iter().map(Connection::new).collect())); + pub fn round_robin(urls: Vec, wait: Option) -> Self { + let connections = urls.into_iter().map(Connection::new).collect(); - let strategy = RoundRobin::default(); + let inner: Arc> = + Arc::new(RwLock::new(MultiNodeConnectionPoolInner { + last_update: None, + connections, + })); + let strategy = RoundRobin::default(); Self { - connections, + inner, strategy, + wait, } } } @@ -757,10 +800,11 @@ pub mod tests { #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] use crate::auth::ClientCertificate; use crate::http::transport::{ - CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, RoundRobin, - SingleNodeConnectionPool, TransportBuilder, + CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, SingleNodeConnectionPool, + TransportBuilder, }; use regex::Regex; + use std::time::{Duration, Instant}; use url::Url; #[test] @@ -909,10 +953,6 @@ pub mod tests { assert!(re.is_match(x)); } - fn round_robin(addresses: Vec) -> MultiNodeConnectionPool { - MultiNodeConnectionPool::round_robin(addresses) - } - fn expected_addresses() -> Vec { vec!["http://a:9200/", "http://b:9200/", "http://c:9200/"] .iter() @@ -920,9 +960,56 @@ pub mod tests { .collect() } + #[test] + fn test_reseedable_false_on_no_duration() { + let connections = MultiNodeConnectionPool::round_robin(expected_addresses(), None); + assert!(!connections.reseedable()); + } + + #[test] + fn test_reseed() { + let connection_pool = + MultiNodeConnectionPool::round_robin(vec![], Some(Duration::from_secs(28800))); + + let connections = expected_addresses() + .into_iter() + .map(Connection::new) + .collect(); + + connection_pool.reseeding(); + connection_pool.reseed(connections); + for _ in 0..10 { + for expected in expected_addresses() { + let actual = connection_pool.next(); + + assert_eq!(expected.as_str(), actual.url.as_str()); + } + } + // Check connection pool not reseedable after reseed + assert!(!connection_pool.reseedable()); + + let inner = connection_pool.inner.read().expect("lock poisoned"); + assert!(!inner.reseeding); + } + + #[test] + fn test_reseedable_after_duration() { + let connection_pool = MultiNodeConnectionPool::round_robin( + expected_addresses(), + Some(Duration::from_secs(30)), + ); + + // Set internal last_update to a minute ago + let mut inner = connection_pool.inner.write().expect("lock poisoned"); + inner.last_update = Some(Instant::now() - Duration::from_secs(60)); + drop(inner); + + assert!(connection_pool.reseedable()); + } + #[test] fn round_robin_next_multi() { - let connections = round_robin(expected_addresses()); + let connections = MultiNodeConnectionPool::round_robin(expected_addresses(), None); for _ in 0..10 { for expected in expected_addresses() { @@ -936,7 +1023,7 @@ pub mod tests { #[test] fn round_robin_next_single() { let expected = Url::parse("http://a:9200/").unwrap(); - let connections = round_robin(vec![expected.clone()]); + let connections = MultiNodeConnectionPool::round_robin(vec![expected.clone()], None); for _ in 0..10 { let actual = connections.next(); @@ -948,7 +1035,7 @@ pub mod tests { #[test] #[should_panic] fn round_robin_next_empty_fails() { - let connections = round_robin(vec![]); + let connections = MultiNodeConnectionPool::round_robin(vec![], None); connections.next(); } } From 6e7b5e7b59a8c1b9d267824e859cbbbbfc97feb6 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sun, 20 Sep 2020 23:22:30 -0700 Subject: [PATCH 05/11] Implement Sniff Nodes request * make review edits --- elasticsearch/src/http/transport.rs | 129 ++++++++++++++++++++++------ 1 file changed, 104 insertions(+), 25 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index c3f6045a..5224e4a8 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -39,6 +39,7 @@ use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::Encode use bytes::BytesMut; use lazy_static::lazy_static; use serde::Serialize; +use serde_json::Value; use std::{ error, fmt, fmt::Debug, @@ -331,7 +332,7 @@ impl Default for TransportBuilder { /// A connection to an Elasticsearch node, used to send an API request #[derive(Debug, Clone)] pub struct Connection { - url: Url, + url: Arc, } impl Connection { @@ -345,8 +346,14 @@ impl Connection { url.set_path(&format!("{}/", url.path())); } + let url = Arc::new(url); + Self { url } } + + pub fn url(&self) -> Arc { + self.url.clone() + } } /// A HTTP transport responsible for making the API requests to Elasticsearch, @@ -429,27 +436,22 @@ impl Transport { Ok(transport) } - /// Creates an asynchronous request that can be awaited - pub async fn send( + pub fn request_builder( &self, + connection: &Connection, method: Method, path: &str, headers: HeaderMap, query_string: Option<&Q>, body: Option, timeout: Option, - ) -> Result + ) -> Result where B: Body, Q: Serialize + ?Sized, { - if self.conn_pool.reseedable() { - // Reseed nodes - println!("Reseeding!"); - } - let connection = self.conn_pool.next(); - let url = connection.url.join(path.trim_start_matches('/'))?; let reqwest_method = self.method(method); + let url = connection.url.join(path.trim_start_matches('/'))?; let mut request_builder = self.client.request(reqwest_method, url); if let Some(t) = timeout { @@ -513,6 +515,70 @@ impl Transport { if let Some(q) = query_string { request_builder = request_builder.query(q); } + Ok(request_builder) + } + + /// Creates an asynchronous request that can be awaited + pub async fn send( + &self, + method: Method, + path: &str, + headers: HeaderMap, + query_string: Option<&Q>, + body: Option, + timeout: Option, + ) -> Result + where + B: Body, + Q: Serialize + ?Sized, + { + // Threads will execute against old connection pool during reseed + if self.conn_pool.reseedable() { + // Set as reseeding prevents another thread from attempting + // to reseed during es request and reseed + self.conn_pool.reseeding(); + + let connection = self.conn_pool.next(); + let scheme = &connection.url.scheme(); + // Build node info request + let node_request = self.request_builder( + &connection, + Method::Get, + "_nodes/_all/http", + headers.clone(), + None::<&Q>, + None::, + timeout, + )?; + let resp = node_request.send().await?; + let json: Value = resp.json().await?; + let connections: Vec = json["nodes"] + .as_object() + .unwrap() + .iter() + .map(|h| { + let url = format!( + "{}://{}", + scheme, + h.1["http"]["publish_address"].as_str().unwrap() + ); + let url = Url::parse(&url).unwrap(); + Connection::new(url) + }) + .collect(); + self.conn_pool.reseed(connections); + } + + let connection = self.conn_pool.next(); + let request_builder = self.request_builder( + &connection, + method, + path, + headers, + query_string, + body, + timeout, + )?; let response = request_builder.send().await; match response { @@ -542,6 +608,9 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { false } + // NOOP + fn reseeding(&self) {} + // NOOP by default fn reseed(&self, _connection: Vec) {} } @@ -701,38 +770,46 @@ impl ConnectionPool for CloudConnectionPool { /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] -pub struct MultiNodeConnectionPool { +pub struct MultiNodeConnectionPool { inner: Arc>, - wait: Option, - strategy: TStrategy, + reseed_frequency: Option, + load_balancing_strategy: LoadBalancing, } #[derive(Debug, Clone)] pub struct MultiNodeConnectionPoolInner { + reseeding: bool, last_update: Option, connections: Vec, } -impl ConnectionPool for MultiNodeConnectionPool +impl ConnectionPool for MultiNodeConnectionPool where - TStrategy: Strategy + Clone, + LoadBalancing: LoadBalancingStrategy + Clone, { fn next(&self) -> Connection { let inner = self.inner.read().expect("lock poisoned"); - self.strategy.try_next(&inner.connections).unwrap() + self.load_balancing_strategy + .try_next(&inner.connections) + .unwrap() } fn reseedable(&self) -> bool { let inner = self.inner.read().expect("lock poisoned"); - let wait = match self.wait { + let reseed_frequency = match self.reseed_frequency { Some(wait) => wait, None => return false, }; let last_update_is_stale = inner .last_update .as_ref() - .map(|last_update| last_update.elapsed() > wait); - last_update_is_stale.unwrap_or(true) + .map(|last_update| last_update.elapsed() > reseed_frequency); + last_update_is_stale.unwrap_or(true) && !inner.reseeding + } + + fn reseeding(&self) { + let mut inner = self.inner.write().expect("Lock Poisoned"); + inner.reseeding = true } fn reseed(&self, mut connection: Vec) { @@ -740,31 +817,33 @@ where inner.last_update = Some(Instant::now()); inner.connections.clear(); inner.connections.append(&mut connection); + inner.reseeding = false; } } impl MultiNodeConnectionPool { /** Use a round-robin strategy for balancing traffic over the given set of nodes. */ - pub fn round_robin(urls: Vec, wait: Option) -> Self { + pub fn round_robin(urls: Vec, reseed_frequency: Option) -> Self { let connections = urls.into_iter().map(Connection::new).collect(); let inner: Arc> = Arc::new(RwLock::new(MultiNodeConnectionPoolInner { + reseeding: false, last_update: None, connections, })); - let strategy = RoundRobin::default(); + let load_balancing_strategy = RoundRobin::default(); Self { inner, - strategy, - wait, + load_balancing_strategy, + reseed_frequency, } } } /** The strategy selects an address from a given collection. */ -pub trait Strategy: Send + Sync + Debug { +pub trait LoadBalancingStrategy: Send + Sync + Debug { /** Try get the next connection. */ fn try_next<'a>(&self, connections: &'a [Connection]) -> Result; } @@ -783,7 +862,7 @@ impl Default for RoundRobin { } } -impl Strategy for RoundRobin { +impl LoadBalancingStrategy for RoundRobin { fn try_next<'a>(&self, connections: &'a [Connection]) -> Result { if connections.is_empty() { Err(crate::error::lib("Connection list empty")) From 4a26653afa36d9ce3be32095fcfe765f30959a3b Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Tue, 22 Sep 2020 20:53:18 -0700 Subject: [PATCH 06/11] Create constructors for static and sniffing node pool transports --- elasticsearch/src/http/transport.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 5224e4a8..4536393d 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -413,7 +413,7 @@ impl Transport { } /// Creates a new instance of a [Transport] configured with a - /// [StaticNodeListConnectionPool] + /// [MultiNodeConnectionPool] that does not refresh pub fn static_node_list(urls: Vec<&str>) -> Result { let urls: Vec = urls .iter() @@ -424,6 +424,23 @@ impl Transport { Ok(transport) } + /// Creates a new instance of a [Transport] configured with a + /// [MultiNodeConnectionPool] + /// + /// * `reseed_frequency` - frequency at which connections should be refreshed in seconds + pub fn sniffing_node_list( + urls: Vec<&str>, + reseed_frequency: Duration, + ) -> Result { + let urls: Vec = urls + .iter() + .map(|url| Url::parse(url)) + .collect::, _>>()?; + let conn_pool = MultiNodeConnectionPool::round_robin(urls, Some(reseed_frequency)); + let transport = TransportBuilder::new(conn_pool).build()?; + Ok(transport) + } + /// Creates a new instance of a [Transport] configured for use with /// [Elasticsearch service in Elastic Cloud](https://www.elastic.co/cloud/). /// From e77218f021982c34bed608e025b52385c8828e7e Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Wed, 23 Sep 2020 15:13:00 -0700 Subject: [PATCH 07/11] Introduce AtomicBool to prevent multiple reseeds across threads --- elasticsearch/src/http/transport.rs | 42 ++++++++++++++--------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 4536393d..f2b7d312 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -45,7 +45,7 @@ use std::{ fmt::Debug, io::{self, Write}, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock, }, time::{Duration, Instant}, @@ -551,10 +551,6 @@ impl Transport { { // Threads will execute against old connection pool during reseed if self.conn_pool.reseedable() { - // Set as reseeding prevents another thread from attempting - // to reseed during es request and reseed - self.conn_pool.reseeding(); - let connection = self.conn_pool.next(); let scheme = &connection.url.scheme(); // Build node info request @@ -625,9 +621,6 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send { false } - // NOOP - fn reseeding(&self) {} - // NOOP by default fn reseed(&self, _connection: Vec) {} } @@ -791,11 +784,11 @@ pub struct MultiNodeConnectionPool { inner: Arc>, reseed_frequency: Option, load_balancing_strategy: LoadBalancing, + reseeding: Arc, } #[derive(Debug, Clone)] pub struct MultiNodeConnectionPoolInner { - reseeding: bool, last_update: Option, connections: Vec, } @@ -821,12 +814,17 @@ where .last_update .as_ref() .map(|last_update| last_update.elapsed() > reseed_frequency); - last_update_is_stale.unwrap_or(true) && !inner.reseeding - } + let reseedable = last_update_is_stale.unwrap_or(true); - fn reseeding(&self) { - let mut inner = self.inner.write().expect("Lock Poisoned"); - inner.reseeding = true + return if !reseedable { + false + } else { + // Check if refreshing is false if so, sets to true atomically and returns old value (false) meaning refreshable is true + // If refreshing is set to true, do nothing and return true, meaning refreshable is false + !self + .reseeding + .compare_and_swap(false, true, Ordering::Relaxed) + }; } fn reseed(&self, mut connection: Vec) { @@ -834,7 +832,7 @@ where inner.last_update = Some(Instant::now()); inner.connections.clear(); inner.connections.append(&mut connection); - inner.reseeding = false; + self.reseeding.store(false, Ordering::Relaxed); } } @@ -845,16 +843,17 @@ impl MultiNodeConnectionPool { let inner: Arc> = Arc::new(RwLock::new(MultiNodeConnectionPoolInner { - reseeding: false, last_update: None, connections, })); + let reseeding = Arc::new(AtomicBool::new(false)); let load_balancing_strategy = RoundRobin::default(); Self { inner, load_balancing_strategy, reseed_frequency, + reseeding, } } } @@ -901,6 +900,10 @@ pub mod tests { }; use regex::Regex; use std::time::{Duration, Instant}; + use std::{ + sync::atomic::Ordering, + time::{Duration, Instant}, + }; use url::Url; #[test] @@ -1071,8 +1074,6 @@ pub mod tests { .into_iter() .map(Connection::new) .collect(); - - connection_pool.reseeding(); connection_pool.reseed(connections); for _ in 0..10 { for expected in expected_addresses() { @@ -1083,9 +1084,7 @@ pub mod tests { } // Check connection pool not reseedable after reseed assert!(!connection_pool.reseedable()); - - let inner = connection_pool.inner.read().expect("lock poisoned"); - assert!(!inner.reseeding); + assert!(!connection_pool.reseeding.load(Ordering::Relaxed)); } #[test] @@ -1101,6 +1100,7 @@ pub mod tests { drop(inner); assert!(connection_pool.reseedable()); + assert!(connection_pool.reseeding.load(Ordering::Relaxed)); } #[test] From 3673fc3c44cd280f0804e94c0b5fd03d8822eac8 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Wed, 23 Sep 2020 23:32:39 -0700 Subject: [PATCH 08/11] Add regex parsing bound_address to URL * Style changes per code review --- elasticsearch/Cargo.toml | 2 +- elasticsearch/src/http/transport.rs | 84 +++++++++++++++++++++++------ elasticsearch/src/lib.rs | 3 ++ 3 files changed, 71 insertions(+), 18 deletions(-) diff --git a/elasticsearch/Cargo.toml b/elasticsearch/Cargo.toml index 337850e7..b016fe45 100644 --- a/elasticsearch/Cargo.toml +++ b/elasticsearch/Cargo.toml @@ -32,6 +32,7 @@ dyn-clone = "1" lazy_static = "1" percent-encoding = "2" reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] } +regex="1" url = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -47,7 +48,6 @@ http = "1" axum = "0.7" hyper = { version = "1", features = ["server", "http1"] } os_type = "2" -regex="1" #sysinfo = "0.31" textwrap = "0.16" tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] } diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index f2b7d312..a7b83e1f 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -38,6 +38,7 @@ use crate::{ use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::EncoderWriter, Engine}; use bytes::BytesMut; use lazy_static::lazy_static; +use regex::Regex; use serde::Serialize; use serde_json::Value; use std::{ @@ -102,6 +103,10 @@ impl fmt::Display for BuildError { /// Default address to Elasticsearch running on `https://localhost:9200` pub static DEFAULT_ADDRESS: &str = "https://localhost:9200"; +lazy_static! { + static ref ADDRESS_REGEX: Regex = + Regex::new(r"((?P[^/]+)/)?(?P[^:]+|\[[\da-fA-F:\.]+\]):(?P\d+)$").unwrap(); +} lazy_static! { /// Client metadata header: service, language, transport, followed by additional information @@ -453,7 +458,7 @@ impl Transport { Ok(transport) } - pub fn request_builder( + fn request_builder( &self, connection: &Connection, method: Method, @@ -535,6 +540,28 @@ impl Transport { Ok(request_builder) } + fn parse_to_url(address: &str, scheme: &str) -> Result { + if address.is_empty() { + return Err(crate::error::lib("Bound Address is empty")); + } + + let matches = ADDRESS_REGEX + .captures(address) + .ok_or_else(|| crate::lib(format!("error parsing address into url: {}", address)))?; + + let host = matches + .name("fqdn") + .or_else(|| Some(matches.name("ip").unwrap())) + .unwrap() + .as_str() + .trim(); + let port = matches.name("port").unwrap().as_str().trim(); + + Ok(Url::parse( + format!("{}://{}:{}", scheme, host, port).as_str(), + )?) + } + /// Creates an asynchronous request that can be awaited pub async fn send( &self, @@ -557,7 +584,7 @@ impl Transport { let node_request = self.request_builder( &connection, Method::Get, - "_nodes/_all/http", + "_nodes/http?filter_path=nodes.*.http", headers.clone(), None::<&Q>, None::, @@ -570,12 +597,17 @@ impl Transport { .unwrap() .iter() .map(|h| { - let url = format!( - "{}://{}", - scheme, - h.1["http"]["publish_address"].as_str().unwrap() - ); - let url = Url::parse(&url).unwrap(); + let address = h.1["http"]["publish_address"] + .as_str() + .or_else(|| { + Some( + h.1["http"]["bound_address"].as_array().unwrap()[0] + .as_str() + .unwrap(), + ) + }) + .unwrap(); + let url = Self::parse_to_url(address, scheme).unwrap(); Connection::new(url) }) .collect(); @@ -780,10 +812,10 @@ impl ConnectionPool for CloudConnectionPool { /// A Connection Pool that manages a static connection of nodes #[derive(Debug, Clone)] -pub struct MultiNodeConnectionPool { +pub struct MultiNodeConnectionPool { inner: Arc>, reseed_frequency: Option, - load_balancing_strategy: LoadBalancing, + load_balancing_strategy: ConnSelector, reseeding: Arc, } @@ -793,9 +825,9 @@ pub struct MultiNodeConnectionPoolInner { connections: Vec, } -impl ConnectionPool for MultiNodeConnectionPool +impl ConnectionPool for MultiNodeConnectionPool where - LoadBalancing: LoadBalancingStrategy + Clone, + ConnSelector: ConnectionSelector + Clone, { fn next(&self) -> Connection { let inner = self.inner.read().expect("lock poisoned"); @@ -859,9 +891,9 @@ impl MultiNodeConnectionPool { } /** The strategy selects an address from a given collection. */ -pub trait LoadBalancingStrategy: Send + Sync + Debug { +pub trait ConnectionSelector: Send + Sync + Debug { /** Try get the next connection. */ - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result; + fn try_next(&self, connections: &[Connection]) -> Result; } /** A round-robin strategy cycles through nodes sequentially. */ @@ -878,8 +910,8 @@ impl Default for RoundRobin { } } -impl LoadBalancingStrategy for RoundRobin { - fn try_next<'a>(&self, connections: &'a [Connection]) -> Result { +impl ConnectionSelector for RoundRobin { + fn try_next(&self, connections: &[Connection]) -> Result { if connections.is_empty() { Err(crate::error::lib("Connection list empty")) } else { @@ -896,7 +928,7 @@ pub mod tests { use crate::auth::ClientCertificate; use crate::http::transport::{ CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, SingleNodeConnectionPool, - TransportBuilder, + Transport, TransportBuilder, }; use regex::Regex; use std::time::{Duration, Instant}; @@ -942,6 +974,24 @@ pub mod tests { ); } + #[test] + fn test_url_parsing_where_hostname_and_ip_present() { + let url = Transport::parse_to_url("localhost/127.0.0.1:9200", "http").unwrap(); + assert_eq!(url.into_string(), "http://localhost:9200/"); + } + + #[test] + fn test_url_parsing_where_only_ip_present() { + let url = Transport::parse_to_url("127.0.0.1:9200", "http").unwrap(); + assert_eq!(url.into_string(), "http://127.0.0.1:9200/"); + } + + #[test] + fn test_url_parsing_where_only_hostname_present() { + let url = Transport::parse_to_url("localhost:9200", "http").unwrap(); + assert_eq!(url.into_string(), "http://localhost:9200/"); + } + #[test] fn can_parse_cloud_id_without_kibana_uuid() { let base64 = diff --git a/elasticsearch/src/lib.rs b/elasticsearch/src/lib.rs index 326ea282..f9e2cd80 100644 --- a/elasticsearch/src/lib.rs +++ b/elasticsearch/src/lib.rs @@ -370,6 +370,9 @@ mod readme { #[macro_use] extern crate dyn_clone; +#[macro_use] +extern crate lazy_static; + pub mod auth; pub mod cert; pub mod http; From d595547de03856bb4950b0e557886bbb99f17512 Mon Sep 17 00:00:00 2001 From: Stephen Leyva Date: Sat, 26 Sep 2020 14:49:12 -0700 Subject: [PATCH 09/11] Reseed connections on seperate thread --- elasticsearch/Cargo.toml | 2 +- elasticsearch/src/http/transport.rs | 66 +++++++++++++++++------------ 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/elasticsearch/Cargo.toml b/elasticsearch/Cargo.toml index b016fe45..fa2f531f 100644 --- a/elasticsearch/Cargo.toml +++ b/elasticsearch/Cargo.toml @@ -37,6 +37,7 @@ url = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" serde_with = "3" +tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] } void = "1" [dev-dependencies] @@ -50,7 +51,6 @@ hyper = { version = "1", features = ["server", "http1"] } os_type = "2" #sysinfo = "0.31" textwrap = "0.16" -tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] } xml-rs = "0.8" [build-dependencies] diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index a7b83e1f..797d43f0 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -49,6 +49,7 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock, }, + thread::spawn, time::{Duration, Instant}, }; use url::Url; @@ -320,7 +321,7 @@ impl TransportBuilder { let client = client_builder.build()?; Ok(Transport { client, - conn_pool: self.conn_pool, + conn_pool: Arc::new(self.conn_pool), credentials: self.credentials, send_meta: self.meta_header, }) @@ -367,7 +368,7 @@ impl Connection { pub struct Transport { client: reqwest::Client, credentials: Option, - conn_pool: Box, + conn_pool: Arc>, send_meta: bool, } @@ -578,8 +579,9 @@ impl Transport { { // Threads will execute against old connection pool during reseed if self.conn_pool.reseedable() { - let connection = self.conn_pool.next(); - let scheme = &connection.url.scheme(); + let local_conn_pool = self.conn_pool.clone(); + let connection = local_conn_pool.next(); + // Build node info request let node_request = self.request_builder( &connection, @@ -590,28 +592,36 @@ impl Transport { None::, timeout, )?; - let resp = node_request.send().await?; - let json: Value = resp.json().await?; - let connections: Vec = json["nodes"] - .as_object() - .unwrap() - .iter() - .map(|h| { - let address = h.1["http"]["publish_address"] - .as_str() - .or_else(|| { - Some( - h.1["http"]["bound_address"].as_array().unwrap()[0] - .as_str() - .unwrap(), - ) + + spawn(move || { + // TODO: Log reseed failures + let mut rt = tokio::runtime::Runtime::new().expect("Cannot create tokio runtime"); + rt.block_on(async { + let scheme = connection.url.scheme(); + let resp = node_request.send().await.unwrap(); + let json: Value = resp.json().await.unwrap(); + let connections: Vec = json["nodes"] + .as_object() + .unwrap() + .iter() + .map(|h| { + let address = h.1["http"]["publish_address"] + .as_str() + .or_else(|| { + Some( + h.1["http"]["bound_address"].as_array().unwrap()[0] + .as_str() + .unwrap(), + ) + }) + .unwrap(); + let url = Self::parse_to_url(address, scheme).unwrap(); + Connection::new(url) }) - .unwrap(); - let url = Self::parse_to_url(address, scheme).unwrap(); - Connection::new(url) + .collect(); + local_conn_pool.reseed(connections); }) - .collect(); - self.conn_pool.reseed(connections); + }); } let connection = self.conn_pool.next(); @@ -815,7 +825,7 @@ impl ConnectionPool for CloudConnectionPool { pub struct MultiNodeConnectionPool { inner: Arc>, reseed_frequency: Option, - load_balancing_strategy: ConnSelector, + connection_selector: ConnSelector, reseeding: Arc, } @@ -831,7 +841,7 @@ where { fn next(&self) -> Connection { let inner = self.inner.read().expect("lock poisoned"); - self.load_balancing_strategy + self.connection_selector .try_next(&inner.connections) .unwrap() } @@ -880,10 +890,10 @@ impl MultiNodeConnectionPool { })); let reseeding = Arc::new(AtomicBool::new(false)); - let load_balancing_strategy = RoundRobin::default(); + let connection_selector = RoundRobin::default(); Self { inner, - load_balancing_strategy, + connection_selector, reseed_frequency, reseeding, } From 8354172594932deafb2e9422ac9eaca2b828273a Mon Sep 17 00:00:00 2001 From: Tom Milligan Date: Mon, 22 Nov 2021 16:05:13 +0000 Subject: [PATCH 10/11] [fixup] clippy, deprecations, imports from rebase --- elasticsearch/src/http/transport.rs | 18 +++++++++--------- elasticsearch/src/lib.rs | 3 --- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 797d43f0..4eb6ca63 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -595,7 +595,7 @@ impl Transport { spawn(move || { // TODO: Log reseed failures - let mut rt = tokio::runtime::Runtime::new().expect("Cannot create tokio runtime"); + let rt = tokio::runtime::Runtime::new().expect("Cannot create tokio runtime"); rt.block_on(async { let scheme = connection.url.scheme(); let resp = node_request.send().await.unwrap(); @@ -865,7 +865,10 @@ where // If refreshing is set to true, do nothing and return true, meaning refreshable is false !self .reseeding - .compare_and_swap(false, true, Ordering::Relaxed) + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + // This can be replaced with `.into_ok_or_err` once stable. + // https://doc.rust-lang.org/std/result/enum.Result.html#method.into_ok_or_err + .unwrap_or(true) }; } @@ -941,11 +944,8 @@ pub mod tests { Transport, TransportBuilder, }; use regex::Regex; + use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; - use std::{ - sync::atomic::Ordering, - time::{Duration, Instant}, - }; use url::Url; #[test] @@ -987,19 +987,19 @@ pub mod tests { #[test] fn test_url_parsing_where_hostname_and_ip_present() { let url = Transport::parse_to_url("localhost/127.0.0.1:9200", "http").unwrap(); - assert_eq!(url.into_string(), "http://localhost:9200/"); + assert_eq!(url, Url::parse("http://localhost:9200/").unwrap()); } #[test] fn test_url_parsing_where_only_ip_present() { let url = Transport::parse_to_url("127.0.0.1:9200", "http").unwrap(); - assert_eq!(url.into_string(), "http://127.0.0.1:9200/"); + assert_eq!(url, Url::parse("http://127.0.0.1:9200/").unwrap()); } #[test] fn test_url_parsing_where_only_hostname_present() { let url = Transport::parse_to_url("localhost:9200", "http").unwrap(); - assert_eq!(url.into_string(), "http://localhost:9200/"); + assert_eq!(url, Url::parse("http://localhost:9200/").unwrap()); } #[test] diff --git a/elasticsearch/src/lib.rs b/elasticsearch/src/lib.rs index f9e2cd80..326ea282 100644 --- a/elasticsearch/src/lib.rs +++ b/elasticsearch/src/lib.rs @@ -370,9 +370,6 @@ mod readme { #[macro_use] extern crate dyn_clone; -#[macro_use] -extern crate lazy_static; - pub mod auth; pub mod cert; pub mod http; From 2f64bea73fd9692fc8a9c6da35f7c45fd3e7a1ef Mon Sep 17 00:00:00 2001 From: Sylvain Wallez Date: Fri, 23 Aug 2024 18:38:00 +0200 Subject: [PATCH 11/11] Simplify address resolution, spawn reseed as a Tokio task --- elasticsearch/Cargo.toml | 2 +- elasticsearch/src/http/transport.rs | 90 +++++++++++++---------------- 2 files changed, 42 insertions(+), 50 deletions(-) diff --git a/elasticsearch/Cargo.toml b/elasticsearch/Cargo.toml index fa2f531f..7926702a 100644 --- a/elasticsearch/Cargo.toml +++ b/elasticsearch/Cargo.toml @@ -32,7 +32,6 @@ dyn-clone = "1" lazy_static = "1" percent-encoding = "2" reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] } -regex="1" url = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -49,6 +48,7 @@ http = "1" axum = "0.7" hyper = { version = "1", features = ["server", "http1"] } os_type = "2" +regex="1" #sysinfo = "0.31" textwrap = "0.16" xml-rs = "0.8" diff --git a/elasticsearch/src/http/transport.rs b/elasticsearch/src/http/transport.rs index 4eb6ca63..22100fe9 100644 --- a/elasticsearch/src/http/transport.rs +++ b/elasticsearch/src/http/transport.rs @@ -38,7 +38,6 @@ use crate::{ use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::EncoderWriter, Engine}; use bytes::BytesMut; use lazy_static::lazy_static; -use regex::Regex; use serde::Serialize; use serde_json::Value; use std::{ @@ -49,7 +48,6 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock, }, - thread::spawn, time::{Duration, Instant}, }; use url::Url; @@ -104,10 +102,6 @@ impl fmt::Display for BuildError { /// Default address to Elasticsearch running on `https://localhost:9200` pub static DEFAULT_ADDRESS: &str = "https://localhost:9200"; -lazy_static! { - static ref ADDRESS_REGEX: Regex = - Regex::new(r"((?P[^/]+)/)?(?P[^:]+|\[[\da-fA-F:\.]+\]):(?P\d+)$").unwrap(); -} lazy_static! { /// Client metadata header: service, language, transport, followed by additional information @@ -459,6 +453,7 @@ impl Transport { Ok(transport) } + #[allow(clippy::too_many_arguments)] fn request_builder( &self, connection: &Connection, @@ -546,17 +541,18 @@ impl Transport { return Err(crate::error::lib("Bound Address is empty")); } - let matches = ADDRESS_REGEX - .captures(address) - .ok_or_else(|| crate::lib(format!("error parsing address into url: {}", address)))?; + let mut host_port = None; + if let Some((host, tail)) = address.split_once('/') { + if let Some((_, port)) = tail.rsplit_once(':') { + host_port = Some((host, port)); + } + } else { + host_port = address.rsplit_once(':'); + } - let host = matches - .name("fqdn") - .or_else(|| Some(matches.name("ip").unwrap())) - .unwrap() - .as_str() - .trim(); - let port = matches.name("port").unwrap().as_str().trim(); + let (host, port) = host_port.ok_or_else(|| { + crate::error::lib(format!("error parsing address into url: {}", address)) + })?; Ok(Url::parse( format!("{}://{}:{}", scheme, host, port).as_str(), @@ -577,10 +573,10 @@ impl Transport { B: Body, Q: Serialize + ?Sized, { - // Threads will execute against old connection pool during reseed + // Requests will execute against old connection pool during reseed if self.conn_pool.reseedable() { - let local_conn_pool = self.conn_pool.clone(); - let connection = local_conn_pool.next(); + let conn_pool = self.conn_pool.clone(); + let connection = conn_pool.next(); // Build node info request let node_request = self.request_builder( @@ -593,34 +589,30 @@ impl Transport { timeout, )?; - spawn(move || { - // TODO: Log reseed failures - let rt = tokio::runtime::Runtime::new().expect("Cannot create tokio runtime"); - rt.block_on(async { - let scheme = connection.url.scheme(); - let resp = node_request.send().await.unwrap(); - let json: Value = resp.json().await.unwrap(); - let connections: Vec = json["nodes"] - .as_object() - .unwrap() - .iter() - .map(|h| { - let address = h.1["http"]["publish_address"] - .as_str() - .or_else(|| { - Some( - h.1["http"]["bound_address"].as_array().unwrap()[0] - .as_str() - .unwrap(), - ) - }) - .unwrap(); - let url = Self::parse_to_url(address, scheme).unwrap(); - Connection::new(url) - }) - .collect(); - local_conn_pool.reseed(connections); - }) + tokio::spawn(async move { + let scheme = connection.url.scheme(); + let resp = node_request.send().await.unwrap(); + let json: Value = resp.json().await.unwrap(); + let connections: Vec = json["nodes"] + .as_object() + .unwrap() + .iter() + .map(|(_, node)| { + let address = node["http"]["publish_address"] + .as_str() + .or_else(|| { + Some( + node["http"]["bound_address"].as_array().unwrap()[0] + .as_str() + .unwrap(), + ) + }) + .unwrap(); + let url = Self::parse_to_url(address, scheme).unwrap(); + Connection::new(url) + }) + .collect(); + conn_pool.reseed(connections); }); } @@ -858,7 +850,7 @@ where .map(|last_update| last_update.elapsed() > reseed_frequency); let reseedable = last_update_is_stale.unwrap_or(true); - return if !reseedable { + if !reseedable { false } else { // Check if refreshing is false if so, sets to true atomically and returns old value (false) meaning refreshable is true @@ -869,7 +861,7 @@ where // This can be replaced with `.into_ok_or_err` once stable. // https://doc.rust-lang.org/std/result/enum.Result.html#method.into_ok_or_err .unwrap_or(true) - }; + } } fn reseed(&self, mut connection: Vec) {