From 5f27ea1fce94e7756ad53cc469d263e97d841056 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Tue, 23 Aug 2022 20:01:36 -0400 Subject: [PATCH] Optimize `CreateDigest` implementation. (#16617) Closes #16570 * Use a `DigestTrie` to create all snapshots at once, instead of creating them individually * Store all in-memory file contents in a single (batched) call, instead of storing them individually [ci skip-build-wheels] --- I restored the benchmark script from #14569 to test this. | size | create_digest_before | create_digest_after | | --- | --- | --- | | 20000 | 608 | 130 | | 40000 | 1164 | 268 | | 60000 | 2260 | 475 | | 80000 | 3582 | 674 | | 100000 | 5085 | 862 | | 120000 | 6765 | 1057 | | 140000 | 8818 | 1067 | | 160000 | 10752 | 1361 | | 180000 | 12619 | 1604 | --- src/rust/engine/fs/store/src/lib.rs | 17 ++++++ src/rust/engine/src/intrinsics.rs | 80 ++++++++++++++++------------- 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 2410b5026ff..534b5e499cc 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -365,6 +365,23 @@ impl Store { .await } + /// + /// A convenience method for storing batches of small files. + /// + /// NB: This method should not be used for large blobs: prefer to stream them from their source + /// using `store_file`. + /// + pub async fn store_file_bytes_batch( + &self, + items: Vec<(Option<Digest>, Bytes)>, + initial_lease: bool, + ) -> Result<Vec<Digest>, String> { + self + .local + .store_bytes_batch(EntryType::File, items, initial_lease) + .await + } + /// /// Store a file locally by streaming its contents. /// diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index bb3c4d37c8e..ae2224c6283 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -1,6 +1,7 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE). +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::str::FromStr; @@ -18,14 +19,15 @@ use crate::tasks::Intrinsic; use crate::types::Types; use crate::Failure; -use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; +use bytes::Bytes; +use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::try_join; use indexmap::IndexMap; use pyo3::{PyAny, PyRef, Python, ToPyObject}; use tokio::process; -use fs::{DirectoryDigest, RelativePath}; -use hashing::Digest; +use fs::{DigestTrie, DirectoryDigest, PathStat, RelativePath}; +use hashing::{Digest, EMPTY_DIGEST}; use process_execution::local::{apply_chroot, create_sandbox, prepare_workdir, KeepSandboxes}; use process_execution::ManagedChild; use stdio::TryCloneAsFile; @@ -406,6 +408,8 @@ fn create_digest_to_digest( context: Context, args: Vec, ) -> BoxFuture<'static, NodeResult> { + let mut new_file_count = 0; + let items: Vec<CreateDigestItem> = { let gil = Python::acquire_gil(); let py = gil.python(); @@ -419,6 +423,7 @@ fn create_digest_to_digest( if obj.hasattr("content").unwrap() { let bytes = bytes::Bytes::from(externs::getattr::<Vec<u8>>(obj, "content").unwrap()); let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); + new_file_count += 1; CreateDigestItem::FileContent(path, bytes, is_executable) } else if obj.hasattr("file_digest").unwrap() { let py_file_digest: PyFileDigest = externs::getattr(obj, "file_digest").unwrap(); @@ -431,45 +436,46 @@ fn create_digest_to_digest( .collect() }; - // TODO: Rather than creating independent Digests and then merging them, this should use - // `DigestTrie::from_path_stats`.
- // see https://github.com/pantsbuild/pants/pull/14569#issuecomment-1057286943 - let digest_futures: Vec<_> = items - .into_iter() - .map(|item| { - let store = context.core.store(); - async move { - match item { - CreateDigestItem::FileContent(path, bytes, is_executable) => { - let digest = store.store_file_bytes(bytes, true).await?; - let snapshot = store - .snapshot_of_one_file(path, digest, is_executable) - .await?; - let res: Result<DirectoryDigest, String> = Ok(snapshot.into()); - res - } - CreateDigestItem::FileEntry(path, digest, is_executable) => { - let snapshot = store - .snapshot_of_one_file(path, digest, is_executable) - .await?; - let res: Result<_, String> = Ok(snapshot.into()); - res - } - CreateDigestItem::Dir(path) => store - .create_empty_dir(&path) - .await - .map_err(|e| e.to_string()), - } + let mut path_stats: Vec<PathStat> = Vec::with_capacity(items.len()); + let mut file_digests: HashMap<PathBuf, Digest> = HashMap::with_capacity(items.len()); + let mut bytes_to_store: Vec<(Option<Digest>, Bytes)> = Vec::with_capacity(new_file_count); + + for item in items { + match item { + CreateDigestItem::FileContent(path, bytes, is_executable) => { + let digest = Digest::of_bytes(&bytes); + bytes_to_store.push((Some(digest), bytes)); + let stat = fs::File { + path: path.to_path_buf(), + is_executable, + }; + path_stats.push(PathStat::file(path.to_path_buf(), stat)); + file_digests.insert(path.to_path_buf(), digest); } - }) - .collect(); + CreateDigestItem::FileEntry(path, digest, is_executable) => { + let stat = fs::File { + path: path.to_path_buf(), + is_executable, + }; + path_stats.push(PathStat::file(path.to_path_buf(), stat)); + file_digests.insert(path.to_path_buf(), digest); + } + CreateDigestItem::Dir(path) => { + let stat = fs::Dir(path.to_path_buf()); + path_stats.push(PathStat::dir(path.to_path_buf(), stat)); + file_digests.insert(path.to_path_buf(), EMPTY_DIGEST); + } + } + } let store = context.core.store(); async move { - let digests = future::try_join_all(digest_futures).await?; - let digest = 
store.merge(digests).await?; + // The digests returned here are already in the `file_digests` map. + let _ = store.store_file_bytes_batch(bytes_to_store, true).await?; + let trie = DigestTrie::from_path_stats(path_stats, &file_digests)?; + let gil = Python::acquire_gil(); - let value = Snapshot::store_directory_digest(gil.python(), digest)?; + let value = Snapshot::store_directory_digest(gil.python(), trie.into())?; Ok(value) } .boxed()