Skip to content

Commit

Permalink
Add support for encrypting data transfers with AES (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored Apr 5, 2024
1 parent f8183c3 commit 0e90092
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 93 deletions.
3 changes: 3 additions & 0 deletions crates/hdfs-native/minidfs/src/main/java/main/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ public static void main(String args[]) throws Exception {
conf.set(HADOOP_RPC_PROTECTION, "privacy");
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
if (flags.contains("data_transfer_encryption")) {
// Force encryption for all connections
conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");
}
if (flags.contains("aes")) {
conf.set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, "AES/CTR/NoPadding");
}
} else if (flags.contains("integrity")) {
Expand Down
14 changes: 4 additions & 10 deletions crates/hdfs-native/src/hdfs/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,9 @@ impl DatanodeConnection {
.await?;
self.writer.flush().await?;

let msg_length = self.reader.read_length_delimiter().await?;
let message = self.reader.read_proto().await?;

let mut response_buf = BytesMut::zeroed(msg_length);
self.reader.read_exact(&mut response_buf).await?;

let response = hdfs::BlockOpResponseProto::decode(response_buf.freeze())?;
let response = hdfs::BlockOpResponseProto::decode(message)?;
Ok(response)
}

Expand Down Expand Up @@ -630,12 +627,9 @@ pub(crate) struct DatanodeReader {

impl DatanodeReader {
pub(crate) async fn read_ack(&mut self) -> Result<hdfs::PipelineAckProto> {
let ack_length = self.reader.read_length_delimiter().await?;

let mut response_buf = BytesMut::zeroed(ack_length);
self.reader.read_exact(&mut response_buf).await?;
let message = self.reader.read_proto().await?;

let response = hdfs::PipelineAckProto::decode(response_buf.freeze())?;
let response = hdfs::PipelineAckProto::decode(message)?;
Ok(response)
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/hdfs-native/src/minidfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum DfsFeatures {
Token,
Integrity,
Privacy,
AES,
HA,
ViewFS,
EC,
Expand All @@ -28,6 +29,7 @@ impl DfsFeatures {
DfsFeatures::Privacy => "privacy",
DfsFeatures::Security => "security",
DfsFeatures::Integrity => "integrity",
DfsFeatures::AES => "aes",
DfsFeatures::Token => "token",
DfsFeatures::RBF => "rbf",
}
Expand Down
8 changes: 8 additions & 0 deletions crates/hdfs-native/src/security/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ impl DigestSaslSession {
server: kis,
}
}

pub(crate) fn supports_encryption(&self) -> bool {
match &self.state {
DigestState::Stepped(ctx) => matches!(ctx.qop, Qop::AuthConf),
DigestState::Completed(ctx) => ctx.as_ref().is_some_and(|c| c.encryptor.is_some()),
_ => false,
}
}
}

impl SaslSession for DigestSaslSession {
Expand Down
Loading

0 comments on commit 0e90092

Please sign in to comment.