From 3656ef72ba7ddb4fafc07b8380b918df19b038e2 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 2 Nov 2024 13:02:57 -0400 Subject: [PATCH 1/4] Loosen locking around RPC calls for better multi-threading --- rust/src/hdfs/connection.rs | 8 +++-- rust/src/hdfs/proxy.rs | 61 +++++++++++++++++-------------------- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/rust/src/hdfs/connection.rs b/rust/src/hdfs/connection.rs index 861f58e..2005e14 100644 --- a/rust/src/hdfs/connection.rs +++ b/rust/src/hdfs/connection.rs @@ -260,7 +260,11 @@ impl RpcConnection { Ok(()) } - pub(crate) async fn call(&self, method_name: &str, message: &[u8]) -> Result { + pub(crate) async fn call( + &self, + method_name: &str, + message: &[u8], + ) -> Result>> { let call_id = self.get_next_call_id(); let conn_header = self.get_connection_header(call_id, 0); @@ -284,7 +288,7 @@ impl RpcConnection { self.write_messages(&[&conn_header_buf, &header_buf, message]) .await?; - receiver.await.unwrap() + Ok(receiver) } } diff --git a/rust/src/hdfs/proxy.rs b/rust/src/hdfs/proxy.rs index 1765e79..fb2af23 100644 --- a/rust/src/hdfs/proxy.rs +++ b/rust/src/hdfs/proxy.rs @@ -24,7 +24,7 @@ const OBSERVER_RETRY_EXCEPTION: &str = "org.apache.hadoop.ipc.ObserverRetryOnAct #[derive(Debug)] struct ProxyConnection { url: String, - inner: Option, + inner: Arc>>, alignment_context: Arc>, nameservice: Option, } @@ -37,37 +37,42 @@ impl ProxyConnection { ) -> Self { ProxyConnection { url, - inner: None, + inner: Arc::new(tokio::sync::Mutex::new(None)), alignment_context, nameservice, } } - async fn get_connection(&mut self) -> Result<&RpcConnection> { - if self.inner.is_none() || !self.inner.as_ref().unwrap().is_alive() { - self.inner = Some( - RpcConnection::connect( - &self.url, - self.alignment_context.clone(), - self.nameservice.as_deref(), - ) - .await?, - ); - } - Ok(self.inner.as_ref().unwrap()) - } + async fn call(&self, method_name: &str, message: &[u8]) -> Result { + let receiver = { + let 
mut connection = self.inner.lock().await; + match &mut *connection { + Some(c) if c.is_alive() => (), + c => { + *c = Some( + RpcConnection::connect( + &self.url, + self.alignment_context.clone(), + self.nameservice.as_deref(), + ) + .await?, + ); + } + } - async fn call(&mut self, method_name: &str, message: &[u8]) -> Result { - self.get_connection() - .await? - .call(method_name, message) - .await + connection + .as_ref() + .unwrap() + .call(method_name, message) + .await? + }; + receiver.await.unwrap() } } #[derive(Debug)] pub(crate) struct NameServiceProxy { - proxy_connections: Vec>>, + proxy_connections: Vec, current_index: AtomicUsize, msycned: AtomicBool, } @@ -80,22 +85,14 @@ impl NameServiceProxy { let proxy_connections = if let Some(port) = nameservice.port() { let url = format!("{}:{}", nameservice.host_str().unwrap(), port); - vec![Arc::new(tokio::sync::Mutex::new(ProxyConnection::new( - url, - alignment_context.clone(), - None, - )))] + vec![ProxyConnection::new(url, alignment_context.clone(), None)] } else if let Some(host) = nameservice.host_str() { // TODO: Add check for no configured namenodes config .get_urls_for_nameservice(host)? 
.into_iter() .map(|url| { - Arc::new(tokio::sync::Mutex::new(ProxyConnection::new( - url, - alignment_context.clone(), - Some(host.to_string()), - ))) + ProxyConnection::new(url, alignment_context.clone(), Some(host.to_string())) }) .collect() } else { @@ -142,8 +139,6 @@ impl NameServiceProxy { let mut attempts = 0; loop { let result = self.proxy_connections[proxy_index] - .lock() - .await .call(method_name, &message) .await; From 02e559c31ad34058d52ddc55174ff5470f6a0827 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 2 Nov 2024 13:43:27 -0400 Subject: [PATCH 2/4] Specifically track known observer nodes and read only RPC calls --- rust/src/hdfs/protocol.rs | 48 +++++++++++++++++------- rust/src/hdfs/proxy.rs | 79 +++++++++++++++++++++++++++++++-------- 2 files changed, 97 insertions(+), 30 deletions(-) diff --git a/rust/src/hdfs/protocol.rs b/rust/src/hdfs/protocol.rs index a5f0fda..30d2965 100644 --- a/rust/src/hdfs/protocol.rs +++ b/rust/src/hdfs/protocol.rs @@ -56,7 +56,11 @@ impl NamenodeProtocol { let response = self .proxy - .call("getFileInfo", message.encode_length_delimited_to_vec()) + .call( + "getFileInfo", + message.encode_length_delimited_to_vec(), + false, + ) .await?; let decoded = hdfs::GetFileInfoResponseProto::decode_length_delimited(response)?; @@ -80,7 +84,11 @@ impl NamenodeProtocol { let response = self .proxy - .call("getListing", message.encode_length_delimited_to_vec()) + .call( + "getListing", + message.encode_length_delimited_to_vec(), + false, + ) .await?; let decoded = hdfs::GetListingResponseProto::decode_length_delimited(response)?; @@ -103,6 +111,7 @@ impl NamenodeProtocol { .call( "getLocatedFileInfo", message.encode_length_delimited_to_vec(), + false, ) .await?; @@ -119,6 +128,7 @@ impl NamenodeProtocol { .call( "getServerDefaults", message.encode_length_delimited_to_vec(), + false, ) .await?; @@ -148,6 +158,7 @@ impl NamenodeProtocol { .call( "getDataEncryptionKey", message.encode_length_delimited_to_vec(), + false, ) 
.await?; @@ -212,7 +223,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("create", message.encode_length_delimited_to_vec()) + .call("create", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::CreateResponseProto::decode_length_delimited(response)?; @@ -240,7 +251,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("append", message.encode_length_delimited_to_vec()) + .call("append", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::AppendResponseProto::decode_length_delimited(response)?; @@ -266,7 +277,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("addBlock", message.encode_length_delimited_to_vec()) + .call("addBlock", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::AddBlockResponseProto::decode_length_delimited(response)?; @@ -290,7 +301,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("complete", message.encode_length_delimited_to_vec()) + .call("complete", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::CompleteResponseProto::decode_length_delimited(response)?; @@ -316,7 +327,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("mkdirs", message.encode_length_delimited_to_vec()) + .call("mkdirs", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::MkdirsResponseProto::decode_length_delimited(response)?; @@ -340,7 +351,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("rename2", message.encode_length_delimited_to_vec()) + .call("rename2", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::Rename2ResponseProto::decode_length_delimited(response)?; @@ -361,7 +372,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("delete", message.encode_length_delimited_to_vec()) + .call("delete", message.encode_length_delimited_to_vec(), true) .await?; let decoded = 
hdfs::DeleteResponseProto::decode_length_delimited(response)?; @@ -381,7 +392,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("renewLease", message.encode_length_delimited_to_vec()) + .call("renewLease", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::RenewLeaseResponseProto::decode_length_delimited(response)?; @@ -404,7 +415,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("setTimes", message.encode_length_delimited_to_vec()) + .call("setTimes", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::SetTimesResponseProto::decode_length_delimited(response)?; @@ -428,7 +439,7 @@ impl NamenodeProtocol { let response = self .proxy - .call("setOwner", message.encode_length_delimited_to_vec()) + .call("setOwner", message.encode_length_delimited_to_vec(), true) .await?; let decoded = hdfs::SetOwnerResponseProto::decode_length_delimited(response)?; @@ -450,7 +461,11 @@ impl NamenodeProtocol { let response = self .proxy - .call("setPermission", message.encode_length_delimited_to_vec()) + .call( + "setPermission", + message.encode_length_delimited_to_vec(), + true, + ) .await?; let decoded = hdfs::SetPermissionResponseProto::decode_length_delimited(response)?; @@ -472,7 +487,11 @@ impl NamenodeProtocol { let response = self .proxy - .call("setReplication", message.encode_length_delimited_to_vec()) + .call( + "setReplication", + message.encode_length_delimited_to_vec(), + true, + ) .await?; let decoded = hdfs::SetReplicationResponseProto::decode_length_delimited(response)?; @@ -495,6 +514,7 @@ impl NamenodeProtocol { .call( "getContentSummary", message.encode_length_delimited_to_vec(), + false, ) .await?; diff --git a/rust/src/hdfs/proxy.rs b/rust/src/hdfs/proxy.rs index fb2af23..40d09db 100644 --- a/rust/src/hdfs/proxy.rs +++ b/rust/src/hdfs/proxy.rs @@ -1,6 +1,9 @@ -use std::sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, +use std::{ + collections::HashSet, + sync::{ + 
atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use bytes::Bytes; @@ -73,7 +76,8 @@ impl ProxyConnection { #[derive(Debug)] pub(crate) struct NameServiceProxy { proxy_connections: Vec, - current_index: AtomicUsize, + current_active: AtomicUsize, + current_observers: Arc>>, msycned: AtomicBool, } @@ -101,15 +105,16 @@ impl NameServiceProxy { Ok(NameServiceProxy { proxy_connections, - current_index: AtomicUsize::new(0), + current_active: AtomicUsize::new(0), + current_observers: Arc::new(Mutex::new(HashSet::new())), msycned: AtomicBool::new(false), }) } - async fn msync_if_needed(&self) -> Result<()> { - if !self.msycned.fetch_or(true, Ordering::SeqCst) { + async fn msync_if_needed(&self, write: bool) -> Result<()> { + if !self.msycned.fetch_or(true, Ordering::SeqCst) && !write { let msync_msg = hdfs::MsyncRequestProto::default(); - self.call_inner("msync", msync_msg.encode_length_delimited_to_vec()) + self.call_inner("msync", msync_msg.encode_length_delimited_to_vec(), false) .await .map(|_| ()) .or_else(|err| match err { @@ -125,26 +130,60 @@ impl NameServiceProxy { Ok(()) } - pub(crate) async fn call(&self, method_name: &'static str, message: Vec) -> Result { - self.msync_if_needed().await?; - self.call_inner(method_name, message).await + pub(crate) async fn call( + &self, + method_name: &'static str, + message: Vec, + write: bool, + ) -> Result { + self.msync_if_needed(write).await?; + self.call_inner(method_name, message, write).await } fn is_retriable(exception: &str) -> bool { exception == STANDBY_EXCEPTION || exception == OBSERVER_RETRY_EXCEPTION } - async fn call_inner(&self, method_name: &'static str, message: Vec) -> Result { - let mut proxy_index = self.current_index.load(Ordering::SeqCst); + async fn call_inner( + &self, + method_name: &'static str, + message: Vec, + write: bool, + ) -> Result { + let current_active = self.current_active.load(Ordering::SeqCst); + let proxy_indices = if write { + // If we're writing, try the current 
known active and then loop + // through the rest if that fails + let first = current_active; + let rest = (0..self.proxy_connections.len()) + .filter(|i| *i != first) + .collect::>(); + [vec![first], rest].concat() + } else { + // If we're reading, try all known observers, then the active, then + // any remaining + let mut first = self.current_observers.lock().unwrap().clone(); + if !first.contains(&current_active) { + first.insert(current_active); + } + let rest = (0..self.proxy_connections.len()).filter(|i| !first.contains(i)); + first.iter().copied().chain(rest).collect() + }; + let mut attempts = 0; loop { + let proxy_index = proxy_indices[attempts]; let result = self.proxy_connections[proxy_index] .call(method_name, &message) .await; match result { Ok(bytes) => { - self.current_index.store(proxy_index, Ordering::SeqCst); + if write { + self.current_active.store(proxy_index, Ordering::SeqCst); + } else { + self.current_observers.lock().unwrap().insert(proxy_index); + } return Ok(bytes); } // RPCError indicates the call was successfully attempted but had an error, so should be returned immediately @@ -153,13 +192,21 @@ impl NameServiceProxy { } Err(_) if attempts >= self.proxy_connections.len() - 1 => return result, // Retriable error, do nothing and try the next connection - Err(HdfsError::RPCError(_, _)) => (), + Err(HdfsError::RPCError(exception, _)) => match exception.as_ref() { + OBSERVER_RETRY_EXCEPTION => { + self.current_observers.lock().unwrap().insert(proxy_index); + } + STANDBY_EXCEPTION => { + self.current_observers.lock().unwrap().remove(&proxy_index); + } + _ => (), + }, Err(e) => { + // Some other error, we will retry but log the error warn!("{:?}", e); } } - proxy_index = (proxy_index + 1) % self.proxy_connections.len(); attempts += 1; } } From ee9eb7f3edb12c5f26230c7b0c3719381eccfb46 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 2 Nov 2024 19:48:04 -0400 Subject: [PATCH 3/4] Fix observer handling --- rust/src/hdfs/proxy.rs | 16 
++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/rust/src/hdfs/proxy.rs b/rust/src/hdfs/proxy.rs index 40d09db..980f02e 100644 --- a/rust/src/hdfs/proxy.rs +++ b/rust/src/hdfs/proxy.rs @@ -155,16 +155,20 @@ impl NameServiceProxy { // If we're writing, try the current known active and then loop // through the rest if that fails let first = current_active; - let rest = (0..self.proxy_connections.len()) - .filter(|i| *i != first) - .collect::>(); - [vec![first], rest].concat() + let rest = (0..self.proxy_connections.len()).filter(|i| *i != first); + [first].into_iter().chain(rest).collect::>() } else { // If we're reading, try all known observers, then the active, then // any remaining - let mut first = self.current_observers.lock().unwrap().clone(); + let mut first = self + .current_observers + .lock() + .unwrap() + .iter() + .copied() + .collect::>(); if !first.contains(&current_active) { - first.insert(current_active); + first.push(current_active); } let rest = (0..self.proxy_connections.len()).filter(|i| !first.contains(i)); first.iter().copied().chain(rest).collect() From c8ecf3d8872b45ce26bb8c29a974a96ab0bca8b1 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 3 Nov 2024 11:12:00 -0500 Subject: [PATCH 4/4] Address tests --- rust/minidfs/src/main/java/main/Main.java | 2 +- rust/src/hdfs/proxy.rs | 24 ++++++++++++++--------- rust/src/security/sasl.rs | 7 +------ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/rust/minidfs/src/main/java/main/Main.java b/rust/minidfs/src/main/java/main/Main.java index ee27b67..24cd194 100644 --- a/rust/minidfs/src/main/java/main/Main.java +++ b/rust/minidfs/src/main/java/main/Main.java @@ -136,7 +136,7 @@ public static void main(String args[]) throws Exception { dfs.transitionToActive(2); } else if (flags.contains("ha")) { activeNamenode = 2; - // dfs.transitionToObserver(1); + dfs.transitionToObserver(1); dfs.transitionToActive(activeNamenode); } diff --git a/rust/src/hdfs/proxy.rs 
b/rust/src/hdfs/proxy.rs index 980f02e..3875ab3 100644 --- a/rust/src/hdfs/proxy.rs +++ b/rust/src/hdfs/proxy.rs @@ -114,7 +114,7 @@ impl NameServiceProxy { async fn msync_if_needed(&self, write: bool) -> Result<()> { if !self.msycned.fetch_or(true, Ordering::SeqCst) && !write { let msync_msg = hdfs::MsyncRequestProto::default(); - self.call_inner("msync", msync_msg.encode_length_delimited_to_vec(), false) + self.call_inner("msync", msync_msg.encode_length_delimited_to_vec(), true) .await .map(|_| ()) .or_else(|err| match err { @@ -196,17 +196,23 @@ impl NameServiceProxy { } Err(_) if attempts >= self.proxy_connections.len() - 1 => return result, // Retriable error, do nothing and try the next connection - Err(HdfsError::RPCError(exception, _)) => match exception.as_ref() { - OBSERVER_RETRY_EXCEPTION => { - self.current_observers.lock().unwrap().insert(proxy_index); - } - STANDBY_EXCEPTION => { - self.current_observers.lock().unwrap().remove(&proxy_index); + Err(HdfsError::RPCError(exception, _)) + | Err(HdfsError::FatalRPCError(exception, _)) + if Self::is_retriable(&exception) => + { + match exception.as_ref() { + OBSERVER_RETRY_EXCEPTION => { + self.current_observers.lock().unwrap().insert(proxy_index); + } + STANDBY_EXCEPTION => { + self.current_observers.lock().unwrap().remove(&proxy_index); + } + _ => (), } - _ => (), - }, + } Err(e) => { // Some other error, we will retry but log the error + self.current_observers.lock().unwrap().remove(&proxy_index); warn!("{:?}", e); } } diff --git a/rust/src/security/sasl.rs b/rust/src/security/sasl.rs index 7fe2746..5e81722 100644 --- a/rust/src/security/sasl.rs +++ b/rust/src/security/sasl.rs @@ -1,6 +1,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use cipher::{KeyIvInit, StreamCipher}; -use log::{debug, warn}; +use log::debug; use prost::Message; use std::io; use std::sync::{Arc, Mutex}; @@ -262,11 +262,6 @@ impl SaslReader { )); } RpcStatusProto::Fatal => { - warn!( - "RPC fatal error: {}: {}", - 
rpc_response.exception_class_name(), - rpc_response.error_msg() - ); return Err(HdfsError::FatalRPCError( rpc_response.exception_class_name().to_string(), rpc_response.error_msg().to_string(),