Skip to content

Commit

Permalink
local_working_copy: temporarily remove parallelism from visit_directory()
Browse files Browse the repository at this point in the history

This patch turns .into_par_iter() into a plain for loop. I'm going to add
per-directory parallel execution instead.
  • Loading branch information
yuja committed Dec 5, 2024
1 parent 8a731cc commit 9b01af9
Showing 1 changed file with 77 additions and 85 deletions.
162 changes: 77 additions & 85 deletions lib/src/local_working_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ use itertools::Itertools;
use once_cell::unsync::OnceCell;
use pollster::FutureExt;
use prost::Message;
use rayon::iter::IntoParallelIterator;
use rayon::prelude::ParallelIterator;
use tempfile::NamedTempFile;
use thiserror::Error;
use tracing::instrument;
Expand Down Expand Up @@ -1052,96 +1050,90 @@ impl FileSnapshotter<'_> {

let git_ignore = git_ignore
.chain_with_file(&dir.to_internal_dir_string(), disk_dir.join(".gitignore"))?;
let dir_entries: Vec<_> = disk_dir
.read_dir()
.and_then(|entries| entries.try_collect())
.map_err(|err| SnapshotError::Other {
message: format!("Failed to read directory {}", disk_dir.display()),
err: err.into(),
})?;
dir_entries
.into_par_iter()
.try_for_each(|entry| -> Result<(), SnapshotError> {
let file_type = entry.file_type().unwrap();
let file_name = entry.file_name();
let name = file_name
.to_str()
.ok_or_else(|| SnapshotError::InvalidUtf8Path {
path: file_name.clone(),
})?;
let to_read_dir_err = |err: io::Error| SnapshotError::Other {
message: format!("Failed to read directory {}", disk_dir.display()),
err: err.into(),
};
for entry in disk_dir.read_dir().map_err(to_read_dir_err)? {
let entry = entry.map_err(to_read_dir_err)?;
let file_type = entry.file_type().unwrap();
let file_name = entry.file_name();
let name = file_name
.to_str()
.ok_or_else(|| SnapshotError::InvalidUtf8Path {
path: file_name.clone(),
})?;

if RESERVED_DIR_NAMES.contains(&name) {
return Ok(());
}
let path = dir.join(RepoPathComponent::new(name));
let maybe_current_file_state = file_states.get(&path);
if let Some(file_state) = &maybe_current_file_state {
if file_state.file_type == FileType::GitSubmodule {
return Ok(());
}
if RESERVED_DIR_NAMES.contains(&name) {
continue;
}
let path = dir.join(RepoPathComponent::new(name));
let maybe_current_file_state = file_states.get(&path);
if let Some(file_state) = &maybe_current_file_state {
if file_state.file_type == FileType::GitSubmodule {
continue;
}
}

if file_type.is_dir() {
let file_states = file_states.prefixed(&path);
if git_ignore.matches(&path.to_internal_dir_string())
|| self.start_tracking_matcher.visit(&path).is_nothing()
{
// TODO: Report this directory to the caller if there are unignored paths we
// should not start tracking.
if file_type.is_dir() {
let file_states = file_states.prefixed(&path);
if git_ignore.matches(&path.to_internal_dir_string())
|| self.start_tracking_matcher.visit(&path).is_nothing()
{
// TODO: Report this directory to the caller if there are unignored paths we
// should not start tracking.

// If the whole directory is ignored, visit only paths we're already
// tracking.
self.visit_tracked_files(file_states)?;
} else {
let directory_to_visit = DirectoryToVisit {
dir: path,
disk_dir: entry.path(),
git_ignore: git_ignore.clone(),
file_states,
};
self.visit_directory(directory_to_visit)?;
}
} else if self.matcher.matches(&path) {
if let Some(progress) = self.progress {
progress(&path);
}
if maybe_current_file_state.is_none()
&& git_ignore.matches(path.as_internal_file_string())
{
// If it wasn't already tracked and it matches
// the ignored paths, then ignore it.
} else if maybe_current_file_state.is_none()
&& !self.start_tracking_matcher.matches(&path)
// If the whole directory is ignored, visit only paths we're already
// tracking.
self.visit_tracked_files(file_states)?;
} else {
let directory_to_visit = DirectoryToVisit {
dir: path,
disk_dir: entry.path(),
git_ignore: git_ignore.clone(),
file_states,
};
self.visit_directory(directory_to_visit)?;
}
} else if self.matcher.matches(&path) {
if let Some(progress) = self.progress {
progress(&path);
}
if maybe_current_file_state.is_none()
&& git_ignore.matches(path.as_internal_file_string())
{
// If it wasn't already tracked and it matches
// the ignored paths, then ignore it.
} else if maybe_current_file_state.is_none()
&& !self.start_tracking_matcher.matches(&path)
{
// Leave the file untracked
// TODO: Report this path to the caller
} else {
let metadata = entry.metadata().map_err(|err| SnapshotError::Other {
message: format!("Failed to stat file {}", entry.path().display()),
err: err.into(),
})?;
if maybe_current_file_state.is_none() && metadata.len() > self.max_new_file_size
{
// Leave the file untracked
// TODO: Report this path to the caller
} else {
let metadata = entry.metadata().map_err(|err| SnapshotError::Other {
message: format!("Failed to stat file {}", entry.path().display()),
err: err.into(),
})?;
if maybe_current_file_state.is_none()
&& metadata.len() > self.max_new_file_size
{
// TODO: Maybe leave the file untracked instead
return Err(SnapshotError::NewFileTooLarge {
path: entry.path().clone(),
size: HumanByteSize(metadata.len()),
max_size: HumanByteSize(self.max_new_file_size),
});
}
if let Some(new_file_state) = file_state(&metadata) {
self.process_present_file(
path,
&entry.path(),
maybe_current_file_state.as_ref(),
new_file_state,
)?;
}
// TODO: Maybe leave the file untracked instead
return Err(SnapshotError::NewFileTooLarge {
path: entry.path().clone(),
size: HumanByteSize(metadata.len()),
max_size: HumanByteSize(self.max_new_file_size),
});
}
if let Some(new_file_state) = file_state(&metadata) {
self.process_present_file(
path,
&entry.path(),
maybe_current_file_state.as_ref(),
new_file_state,
)?;
}
}
Ok(())
})?;
}
}
Ok(())
}

Expand Down

0 comments on commit 9b01af9

Please sign in to comment.