Skip to content

Commit 0763fc3

Browse files
committed
Add APIs to process the files in a directory concurrently
They can be used in different contexts, for example, to count the number of files in a directory, the number of its subdirectories or extract some information contained or scattered through different files. They can also be used to compute some metrics that act on files in some way.
1 parent 0a81e78 commit 0763fc3

File tree

4 files changed

+289
-1
lines changed

4 files changed

+289
-1
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,19 @@ license = "MPL-2.0"
1212

1313
[dependencies]
1414
aho-corasick = "^0.7"
15+
crossbeam = { version = "^0.8", features = ["crossbeam-channel"] }
1516
fxhash = "0.2"
17+
globset = "^0.4"
1618
lazy_static = "^1.3"
17-
num-format = "^0.4"
1819
num = "^0.4"
1920
num-derive = "^0.3"
21+
num-format = "^0.4"
2022
num-traits = "^0.2"
2123
petgraph = "^0.6"
2224
regex = "^1.5"
2325
serde = { version = "^1.0", features = ["derive"] }
2426
termcolor = "^1.1"
27+
walkdir = "^2.3"
2528

2629
tree-sitter = "=0.19.3"
2730
tree-sitter-java = "=0.19.0"

src/concurrent_files.rs

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
use std::collections::HashMap;
2+
use std::path::{Path, PathBuf};
3+
use std::sync::Arc;
4+
use std::thread;
5+
6+
use crossbeam::channel::{unbounded, Receiver, Sender};
7+
use globset::GlobSet;
8+
use walkdir::{DirEntry, WalkDir};
9+
10+
/// Signature of the per-file callback: it receives the path of a file by value
/// together with a shared reference to the configuration and reports failure
/// through an `std::io::Result`.
type ProcFilesFunction<Config> = dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync;
11+
12+
/// Signature of the callback invoked for every selected file found while
/// walking a directory: it may record the path in the shared map of files and
/// has read access to the configuration.
type ProcDirPathsFunction<Config> =
    dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync;
14+
15+
/// Signature of the callback invoked for a single selected path: it receives
/// the path and a shared reference to the configuration and returns nothing.
type ProcPathFunction<Config> = dyn Fn(&Path, &Config) + Send + Sync;
16+
17+
// No-op callbacks used as defaults until the caller installs its own.
18+
/// Directory-path callback that does nothing: the map of files is left
/// untouched and both the path and the configuration are ignored.
fn null_proc_dir_paths<Config>(
    _files: &mut HashMap<String, Vec<PathBuf>>,
    _path: &Path,
    _cfg: &Config,
) {
}
19+
/// Single-path callback that does nothing: both the path and the
/// configuration are ignored.
fn null_proc_path<Config>(_path: &Path, _cfg: &Config) {}
20+
21+
/// One unit of work travelling through the job channel.
struct JobItem<Config> {
    /// File handed to the per-file callback.
    path: PathBuf,
    /// Shared handle to the configuration passed along with the file.
    cfg: Arc<Config>,
}
25+
26+
/// Receiving half of the job channel. A `None` item tells the consumer that
/// receives it to stop.
type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
27+
/// Sending half of the job channel. `Some(job)` carries a file to process,
/// `None` is the stop marker sent once per consumer.
type JobSender<Config> = Sender<Option<JobItem<Config>>>;
28+
29+
fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
30+
where
31+
ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
32+
{
33+
while let Ok(job) = receiver.recv() {
34+
if job.is_none() {
35+
break;
36+
}
37+
// Cannot panic because of the check immediately above.
38+
let job = job.unwrap();
39+
let path = job.path.clone();
40+
41+
if let Err(err) = func(job.path, &job.cfg) {
42+
eprintln!("{:?} for file {:?}", err, path);
43+
}
44+
}
45+
}
46+
47+
fn send_file<T>(
48+
path: PathBuf,
49+
cfg: &Arc<T>,
50+
sender: &JobSender<T>,
51+
) -> Result<(), ConcurrentErrors> {
52+
sender
53+
.send(Some(JobItem {
54+
path,
55+
cfg: Arc::clone(cfg),
56+
}))
57+
.map_err(|e| ConcurrentErrors::Sender(e.to_string()))
58+
}
59+
60+
fn is_hidden(entry: &DirEntry) -> bool {
61+
entry
62+
.file_name()
63+
.to_str()
64+
.map(|s| s.starts_with('.'))
65+
.unwrap_or(false)
66+
}
67+
68+
fn explore<Config, ProcDirPaths, ProcPath>(
69+
files_data: FilesData,
70+
cfg: &Arc<Config>,
71+
proc_dir_paths: ProcDirPaths,
72+
proc_path: ProcPath,
73+
sender: &JobSender<Config>,
74+
) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
75+
where
76+
ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
77+
ProcPath: Fn(&Path, &Config) + Send + Sync,
78+
{
79+
let FilesData {
80+
mut paths,
81+
ref include,
82+
ref exclude,
83+
} = files_data;
84+
85+
let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
86+
87+
for path in paths.drain(..) {
88+
let path = PathBuf::from(path);
89+
if !path.exists() {
90+
eprintln!("Warning: File doesn't exist: {:?}", path);
91+
continue;
92+
}
93+
if path.is_dir() {
94+
for entry in WalkDir::new(path)
95+
.into_iter()
96+
.filter_entry(|e| !is_hidden(e))
97+
{
98+
let entry = match entry {
99+
Ok(entry) => entry,
100+
Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
101+
};
102+
let path = entry.path().to_path_buf();
103+
if (include.is_empty() || include.is_match(&path))
104+
&& (exclude.is_empty() || !exclude.is_match(&path))
105+
&& path.is_file()
106+
{
107+
proc_dir_paths(&mut all_files, &path, cfg);
108+
send_file(path, cfg, sender)?;
109+
}
110+
}
111+
} else if (include.is_empty() || include.is_match(&path))
112+
&& (exclude.is_empty() || !exclude.is_match(&path))
113+
&& path.is_file()
114+
{
115+
proc_path(&path, cfg);
116+
send_file(path, cfg, sender)?;
117+
}
118+
}
119+
120+
Ok(all_files)
121+
}
122+
123+
/// Series of errors that might happen when processing files concurrently.
///
/// Every variant carries a human-readable description of the failure. The
/// type implements [`std::fmt::Display`] and [`std::error::Error`], so it can
/// be propagated with `?` into `Box<dyn std::error::Error>` and similar
/// containers.
#[derive(Debug)]
pub enum ConcurrentErrors {
    /// Producer side error.
    ///
    /// An error occurred inside the producer thread.
    Producer(String),
    /// Sender side error.
    ///
    /// An error occurred when sending an item.
    Sender(String),
    /// Receiver side error.
    ///
    /// An error occurred inside one of the receiver threads.
    Receiver(String),
    /// Thread side error.
    ///
    /// A general error occurred when a thread is being spawned or run.
    Thread(String),
}

impl std::fmt::Display for ConcurrentErrors {
    /// Formats the error as `"<Side> error: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (side, message) = match self {
            Self::Producer(message) => ("Producer", message),
            Self::Sender(message) => ("Sender", message),
            Self::Receiver(message) => ("Receiver", message),
            Self::Thread(message) => ("Thread", message),
        };
        write!(f, "{} error: {}", side, message)
    }
}

impl std::error::Error for ConcurrentErrors {}
143+
144+
/// Data related to files.
145+
pub struct FilesData {
146+
/// Kind of files included in a search.
147+
pub include: GlobSet,
148+
/// Kind of files excluded from a search.
149+
pub exclude: GlobSet,
150+
/// List of file paths.
151+
pub paths: Vec<String>,
152+
}
153+
154+
/// A runner to process files concurrently.
155+
pub struct ConcurrentRunner<Config> {
156+
proc_files: Box<ProcFilesFunction<Config>>,
157+
proc_dir_paths: Box<ProcDirPathsFunction<Config>>,
158+
proc_path: Box<ProcPathFunction<Config>>,
159+
num_jobs: usize,
160+
}
161+
162+
impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
163+
/// Creates a new `ConcurrentRunner`.
164+
///
165+
/// * `num_jobs` - Number of jobs utilized to process files concurrently.
166+
/// * `proc_files` - Function that processes each file found during
167+
/// the search.
168+
pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
169+
where
170+
ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
171+
{
172+
let num_jobs = std::cmp::max(2, num_jobs) - 1;
173+
Self {
174+
proc_files: Box::new(proc_files),
175+
proc_dir_paths: Box::new(null_proc_dir_paths),
176+
proc_path: Box::new(null_proc_path),
177+
num_jobs,
178+
}
179+
}
180+
181+
/// Sets the function to process the paths and subpaths contained in a
182+
/// directory.
183+
pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
184+
where
185+
ProcDirPaths:
186+
'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
187+
{
188+
self.proc_dir_paths = Box::new(proc_dir_paths);
189+
self
190+
}
191+
192+
/// Sets the function to process a single path.
193+
pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
194+
where
195+
ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
196+
{
197+
self.proc_path = Box::new(proc_path);
198+
self
199+
}
200+
201+
/// Runs the producer-consumer approach to process the files
202+
/// contained in a directory and in its own subdirectories.
203+
///
204+
/// * `config` - Information used to process a file.
205+
/// * `files_data` - Information about the files to be included or excluded
206+
/// from a search more the number of paths considered in the search.
207+
pub fn run(
208+
self,
209+
config: Config,
210+
files_data: FilesData,
211+
) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
212+
let cfg = Arc::new(config);
213+
214+
let (sender, receiver) = unbounded();
215+
216+
let producer = {
217+
let sender = sender.clone();
218+
219+
match thread::Builder::new()
220+
.name(String::from("Producer"))
221+
.spawn(move || {
222+
explore(
223+
files_data,
224+
&cfg,
225+
self.proc_dir_paths,
226+
self.proc_path,
227+
&sender,
228+
)
229+
}) {
230+
Ok(producer) => producer,
231+
Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
232+
}
233+
};
234+
235+
let mut receivers = Vec::with_capacity(self.num_jobs);
236+
let proc_files = Arc::new(self.proc_files);
237+
for i in 0..self.num_jobs {
238+
let receiver = receiver.clone();
239+
let proc_files = proc_files.clone();
240+
241+
let t = match thread::Builder::new()
242+
.name(format!("Consumer {}", i))
243+
.spawn(move || {
244+
consumer(receiver, proc_files);
245+
}) {
246+
Ok(receiver) => receiver,
247+
Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
248+
};
249+
250+
receivers.push(t);
251+
}
252+
253+
let all_files = match producer.join() {
254+
Ok(res) => res,
255+
Err(_) => {
256+
return Err(ConcurrentErrors::Producer(
257+
"Child thread panicked".to_owned(),
258+
))
259+
}
260+
};
261+
262+
// Poison the receiver, now that the producer is finished.
263+
for _ in 0..self.num_jobs {
264+
if let Err(e) = sender.send(None) {
265+
return Err(ConcurrentErrors::Sender(e.to_string()));
266+
}
267+
}
268+
269+
for receiver in receivers {
270+
if receiver.join().is_err() {
271+
return Err(ConcurrentErrors::Receiver(
272+
"A thread used to process a file panicked".to_owned(),
273+
));
274+
}
275+
}
276+
277+
all_files
278+
}
279+
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ pub use crate::langs::*;
107107
mod tools;
108108
pub use crate::tools::*;
109109

110+
mod concurrent_files;
111+
pub use crate::concurrent_files::*;
112+
110113
mod traits;
111114
pub use crate::traits::*;
112115

0 commit comments

Comments
 (0)