Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better observer namenode support #149

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/minidfs/src/main/java/main/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public static void main(String args[]) throws Exception {
dfs.transitionToActive(2);
} else if (flags.contains("ha")) {
activeNamenode = 2;
// dfs.transitionToObserver(1);
dfs.transitionToObserver(1);
dfs.transitionToActive(activeNamenode);
}

Expand Down
48 changes: 34 additions & 14 deletions rust/src/hdfs/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("getFileInfo", message.encode_length_delimited_to_vec())
.call(
"getFileInfo",
message.encode_length_delimited_to_vec(),
false,
)
.await?;

let decoded = hdfs::GetFileInfoResponseProto::decode_length_delimited(response)?;
Expand All @@ -80,7 +84,11 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("getListing", message.encode_length_delimited_to_vec())
.call(
"getListing",
message.encode_length_delimited_to_vec(),
false,
)
.await?;

let decoded = hdfs::GetListingResponseProto::decode_length_delimited(response)?;
Expand All @@ -103,6 +111,7 @@ impl NamenodeProtocol {
.call(
"getLocatedFileInfo",
message.encode_length_delimited_to_vec(),
false,
)
.await?;

Expand All @@ -119,6 +128,7 @@ impl NamenodeProtocol {
.call(
"getServerDefaults",
message.encode_length_delimited_to_vec(),
false,
)
.await?;

Expand Down Expand Up @@ -148,6 +158,7 @@ impl NamenodeProtocol {
.call(
"getDataEncryptionKey",
message.encode_length_delimited_to_vec(),
false,
)
.await?;

Expand Down Expand Up @@ -212,7 +223,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("create", message.encode_length_delimited_to_vec())
.call("create", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::CreateResponseProto::decode_length_delimited(response)?;
Expand Down Expand Up @@ -240,7 +251,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("append", message.encode_length_delimited_to_vec())
.call("append", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::AppendResponseProto::decode_length_delimited(response)?;
Expand All @@ -266,7 +277,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("addBlock", message.encode_length_delimited_to_vec())
.call("addBlock", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::AddBlockResponseProto::decode_length_delimited(response)?;
Expand All @@ -290,7 +301,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("complete", message.encode_length_delimited_to_vec())
.call("complete", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::CompleteResponseProto::decode_length_delimited(response)?;
Expand All @@ -316,7 +327,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("mkdirs", message.encode_length_delimited_to_vec())
.call("mkdirs", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::MkdirsResponseProto::decode_length_delimited(response)?;
Expand All @@ -340,7 +351,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("rename2", message.encode_length_delimited_to_vec())
.call("rename2", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::Rename2ResponseProto::decode_length_delimited(response)?;
Expand All @@ -361,7 +372,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("delete", message.encode_length_delimited_to_vec())
.call("delete", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::DeleteResponseProto::decode_length_delimited(response)?;
Expand All @@ -381,7 +392,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("renewLease", message.encode_length_delimited_to_vec())
.call("renewLease", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::RenewLeaseResponseProto::decode_length_delimited(response)?;
Expand All @@ -404,7 +415,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("setTimes", message.encode_length_delimited_to_vec())
.call("setTimes", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::SetTimesResponseProto::decode_length_delimited(response)?;
Expand All @@ -428,7 +439,7 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("setOwner", message.encode_length_delimited_to_vec())
.call("setOwner", message.encode_length_delimited_to_vec(), true)
.await?;

let decoded = hdfs::SetOwnerResponseProto::decode_length_delimited(response)?;
Expand All @@ -450,7 +461,11 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("setPermission", message.encode_length_delimited_to_vec())
.call(
"setPermission",
message.encode_length_delimited_to_vec(),
true,
)
.await?;

let decoded = hdfs::SetPermissionResponseProto::decode_length_delimited(response)?;
Expand All @@ -472,7 +487,11 @@ impl NamenodeProtocol {

let response = self
.proxy
.call("setReplication", message.encode_length_delimited_to_vec())
.call(
"setReplication",
message.encode_length_delimited_to_vec(),
true,
)
.await?;

let decoded = hdfs::SetReplicationResponseProto::decode_length_delimited(response)?;
Expand All @@ -495,6 +514,7 @@ impl NamenodeProtocol {
.call(
"getContentSummary",
message.encode_length_delimited_to_vec(),
false,
)
.await?;

Expand Down
89 changes: 73 additions & 16 deletions rust/src/hdfs/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
use std::{
collections::HashSet,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
};

use bytes::Bytes;
Expand Down Expand Up @@ -73,7 +76,8 @@ impl ProxyConnection {
#[derive(Debug)]
pub(crate) struct NameServiceProxy {
proxy_connections: Vec<ProxyConnection>,
current_index: AtomicUsize,
current_active: AtomicUsize,
current_observers: Arc<Mutex<HashSet<usize>>>,
msycned: AtomicBool,
}

Expand Down Expand Up @@ -101,15 +105,16 @@ impl NameServiceProxy {

Ok(NameServiceProxy {
proxy_connections,
current_index: AtomicUsize::new(0),
current_active: AtomicUsize::new(0),
current_observers: Arc::new(Mutex::new(HashSet::new())),
msycned: AtomicBool::new(false),
})
}

async fn msync_if_needed(&self) -> Result<()> {
if !self.msycned.fetch_or(true, Ordering::SeqCst) {
async fn msync_if_needed(&self, write: bool) -> Result<()> {
if !self.msycned.fetch_or(true, Ordering::SeqCst) && !write {
let msync_msg = hdfs::MsyncRequestProto::default();
self.call_inner("msync", msync_msg.encode_length_delimited_to_vec())
self.call_inner("msync", msync_msg.encode_length_delimited_to_vec(), true)
.await
.map(|_| ())
.or_else(|err| match err {
Expand All @@ -125,26 +130,64 @@ impl NameServiceProxy {
Ok(())
}

pub(crate) async fn call(&self, method_name: &'static str, message: Vec<u8>) -> Result<Bytes> {
self.msync_if_needed().await?;
self.call_inner(method_name, message).await
pub(crate) async fn call(
&self,
method_name: &'static str,
message: Vec<u8>,
write: bool,
) -> Result<Bytes> {
self.msync_if_needed(write).await?;
self.call_inner(method_name, message, write).await
}

fn is_retriable(exception: &str) -> bool {
exception == STANDBY_EXCEPTION || exception == OBSERVER_RETRY_EXCEPTION
}

async fn call_inner(&self, method_name: &'static str, message: Vec<u8>) -> Result<Bytes> {
let mut proxy_index = self.current_index.load(Ordering::SeqCst);
async fn call_inner(
&self,
method_name: &'static str,
message: Vec<u8>,
write: bool,
) -> Result<Bytes> {
let current_active = self.current_active.load(Ordering::SeqCst);
let proxy_indices = if write {
// If we're writing, try the current known active and then loop
// through the rest if that fails
let first = current_active;
let rest = (0..self.proxy_connections.len()).filter(|i| *i != first);
[first].into_iter().chain(rest).collect::<Vec<_>>()
} else {
// If we're reading, try all known observers, then the active, then
// any remaining
let mut first = self
.current_observers
.lock()
.unwrap()
.iter()
.copied()
.collect::<Vec<_>>();
if !first.contains(&current_active) {
first.push(current_active);
}
let rest = (0..self.proxy_connections.len()).filter(|i| !first.contains(i));
first.iter().copied().chain(rest).collect()
};

let mut attempts = 0;
loop {
let proxy_index = proxy_indices[attempts];
let result = self.proxy_connections[proxy_index]
.call(method_name, &message)
.await;

match result {
Ok(bytes) => {
self.current_index.store(proxy_index, Ordering::SeqCst);
if write {
self.current_active.store(proxy_index, Ordering::SeqCst);
} else {
self.current_observers.lock().unwrap().insert(proxy_index);
}
return Ok(bytes);
}
// RPCError indicates the call was successfully attempted but had an error, so should be returned immediately
Expand All @@ -153,13 +196,27 @@ impl NameServiceProxy {
}
Err(_) if attempts >= self.proxy_connections.len() - 1 => return result,
// Retriable error, do nothing and try the next connection
Err(HdfsError::RPCError(_, _)) => (),
Err(HdfsError::RPCError(exception, _))
| Err(HdfsError::FatalRPCError(exception, _))
if Self::is_retriable(&exception) =>
{
match exception.as_ref() {
OBSERVER_RETRY_EXCEPTION => {
self.current_observers.lock().unwrap().insert(proxy_index);
}
STANDBY_EXCEPTION => {
self.current_observers.lock().unwrap().remove(&proxy_index);
}
_ => (),
}
}
Err(e) => {
// Some other error, we will retry but log the error
self.current_observers.lock().unwrap().remove(&proxy_index);
warn!("{:?}", e);
}
}

proxy_index = (proxy_index + 1) % self.proxy_connections.len();
attempts += 1;
}
}
Expand Down
7 changes: 1 addition & 6 deletions rust/src/security/sasl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use cipher::{KeyIvInit, StreamCipher};
use log::{debug, warn};
use log::debug;
use prost::Message;
use std::io;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -262,11 +262,6 @@ impl SaslReader {
));
}
RpcStatusProto::Fatal => {
warn!(
"RPC fatal error: {}: {}",
rpc_response.exception_class_name(),
rpc_response.error_msg()
);
return Err(HdfsError::FatalRPCError(
rpc_response.exception_class_name().to_string(),
rpc_response.error_msg().to_string(),
Expand Down
Loading