-
-
Notifications
You must be signed in to change notification settings - Fork 672
Description
An `ExecuteProcessRequest` has an `output_directories` field:
pub output_directories: BTreeSet<PathBuf>, |
The idea here is that the `output_files` and `output_directories` fields represent the files and directories which should be captured after the process runs. They should be merged into one `Directory`, and the `Digest` of that `Directory` is populated on the `ExecuteProcessResult` here:
pub output_directory: hashing::Digest, |
This is currently implemented for local process execution.
Remote process execution currently fetches only the `output_files`, and ignores the directories. This code does the fetching and populating:
pants/src/rust/engine/process_execution/src/remote.rs
Lines 443 to 537 in d0aeac5
fn extract_output_files( | |
&self, | |
execute_response: &bazel_protos::remote_execution::ExecuteResponse, | |
) -> BoxFuture<Digest, ExecutionError> { | |
let mut futures = vec![]; | |
let path_map = Arc::new(Mutex::new(HashMap::new())); | |
let path_map_2 = path_map.clone(); | |
let path_stats_result: Result<Vec<PathStat>, String> = execute_response | |
.get_result() | |
.get_output_files() | |
.into_iter() | |
.map(|output_file| { | |
let output_file_path_buf = PathBuf::from(output_file.get_path()); | |
if output_file.has_digest() { | |
let digest: Result<Digest, String> = output_file.get_digest().into(); | |
let mut underlying_path_map = path_map.lock().unwrap(); | |
underlying_path_map.insert(output_file_path_buf.clone(), digest?); | |
} else { | |
let raw_content = output_file.content.clone(); | |
let path_map_3 = path_map.clone(); | |
let output_file_path_buf_2 = output_file_path_buf.clone(); | |
let output_file_path_buf_3 = output_file_path_buf_2.clone(); | |
futures.push( | |
self | |
.store | |
.store_file_bytes(raw_content, false) | |
.map_err(move |error| { | |
ExecutionError::Fatal(format!( | |
"Error storing raw content for output file {:?}: {:?}", | |
output_file_path_buf_3, error | |
)) | |
}) | |
.map(move |digest| { | |
let mut underlying_path_map = path_map_3.lock().unwrap(); | |
underlying_path_map.insert(output_file_path_buf_2, digest); | |
}), | |
); | |
} | |
Ok(PathStat::file( | |
output_file_path_buf.clone(), | |
File { | |
path: output_file_path_buf.clone(), | |
is_executable: output_file.get_is_executable(), | |
}, | |
)) | |
}) | |
.collect(); | |
let path_stats = try_future!(path_stats_result.map_err(|err| ExecutionError::Fatal(err))); | |
#[derive(Clone)] | |
struct StoreOneOffRemoteDigest { | |
map_of_paths_to_digests: HashMap<PathBuf, Digest>, | |
} | |
impl StoreOneOffRemoteDigest { | |
pub fn new(map: HashMap<PathBuf, Digest>) -> StoreOneOffRemoteDigest { | |
StoreOneOffRemoteDigest { | |
map_of_paths_to_digests: map, | |
} | |
} | |
} | |
impl fs::StoreFileByDigest<String> for StoreOneOffRemoteDigest { | |
fn store_by_digest(&self, file: File) -> BoxFuture<Digest, String> { | |
match self.map_of_paths_to_digests.get(&file.path) { | |
Some(digest) => future::ok(digest.clone()), | |
None => future::err(format!( | |
"Didn't know digest for path in remote execution response: {:?}", | |
file.path | |
)), | |
}.to_boxed() | |
} | |
} | |
let store = self.store.clone(); | |
future::join_all(futures) | |
.and_then(|_| { | |
// The unwrap() below is safe because we have joined any futures that had references to the Arc | |
let path_wrap_mutex = Arc::try_unwrap(path_map_2).unwrap(); | |
let underlying_path_map = path_wrap_mutex.into_inner().unwrap(); | |
fs::Snapshot::digest_from_path_stats( | |
store, | |
StoreOneOffRemoteDigest::new(underlying_path_map), | |
path_stats, | |
).map_err(move |error| { | |
ExecutionError::Fatal(format!( | |
"Error when storing the output file directory info in the remote CAS: {:?}", | |
error | |
)) | |
}) | |
}) | |
.to_boxed() | |
} | |
} |
We already have code to merge multiple `Directory` objects here:
pants/src/rust/engine/fs/src/snapshot.rs
Lines 197 to 201 in d0aeac5
/// | |
/// Given Digest(s) representing Directory instances, merge them recursively into a single | |
/// output Directory Digest. Fails for collisions. | |
/// | |
pub fn merge_directories(store: Store, dir_digests: Vec<Digest>) -> BoxFuture<Digest, String> { |
So we need to:
- Set the expected output directories on the remote request, as we do for files here:
pants/src/rust/engine/process_execution/src/remote.rs
Lines 560 to 570 in d0aeac5
```rust
let mut output_files = req
  .output_files
  .iter()
  .map(|p| {
    p.to_str()
      .map(|s| s.to_owned())
      .ok_or_else(|| format!("Non-UTF8 output file path: {:?}", p))
  })
  .collect::<Result<Vec<String>, String>>()?;
output_files.sort();
action.set_output_files(protobuf::repeated::RepeatedField::from_vec(output_files));
```
- Merge the `Directory` we construct from the files with the `Directory` entries which are returned from the remote response.
- Write some tests using the mock server, as can be found at the bottom of remote.rs, showing that the merging worked as we expect.