From fd647d41ee32a3ebf22a1f9298e2e25dc67bbe64 Mon Sep 17 00:00:00 2001 From: Aaron Opfer Date: Thu, 3 Oct 2024 19:14:07 -0500 Subject: [PATCH 1/5] refactor: stream JLAP repodata reads and writes JLAP previously had up to three copies of repodata simultaneously: the read bytes, the in-memory Serde representation, and the to-be-written bytes. Avoid having multiple copies of repodata in-memory simultaneously by streaming repodata reads and writes, which should bring us down to just one in-memory Serde copy. --- .../src/fetch/jlap/mod.rs | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs index 98ad950fe..0978bc5b6 100644 --- a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs +++ b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs @@ -81,7 +81,6 @@ use blake2::digest::Output; use blake2::digest::{FixedOutput, Update}; -use fs_err as fs; use rattler_digest::{ parse_digest_from_hex, serde::SerializableHash, Blake2b256, Blake2b256Hash, Blake2bMac256, }; @@ -94,7 +93,6 @@ use reqwest_middleware::ClientWithMiddleware; use serde::{Deserialize, Serialize}; use serde_json::Value; use serde_with::serde_as; -use std::io::Write; use std::iter::Iterator; use std::path::Path; use std::str; @@ -537,13 +535,13 @@ fn apply_jlap_patches( reporter.on_jlap_decode_start(index); } + tracing::info!("parsing cached repodata.json as JSON"); // Read the contents of the current repodata to a string - let repo_data_contents = fs::read_to_string(repo_data_path).map_err(JLAPError::FileSystem)?; + let repo_data_file = std::fs::File::open(repo_data_path).map_err(JLAPError::FileSystem)?; + let repo_data_reader = std::io::BufReader::with_capacity(64 * 1024, repo_data_file); - // Parse the JSON so we can manipulate it - tracing::info!("parsing cached repodata.json as JSON"); - let mut repo_data = - serde_json::from_str::(&repo_data_contents).map_err(JLAPError::JSONParse)?; + let mut repo_data: Value = + serde_json::from_reader(repo_data_reader).map_err(JLAPError::JSONParse)?; if let Some((reporter, index)) = report { reporter.on_jlap_decode_completed(index); @@ -569,9 +567,6 @@ fn apply_jlap_patches( reporter.on_jlap_encode_start(index); } - // Convert the json to bytes, but we don't really care about formatting. - let updated_json = serde_json::to_string(&repo_data).map_err(JLAPError::JSONParse)?; - // Write the content to disk and immediately compute the hash of the file contents. tracing::info!("writing patched repodata to disk"); let mut hashing_writer = NamedTempFile::new_in( @@ -581,9 +576,13 @@ fn apply_jlap_patches( ) .map_err(JLAPError::FileSystem) .map(rattler_digest::HashingWriter::<_, Blake2b256>::new)?; - hashing_writer - .write_all(&updated_json.into_bytes()) - .map_err(JLAPError::FileSystem)?; + + serde_json::to_writer( + std::io::BufWriter::with_capacity(64 * 1024, &mut hashing_writer), + &repo_data, + ) + .map_err(JLAPError::JSONParse)?; + let (file, hash) = hashing_writer.finalize(); file.persist(repo_data_path) .map_err(|e| JLAPError::FileSystem(e.error))?; From 0d7b4ad6d565682022e31e4536ca8bb760ee4a94 Mon Sep 17 00:00:00 2001 From: Aaron Opfer Date: Thu, 3 Oct 2024 20:57:41 -0500 Subject: [PATCH 2/5] put back fs_err --- crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs index 0978bc5b6..e4c05f070 100644 --- a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs +++ b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs @@ -81,6 +81,7 @@ use blake2::digest::Output; use blake2::digest::{FixedOutput, Update}; +use fs_err as fs; use rattler_digest::{ parse_digest_from_hex, serde::SerializableHash, Blake2b256, Blake2b256Hash, Blake2bMac256, }; @@ -537,7 +538,7 @@ fn apply_jlap_patches( tracing::info!("parsing cached repodata.json as JSON"); // Read the contents of the current repodata to a string - let repo_data_file = std::fs::File::open(repo_data_path).map_err(JLAPError::FileSystem)?; + let repo_data_file = fs::File::open(repo_data_path).map_err(JLAPError::FileSystem)?; let repo_data_reader = std::io::BufReader::with_capacity(64 * 1024, repo_data_file); let mut repo_data: Value = @@ -576,7 +577,6 @@ fn apply_jlap_patches( ) .map_err(JLAPError::FileSystem) .map(rattler_digest::HashingWriter::<_, Blake2b256>::new)?; - serde_json::to_writer( std::io::BufWriter::with_capacity(64 * 1024, &mut hashing_writer), &repo_data, From 19ce3cbd42a930e9f7bb925bf0e537be4ae6297c Mon Sep 17 00:00:00 2001 From: Aaron Opfer Date: Sat, 5 Oct 2024 18:32:35 -0500 Subject: [PATCH 3/5] switch to memmap; apply PR comments --- crates/rattler_repodata_gateway/Cargo.toml | 1 + .../src/fetch/jlap/mod.rs | 26 ++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/crates/rattler_repodata_gateway/Cargo.toml b/crates/rattler_repodata_gateway/Cargo.toml index 06c34dd9f..b076525b9 100644 --- a/crates/rattler_repodata_gateway/Cargo.toml +++ b/crates/rattler_repodata_gateway/Cargo.toml @@ -56,6 +56,7 @@ url = { workspace = true, features = ["serde"] } zstd = { workspace = true } rattler_cache = { version = "0.2.5", path = "../rattler_cache" } rattler_redaction = { version = "0.1.2", path = "../rattler_redaction", features = ["reqwest", "reqwest-middleware"] } +memmap = "0.7.0" [target.'cfg(unix)'.dependencies] libc = { workspace = true } diff --git a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs index e4c05f070..11ed217fb 100644 --- a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs +++ b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs @@ -82,6 +82,7 @@ use blake2::digest::Output; use blake2::digest::{FixedOutput, Update}; use fs_err as fs; +use memmap; use rattler_digest::{ parse_digest_from_hex, serde::SerializableHash, Blake2b256, Blake2b256Hash, Blake2bMac256, }; @@ -536,13 +537,17 @@ fn apply_jlap_patches( reporter.on_jlap_decode_start(index); } - tracing::info!("parsing cached repodata.json as JSON"); - // Read the contents of the current repodata to a string - let repo_data_file = fs::File::open(repo_data_path).map_err(JLAPError::FileSystem)?; - let repo_data_reader = std::io::BufReader::with_capacity(64 * 1024, repo_data_file); - - let mut repo_data: Value = - serde_json::from_reader(repo_data_reader).map_err(JLAPError::JSONParse)?; + let mut repo_data: Value = { + tracing::info!("parsing cached repodata.json as JSON"); + // Read the contents of the current repodata to a string + let repo_data_file = fs::File::open(repo_data_path).map_err(JLAPError::FileSystem)?; + let mmap = unsafe { + memmap::MmapOptions::new() + .map(repo_data_file.file()) + .map_err(JLAPError::FileSystem)? + }; + serde_json::from_slice(mmap.as_ref()).map_err(JLAPError::JSONParse) + }?; if let Some((reporter, index)) = report { reporter.on_jlap_decode_completed(index); @@ -577,11 +582,8 @@ fn apply_jlap_patches( ) .map_err(JLAPError::FileSystem) .map(rattler_digest::HashingWriter::<_, Blake2b256>::new)?; - serde_json::to_writer( - std::io::BufWriter::with_capacity(64 * 1024, &mut hashing_writer), - &repo_data, - ) - .map_err(JLAPError::JSONParse)?; + serde_json::to_writer(std::io::BufWriter::new(&mut hashing_writer), &repo_data) + .map_err(JLAPError::JSONParse)?; let (file, hash) = hashing_writer.finalize(); file.persist(repo_data_path) From c4a5d9a8421a3a96826c669d86d0c819ef1da689 Mon Sep 17 00:00:00 2001 From: Aaron Opfer Date: Sun, 6 Oct 2024 15:19:37 -0500 Subject: [PATCH 4/5] revert memmap changes and add std::mem::drop --- .../src/fetch/jlap/mod.rs | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs index 11ed217fb..5c0184925 100644 --- a/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs +++ b/crates/rattler_repodata_gateway/src/fetch/jlap/mod.rs @@ -82,7 +82,6 @@ use blake2::digest::Output; use blake2::digest::{FixedOutput, Update}; use fs_err as fs; -use memmap; use rattler_digest::{ parse_digest_from_hex, serde::SerializableHash, Blake2b256, Blake2b256Hash, Blake2bMac256, }; @@ -537,17 +536,14 @@ fn apply_jlap_patches( reporter.on_jlap_decode_start(index); } - let mut repo_data: Value = { - tracing::info!("parsing cached repodata.json as JSON"); - // Read the contents of the current repodata to a string - let repo_data_file = fs::File::open(repo_data_path).map_err(JLAPError::FileSystem)?; - let mmap = unsafe { - memmap::MmapOptions::new() - .map(repo_data_file.file()) - .map_err(JLAPError::FileSystem)? - }; - serde_json::from_slice(mmap.as_ref()).map_err(JLAPError::JSONParse) - }?; + // Read the contents of the current repodata to a string + let repo_data_contents = fs::read_to_string(repo_data_path).map_err(JLAPError::FileSystem)?; + + // Parse the JSON so we can manipulate it + tracing::info!("parsing cached repodata.json as JSON"); + let mut repo_data = + serde_json::from_str::(&repo_data_contents).map_err(JLAPError::JSONParse)?; + std::mem::drop(repo_data_contents); if let Some((reporter, index)) = report { reporter.on_jlap_decode_completed(index); From cdc1239cbfc22f0954057a0a6bc8a597df7b5d95 Mon Sep 17 00:00:00 2001 From: Bas Zalmstra Date: Mon, 7 Oct 2024 12:15:07 +0200 Subject: [PATCH 5/5] Update crates/rattler_repodata_gateway/Cargo.toml --- crates/rattler_repodata_gateway/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rattler_repodata_gateway/Cargo.toml b/crates/rattler_repodata_gateway/Cargo.toml index b076525b9..06c34dd9f 100644 --- a/crates/rattler_repodata_gateway/Cargo.toml +++ b/crates/rattler_repodata_gateway/Cargo.toml @@ -56,7 +56,6 @@ url = { workspace = true, features = ["serde"] } zstd = { workspace = true } rattler_cache = { version = "0.2.5", path = "../rattler_cache" } rattler_redaction = { version = "0.1.2", path = "../rattler_redaction", features = ["reqwest", "reqwest-middleware"] } -memmap = "0.7.0" [target.'cfg(unix)'.dependencies] libc = { workspace = true }