Skip to content

Commit

Permalink
Add a streaming hash implementation. (#65)
Browse files Browse the repository at this point in the history
Previously files were being hashed by loading them in memory entirely
first, then hashing the resulting buffer. When uploading large files,
this puts a lot of memory pressure on the server.

Thankfully the hash types provide `io::Write`, meaning implementing a
streaming hash is as straightforward as using `io::copy` from the input
into them.

A few adjacent changes, mostly about wrapping some code behind a
`spawn_blocking` call; a lot of our implementation is synchronous, and
calling it directly from the main thread could cause it to block. The
biggest offenders were the aforementioned hashing, and git operations.
  • Loading branch information
plietar authored Aug 29, 2024
1 parent 2631cdb commit b1757e5
Showing 9 changed files with 88 additions and 61 deletions.
13 changes: 9 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
cached = "0.40.0"
walkdir = "2.3.2"
md5 = "0.7.0"
sha2 = "0.10.6"
sha1 = "0.10.5"
lazy_static = "1.4.0"
@@ -36,6 +35,8 @@ futures = "0.3.30"
tower = "0.4.13"
mime = "0.3.17"
git2 = { version = "0.18.2" }
digest = "0.10.7"
md-5 = "0.10.6"

[dev-dependencies]
assert_cmd = "2.0.6"
31 changes: 21 additions & 10 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -141,10 +141,13 @@ async fn add_file(
hash: extract::Path<String>,
file: Upload,
) -> Result<OutpackSuccess<()>, OutpackError> {
store::put_file(&root, file, &hash)
.await
.map_err(OutpackError::from)
.map(OutpackSuccess::from)
tokio::task::spawn_blocking(move || {
store::put_file(&root, file, &hash)
.map_err(OutpackError::from)
.map(OutpackSuccess::from)
})
.await
.unwrap()
}

async fn add_packet(
@@ -159,17 +162,25 @@ async fn add_packet(
}

async fn git_fetch(root: State<PathBuf>) -> Result<OutpackSuccess<()>, OutpackError> {
git::git_fetch(&root)
.map_err(OutpackError::from)
.map(OutpackSuccess::from)
tokio::task::spawn_blocking(move || {
git::git_fetch(&root)
.map_err(OutpackError::from)
.map(OutpackSuccess::from)
})
.await
.unwrap()
}

async fn git_list_branches(
root: State<PathBuf>,
) -> Result<OutpackSuccess<git::BranchResponse>, OutpackError> {
git::git_list_branches(&root)
.map_err(OutpackError::from)
.map(OutpackSuccess::from)
tokio::task::spawn_blocking(move || {
git::git_list_branches(&root)
.map_err(OutpackError::from)
.map(OutpackSuccess::from)
})
.await
.unwrap()
}

#[derive(Serialize, Deserialize)]
38 changes: 27 additions & 11 deletions src/hash.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use digest::Digest;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use sha1::Digest;
use std::fmt;
use std::fmt::LowerHex;
use std::path::Path;
@@ -117,20 +117,37 @@ fn hex_string<T: LowerHex>(digest: T) -> String {
format!("{:x}", digest)
}

pub fn hash_data(data: &[u8], algorithm: HashAlgorithm) -> Hash {
fn hash_stream_impl<D>(mut stream: impl std::io::Read) -> Result<String, std::io::Error>
where
D: Digest + std::io::Write,
digest::Output<D>: LowerHex,
{
let mut hasher = D::new();
std::io::copy(&mut stream, &mut hasher)?;
Ok(hex_string(hasher.finalize()))
}

pub fn hash_stream(
stream: impl std::io::Read,
algorithm: HashAlgorithm,
) -> Result<Hash, std::io::Error> {
let value: String = match algorithm {
HashAlgorithm::Md5 => hex_string(md5::compute(data)),
HashAlgorithm::Sha1 => hex_string(sha1::Sha1::new().chain_update(data).finalize()),
HashAlgorithm::Sha256 => hex_string(sha2::Sha256::new().chain_update(data).finalize()),
HashAlgorithm::Sha384 => hex_string(sha2::Sha384::new().chain_update(data).finalize()),
HashAlgorithm::Sha512 => hex_string(sha2::Sha512::new().chain_update(data).finalize()),
HashAlgorithm::Md5 => hash_stream_impl::<md5::Md5>(stream)?,
HashAlgorithm::Sha1 => hash_stream_impl::<sha1::Sha1>(stream)?,
HashAlgorithm::Sha256 => hash_stream_impl::<sha2::Sha256>(stream)?,
HashAlgorithm::Sha384 => hash_stream_impl::<sha2::Sha384>(stream)?,
HashAlgorithm::Sha512 => hash_stream_impl::<sha2::Sha512>(stream)?,
};
Hash { algorithm, value }
Ok(Hash { algorithm, value })
}

pub fn hash_data(data: &[u8], algorithm: HashAlgorithm) -> Hash {
hash_stream(data, algorithm).expect("reading from memory cannot fail")
}

pub fn hash_file(path: &Path, algorithm: HashAlgorithm) -> Result<Hash, std::io::Error> {
let bytes = std::fs::read(path)?;
Ok(hash_data(&bytes, algorithm))
let file = std::fs::File::open(path)?;
hash_stream(file, algorithm)
}

pub fn validate_hash(found: &Hash, expected: &Hash) -> Result<(), HashError> {
@@ -149,7 +166,6 @@ pub fn validate_hash_data(data: &[u8], expected: &str) -> Result<(), HashError>
validate_hash(&hash_data(data, expected.algorithm), &expected)
}

// This is not yet a streaming hash, which can be done!
pub fn validate_hash_file(path: &Path, expected: &str) -> Result<(), HashError> {
let expected: Hash = expected.parse()?;
validate_hash(&hash_file(path, expected.algorithm)?, &expected)
5 changes: 3 additions & 2 deletions src/metadata.rs
Original file line number Diff line number Diff line change
@@ -289,6 +289,7 @@ mod tests {
use crate::store::file_exists;
use crate::test_utils::tests::{get_temp_outpack_root, start_packet};
use crate::utils::time_as_num;
use md5::Md5;
use serde_json::Value;
use sha2::{Digest, Sha256};

@@ -336,7 +337,7 @@ mod tests {
let digest = get_ids_digest(Path::new("tests/example"), None).unwrap();
let dat = "20170818-164830-33e0ab0120170818-164847-7574883b20180220-095832-16a4bbed\
20180818-164043-7cdcde4b";
let expected = format!("sha256:{:x}", Sha256::new().chain_update(dat).finalize());
let expected = format!("sha256:{:x}", Sha256::digest(dat));
assert_eq!(digest, expected);
}

@@ -345,7 +346,7 @@ mod tests {
let digest = get_ids_digest(Path::new("tests/example"), Some(String::from("md5"))).unwrap();
let dat = "20170818-164830-33e0ab0120170818-164847-7574883b20180220-095832-16a4bbed\
20180818-164043-7cdcde4b";
let expected = format!("md5:{:x}", md5::compute(dat));
let expected = format!("md5:{:x}", Md5::digest(dat));
assert_eq!(digest, expected);
}

4 changes: 2 additions & 2 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -264,8 +264,8 @@ mod tests {

let total_size = data1.len() + data2.len();

put_file(&root, data1, &hash1).await.unwrap();
put_file(&root, data2, &hash2).await.unwrap();
put_file(&root, data1, &hash1).unwrap();
put_file(&root, data2, &hash2).unwrap();

collector.update().unwrap();
assert_eq!(collector.files_total.get(), 2);
36 changes: 18 additions & 18 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ pub fn file_path(root: &Path, hash: &str) -> io::Result<PathBuf> {

pub fn file_exists(root: &Path, hash: &str) -> io::Result<bool> {
let path = file_path(root, hash)?;
Ok(fs::metadata(path).is_ok())
Ok(std::fs::metadata(path).is_ok())
}

pub fn get_missing_files(root: &Path, wanted: &[String]) -> io::Result<Vec<String>> {
@@ -32,20 +32,20 @@ pub fn get_missing_files(root: &Path, wanted: &[String]) -> io::Result<Vec<Strin
.collect()
}

pub async fn put_file(root: &Path, file: impl Into<Upload>, hash: &str) -> io::Result<()> {
pub fn put_file(root: &Path, file: impl Into<Upload>, hash: &str) -> io::Result<()> {
let temp_dir = tempdir_in(root)?;
let temp_path = temp_dir.path().join("data");

file.into().persist(&temp_path).await?;
file.into().persist(&temp_path)?;

hash::validate_hash_file(&temp_path, hash).map_err(hash::hash_error_to_io_error)?;

let path = file_path(root, hash)?;
if !file_exists(root, hash)? {
fs::create_dir_all(path.parent().unwrap())?;
fs::rename(temp_path, path).map(|_| ())
} else {
Ok(())
fs::rename(temp_path, path)?;
}
Ok(())
}

pub fn enumerate_files(root: &Path) -> impl Iterator<Item = DirEntry> {
@@ -87,48 +87,48 @@ mod tests {
assert_eq!(res.unwrap_err().to_string(), "Invalid hash format 'sha256'")
}

#[tokio::test]
async fn put_file_is_idempotent() {
#[test]
fn put_file_is_idempotent() {
let root = get_temp_outpack_root();
let data = b"Testing 123.";
let hash = hash_data(data, HashAlgorithm::Sha256);
let hash_str = hash.to_string();

let res = put_file(&root, data, &hash.to_string()).await;
let res = put_file(&root, data, &hash.to_string());
let expected = file_path(&root, &hash_str).unwrap();
let expected = expected.to_str().unwrap();
assert!(res.is_ok());
assert_eq!(fs::read(expected).unwrap(), data);

let res = put_file(&root, data, &hash_str).await;
let res = put_file(&root, data, &hash_str);
println!("{:?}", res);
assert!(res.is_ok());
}

#[tokio::test]
async fn put_file_validates_hash_format() {
#[test]
fn put_file_validates_hash_format() {
let root = get_temp_outpack_root();
let data = b"Testing 123.";
let res = put_file(&root, data, "badhash").await;
let res = put_file(&root, data, "badhash");
assert_eq!(
res.unwrap_err().to_string(),
"Invalid hash format 'badhash'"
);
}

#[tokio::test]
async fn put_file_validates_hash_match() {
#[test]
fn put_file_validates_hash_match() {
let root = get_temp_outpack_root();
let data = b"Testing 123.";
let res = put_file(&root, data, "md5:abcde").await;
let res = put_file(&root, data, "md5:abcde");
assert_eq!(
res.unwrap_err().to_string(),
"Expected hash 'md5:abcde' but found 'md5:6df8571d7b178e6fbb982ad0f5cd3bc1'"
);
}

#[tokio::test]
async fn enumerate_files_works() {
#[test]
fn enumerate_files_works() {
let root = get_temp_outpack_root();
let files: Vec<_> = enumerate_files(&root)
.map(|entry| entry.file_name().to_owned())
9 changes: 4 additions & 5 deletions src/upload.rs
Original file line number Diff line number Diff line change
@@ -53,14 +53,13 @@ impl Upload {
///
/// The file is moved to the destination path. That path must be located on the same filesystem
/// as the configured upload directory.
pub async fn persist(self, destination: &Path) -> std::io::Result<()> {
pub fn persist(self, destination: &Path) -> std::io::Result<()> {
match self {
Upload::Buffered(data) => {
tokio::fs::write(destination, &data).await?;
std::fs::write(destination, data)?;
}
Upload::File(path) => {
let destination = destination.to_owned();
tokio::task::spawn_blocking(move || path.persist(destination).unwrap()).await?
path.persist(destination)?;
}
}
Ok(())
@@ -156,7 +155,7 @@ mod tests {
}

let destination = root.as_ref().join("hello.txt");
upload.persist(&destination).await.unwrap();
upload.persist(&destination).unwrap();

let contents = tokio::fs::read(&destination).await.unwrap();
assert_eq!(contents, data);
10 changes: 2 additions & 8 deletions tests/test_api.rs
Original file line number Diff line number Diff line change
@@ -553,10 +553,7 @@ async fn missing_files_validates_request_body() {
async fn can_post_file() {
let mut client = get_default_client();
let content = "test";
let hash = format!(
"sha256:{:x}",
Sha256::new().chain_update(content).finalize()
);
let hash = format!("sha256:{:x}", Sha256::digest(content));
let response = client
.post(
format!("/file/{}", hash),
@@ -618,10 +615,7 @@ async fn can_post_metadata() {
"orderly.R"
]
}"#;
let hash = format!(
"sha256:{:x}",
Sha256::new().chain_update(content).finalize()
);
let hash = format!("sha256:{:x}", Sha256::digest(content));
let response = client
.post(format!("/packet/{}", hash), mime::TEXT_PLAIN_UTF_8, content)
.await;

0 comments on commit b1757e5

Please sign in to comment.