-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdu-parallel.rs
127 lines (109 loc) · 3.39 KB
/
du-parallel.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//! Disk usage for an ext4 filesystem.
use al_crunch_pool::{execute, Options, Sender};
use al_mmap::Mmap;
use ap_storage::{
attr::{Attributes, ID, SIZE},
directory::DirIterator,
file::File,
file::FileType,
Error, FileSystem,
};
use ap_storage_ext4_ro::{file::Ext4File, Ext4Fs};
use ap_storage_memory::ReadSlice;
use gumdrop::Options as GumdropOptions;
use std::rc::Rc;
// Command-line options for the walker.
// gumdrop renders each field's `///` doc comment as that option's `--help`
// text, so the field docs below are user-facing strings.
#[derive(Debug, GumdropOptions)]
struct CommandOptions {
    /// Display help.
    help: bool,
    // `main` inverts this flag before passing it to `Mmap::new`.
    /// Disable direct access.
    no_direct: bool,
    /// Enable leaf optimization.
    leaf_optimization: bool,
    // `None` leaves the thread count to the pool's default.
    /// Number of parallel threads.
    threads: Option<usize>,
    /// Number of slots in the queue per thread.
    #[options(default = "8")]
    slots: usize,
    /// Number of repeats.
    #[options(default = "1")]
    repeat: usize,
    /// Start directory.
    #[options(default = "/")]
    start: String,
}
/// Private per-worker accumulator.
///
/// Each worker gets its own instance, with its own filesystem handle and its
/// own running totals. Nothing in here is shared between workers.
pub struct WorkerState {
    /// Number of directory entries this worker has visited.
    count: usize,
    /// Running sum of the `SIZE` attribute of the entries this worker visited.
    size: u64,
    /// This worker's own view of the filesystem. `Rc` is not `Send`, so the
    /// handle never leaves the worker that created it.
    fs: Rc<Ext4Fs<'static>>,
}
/// Scans the directory whose inode number is `nr` and accumulates its entries
/// into `worker`.
///
/// For each entry, `worker.count` is bumped and the entry's `SIZE` attribute
/// (0 when absent) is added to `worker.size`. Each subdirectory is not
/// recursed into directly. Instead it is queued on `sender` as a new job.
/// Entries of type `Unknown` or `Parent` are ignored.
///
/// Failures are handled on a best-effort basis:
/// - If `dir()` yields no iterator, nothing is scanned.
/// - The scan stops at the first iterator error.
/// - An entry that cannot be opened is counted but contributes no size.
///
/// # Panics
///
/// Panics if inode `nr` itself cannot be opened.
fn visit(sender: &Sender<WorkerState>, nr: u64, worker: &mut WorkerState) {
    // Take a separate handle to the filesystem so that `directory` does not
    // keep `worker` borrowed. `worker` must be handed to `sender.send` below.
    let fs_handle = Rc::clone(&worker.fs);
    let directory = Ext4File::new(&fs_handle, nr).unwrap();
    let mut entries = match directory.dir() {
        Some(entries) => entries,
        None => return,
    };
    loop {
        // End of listing and read errors both terminate the scan.
        let entry = match entries.next(&mut []) {
            Ok(Some(entry)) => entry,
            _ => break,
        };
        let skipped = matches!(entry.typ, FileType::Unknown | FileType::Parent);
        if skipped {
            continue;
        }
        worker.count += 1;
        let child = match directory.open(entry.offset) {
            Ok(child) => child,
            Err(_) => continue,
        };
        let bytes = child
            .attr()
            .get(SIZE, &mut [])
            .and_then(|x| x.as_u64())
            .unwrap_or(0);
        worker.size += bytes;
        if entry.typ != FileType::Directory {
            continue;
        }
        // Only the inode number travels with the job. Whichever worker runs
        // the job reopens the directory through its own filesystem handle.
        let child_id = entry.id;
        let job_sender = sender.clone();
        sender.send(worker, move |state| {
            visit(&job_sender, child_id, state);
        });
    }
}
/// Entry point: memory-maps an ext4 image from `/dev/stdin` and reports its
/// disk usage below `--start`.
///
/// The tree is walked in parallel by a pool of workers, each with its own
/// `Ext4Fs`, using `visit`. The walk is performed `--repeat` times. After each
/// run one line `<start>\t<count>\t<size>` is printed, where `<count>` and
/// `<size>` are the per-worker entry counts and `SIZE` totals summed together.
///
/// # Errors
///
/// Returns an error if `/dev/stdin` cannot be memory-mapped.
///
/// # Panics
///
/// Panics if any of the following happens:
/// - the filesystem cannot be opened, or its root cannot be obtained;
/// - `--start` cannot be looked up;
/// - the start path has no numeric `ID` attribute.
fn main() -> Result<(), Error> {
    let opts = CommandOptions::parse_args_default_or_exit();
    // The second argument is `!no_direct`. It is presumably al_mmap's
    // direct-access switch; confirm against that crate.
    let mmap = Mmap::new("/dev/stdin", !opts.no_direct, 0, 0)?;
    let disk = ReadSlice(mmap.0);
    // Type-erase the mapped image to a `dyn Read + Sync` reference. This is
    // what every worker's `Ext4Fs` is built on (via the transmute below).
    let disk: &(dyn ap_storage::Read + Sync) = &disk;
    // Pool configuration. `--threads` (None = pool default) and `--slots`
    // (queue slots per thread) come from the command line. `one_is_zero` and
    // `io_bound` are al_crunch_pool tuning knobs; see that crate for their
    // exact meaning.
    let options = Options::default()
        .one_is_zero()
        .io_bound()
        .threads(opts.threads)
        .slots(opts.slots);
    // a function to produce the state for every worker
    // Each call opens a fresh `Ext4Fs` over the same mapped image. The
    // argument is ignored. The pool presumably passes a worker index
    // (TODO confirm), and the driver below passes 0.
    let make_state = |_| {
        WorkerState {
            // XXX we don't handle the lifetimes correctly
            // NOTE(review): `transmute` only stretches the lifetime of `disk`
            // to `'static`. `disk` points at a local that lives until the end
            // of `main`. This is sound only if `execute` drops every
            // WorkerState (and thus every `Ext4Fs`) before it returns.
            // Confirm that guarantee in al_crunch_pool.
            fs: Rc::new(Ext4Fs::new(unsafe { std::mem::transmute(disk) }, opts.leaf_optimization).unwrap()),
            size: 0,
            count: 0,
        }
    };
    // Run the complete parallel traversal `--repeat` times, printing one
    // result line per run. The bounded job queue itself is configured by
    // `options` above.
    for _i in 0..opts.repeat {
        let state = execute(
            // Pool configuration (cloned because every run consumes one).
            options.clone(),
            // Factory for per-worker state.
            make_state,
            // Reduces a worker's state to its partial (count, size) result.
            |state| (state.count, state.size),
            // Driver. It builds its own state, resolves `--start`, and walks
            // it. Subdirectories found there are queued as jobs via `sender`.
            |sender| {
                let mut state = make_state(0);
                let root = state.fs.root().unwrap();
                let child = root
                    .lookup_path(opts.start.as_bytes())
                    .expect("start directory not found");
                visit(
                    sender,
                    child.attr().get(ID, &mut []).expect("no ID").as_u64().unwrap(),
                    &mut state,
                );
                (state.count, state.size)
            },
            // Merges two partial (count, size) results by summing them.
            |mut x, y| {
                x.0 += y.0;
                x.1 += y.1;
                x
            },
        );
        println!("{}\t{}\t{}", opts.start, state.0, state.1);
    }
    Ok(())
}