Skip to content

Commit 2e4ed61

Browse files
committed
Add APIs to process the files in a directory concurrently
They can be used in different contexts, for example, to count the number of files in a directory, the number of its subdirectories or extract some information contained or scattered through different files. They can also be used to compute some metrics that act on files in some way.
1 parent a6fb78b commit 2e4ed61

File tree

4 files changed

+291
-1
lines changed

4 files changed

+291
-1
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@ license = "MPL-2.0"
1212

1313
[dependencies]
1414
aho-corasick = "^0.7"
15+
crossbeam = "^0.8"
1516
enum-iterator = "^0.7"
1617
fxhash = "0.2"
18+
globset = "^0.4"
1719
lazy_static = "^1.3"
18-
num-format = "^0.4"
1920
num = "^0.4"
2021
num-derive = "^0.3"
22+
num-format = "^0.4"
2123
num-traits = "^0.2"
2224
petgraph = "^0.6"
2325
regex = "^1.5"
2426
serde = { version = "^1.0", features = ["derive"] }
2527
termcolor = "^1.1"
28+
walkdir = "^2.3"
2629

2730
tree-sitter = "0.19.3"
2831
tree-sitter-java = "0.19.0"

src/concurrent_files.rs

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
use std::collections::HashMap;
2+
use std::path::{Path, PathBuf};
3+
use std::sync::Arc;
4+
use std::thread;
5+
6+
use crossbeam::channel::{unbounded, Receiver, Sender};
7+
use globset::GlobSet;
8+
use walkdir::{DirEntry, WalkDir};
9+
10+
type ProcFiles<Config> = Box<dyn Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync>;
11+
12+
type ProcDirPaths<Config> =
13+
Box<dyn Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync>;
14+
15+
type ProcPath<Config> = Box<dyn Fn(&Path, &Config) + Send + Sync>;
16+
17+
// Null functions removed at compile time
18+
fn null_proc_dir_paths<Config>(_: &mut HashMap<String, Vec<PathBuf>>, _: &Path, _: &Config) {}
19+
fn null_proc_path<Config>(_: &Path, _: &Config) {}
20+
21+
struct JobItem<Config> {
22+
path: PathBuf,
23+
cfg: Arc<Config>,
24+
}
25+
26+
type JobReceiver<Config> = Receiver<Option<JobItem<Config>>>;
27+
type JobSender<Config> = Sender<Option<JobItem<Config>>>;
28+
29+
fn consumer<Config, ProcFiles>(receiver: JobReceiver<Config>, func: Arc<ProcFiles>)
30+
where
31+
ProcFiles: Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
32+
{
33+
while let Ok(job) = receiver.recv() {
34+
if job.is_none() {
35+
break;
36+
}
37+
// Cannot panic because of the check immediately above.
38+
let job = job.unwrap();
39+
let path = job.path.clone();
40+
41+
if let Err(err) = func(job.path, &job.cfg) {
42+
eprintln!("{:?} for file {:?}", err, path);
43+
}
44+
}
45+
}
46+
47+
fn send_file<T>(
48+
path: PathBuf,
49+
cfg: &Arc<T>,
50+
sender: &JobSender<T>,
51+
) -> Result<(), ConcurrentErrors> {
52+
if let Err(e) = sender.send(Some(JobItem {
53+
path,
54+
cfg: Arc::clone(cfg),
55+
})) {
56+
return Err(ConcurrentErrors::Sender(e.to_string()));
57+
}
58+
59+
Ok(())
60+
}
61+
62+
fn is_hidden(entry: &DirEntry) -> bool {
63+
entry
64+
.file_name()
65+
.to_str()
66+
.map(|s| s.starts_with('.'))
67+
.unwrap_or(false)
68+
}
69+
70+
fn explore<Config, ProcDirPaths, ProcPath>(
71+
files_data: FilesData,
72+
cfg: &Arc<Config>,
73+
proc_dir_paths: ProcDirPaths,
74+
proc_path: ProcPath,
75+
sender: &JobSender<Config>,
76+
) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors>
77+
where
78+
ProcDirPaths: Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
79+
ProcPath: Fn(&Path, &Config) + Send + Sync,
80+
{
81+
let FilesData {
82+
mut paths,
83+
ref include,
84+
ref exclude,
85+
} = files_data;
86+
87+
let mut all_files: HashMap<String, Vec<PathBuf>> = HashMap::new();
88+
89+
for path in paths.drain(..) {
90+
let path = PathBuf::from(path);
91+
if !path.exists() {
92+
eprintln!("Warning: File doesn't exist: {:?}", path);
93+
continue;
94+
}
95+
if path.is_dir() {
96+
for entry in WalkDir::new(path)
97+
.into_iter()
98+
.filter_entry(|e| !is_hidden(e))
99+
{
100+
let entry = match entry {
101+
Ok(entry) => entry,
102+
Err(e) => return Err(ConcurrentErrors::Sender(e.to_string())),
103+
};
104+
let path = entry.path().to_path_buf();
105+
if (include.is_empty() || include.is_match(&path))
106+
&& (exclude.is_empty() || !exclude.is_match(&path))
107+
&& path.is_file()
108+
{
109+
proc_dir_paths(&mut all_files, &path, cfg);
110+
send_file(path, cfg, sender)?;
111+
}
112+
}
113+
} else if (include.is_empty() || include.is_match(&path))
114+
&& (exclude.is_empty() || !exclude.is_match(&path))
115+
&& path.is_file()
116+
{
117+
proc_path(&path, cfg);
118+
send_file(path, cfg, sender)?;
119+
}
120+
}
121+
122+
Ok(all_files)
123+
}
124+
125+
/// Series of errors that might happen when processing files concurrently.
126+
#[derive(Debug)]
127+
pub enum ConcurrentErrors {
128+
/// Producer side error.
129+
///
130+
/// An error occurred inside the producer thread.
131+
Producer(String),
132+
/// Sender side error.
133+
///
134+
/// An error occurred when sending an item.
135+
Sender(String),
136+
/// Receiver side error.
137+
///
138+
/// An error occurred inside one of the receiver threads.
139+
Receiver(String),
140+
/// Thread side error.
141+
///
142+
/// A general error occurred when a thread is being spawned or run.
143+
Thread(String),
144+
}
145+
146+
/// Data related to files.
147+
pub struct FilesData {
148+
/// Kind of files included in a search.
149+
pub include: GlobSet,
150+
/// Kind of files excluded from a search.
151+
pub exclude: GlobSet,
152+
/// List of file paths.
153+
pub paths: Vec<String>,
154+
}
155+
156+
/// A runner to process files concurrently.
157+
pub struct ConcurrentRunner<Config> {
158+
proc_files: ProcFiles<Config>,
159+
proc_dir_paths: ProcDirPaths<Config>,
160+
proc_path: ProcPath<Config>,
161+
num_jobs: usize,
162+
}
163+
164+
impl<Config: 'static + Send + Sync> ConcurrentRunner<Config> {
165+
/// Creates a new `ConcurrentRunner`.
166+
///
167+
/// * `num_jobs` - Number of jobs utilized to process files concurrently.
168+
/// * `proc_files` - Function that processes each file found during
169+
/// the search.
170+
pub fn new<ProcFiles>(num_jobs: usize, proc_files: ProcFiles) -> Self
171+
where
172+
ProcFiles: 'static + Fn(PathBuf, &Config) -> std::io::Result<()> + Send + Sync,
173+
{
174+
let num_jobs = std::cmp::max(2, num_jobs) - 1;
175+
Self {
176+
proc_files: Box::new(proc_files),
177+
proc_dir_paths: Box::new(null_proc_dir_paths),
178+
proc_path: Box::new(null_proc_path),
179+
num_jobs,
180+
}
181+
}
182+
183+
/// Sets the function to process the paths and subpaths contained in a
184+
/// directory.
185+
pub fn set_proc_dir_paths<ProcDirPaths>(mut self, proc_dir_paths: ProcDirPaths) -> Self
186+
where
187+
ProcDirPaths:
188+
'static + Fn(&mut HashMap<String, Vec<PathBuf>>, &Path, &Config) + Send + Sync,
189+
{
190+
self.proc_dir_paths = Box::new(proc_dir_paths);
191+
self
192+
}
193+
194+
/// Sets the function to process a single path.
195+
pub fn set_proc_path<ProcPath>(mut self, proc_path: ProcPath) -> Self
196+
where
197+
ProcPath: 'static + Fn(&Path, &Config) + Send + Sync,
198+
{
199+
self.proc_path = Box::new(proc_path);
200+
self
201+
}
202+
203+
/// Runs the producer-consumer approach to process the files
204+
/// contained in a directory and in its own subdirectories.
205+
///
206+
/// * `config` - Information used to process a file.
207+
/// * `files_data` - Information about the files to be included or excluded
208+
/// from a search more the number of paths considered in the search.
209+
pub fn run(
210+
self,
211+
config: Config,
212+
files_data: FilesData,
213+
) -> Result<HashMap<String, Vec<PathBuf>>, ConcurrentErrors> {
214+
let cfg = Arc::new(config);
215+
216+
let (sender, receiver) = unbounded();
217+
218+
let producer = {
219+
let sender = sender.clone();
220+
221+
match thread::Builder::new()
222+
.name(String::from("Producer"))
223+
.spawn(move || {
224+
explore(
225+
files_data,
226+
&cfg,
227+
self.proc_dir_paths,
228+
self.proc_path,
229+
&sender,
230+
)
231+
}) {
232+
Ok(producer) => producer,
233+
Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
234+
}
235+
};
236+
237+
let mut receivers = Vec::with_capacity(self.num_jobs);
238+
let proc_files = Arc::new(self.proc_files);
239+
for i in 0..self.num_jobs {
240+
let receiver = receiver.clone();
241+
let proc_files = proc_files.clone();
242+
243+
let t = match thread::Builder::new()
244+
.name(format!("Consumer {}", i))
245+
.spawn(move || {
246+
consumer(receiver, proc_files);
247+
}) {
248+
Ok(receiver) => receiver,
249+
Err(e) => return Err(ConcurrentErrors::Thread(e.to_string())),
250+
};
251+
252+
receivers.push(t);
253+
}
254+
255+
let all_files = match producer.join() {
256+
Ok(res) => res,
257+
Err(_) => {
258+
return Err(ConcurrentErrors::Producer(
259+
"Child thread panicked".to_owned(),
260+
))
261+
}
262+
};
263+
264+
// Poison the receiver, now that the producer is finished.
265+
for _ in 0..self.num_jobs {
266+
if let Err(e) = sender.send(None) {
267+
return Err(ConcurrentErrors::Sender(e.to_string()));
268+
}
269+
}
270+
271+
for receiver in receivers {
272+
if receiver.join().is_err() {
273+
return Err(ConcurrentErrors::Receiver(
274+
"A thread used to process a file panicked".to_owned(),
275+
));
276+
}
277+
}
278+
279+
all_files
280+
}
281+
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ pub use crate::langs::*;
107107
mod tools;
108108
pub use crate::tools::*;
109109

110+
mod concurrent_files;
111+
pub use crate::concurrent_files::*;
112+
110113
mod traits;
111114
pub use crate::traits::*;
112115

0 commit comments

Comments
 (0)