This repository has been archived by the owner on Jul 11, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathiterator.rs
235 lines (200 loc) · 8.67 KB
/
iterator.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
use disk_buffer::*;
use arguments::errors::{FileErr};
use super::InputIteratorErr;
use itoa;
use time;
use std::io::{self, Write, Read};
use std::path::{Path, PathBuf};
use std::str;
/// A snapshot of the progress estimate that `write_to_stderr` reports.
///
/// `time` and `average` are expressed in nanoseconds; they are divided by
/// `1_000_000_000` when printed as seconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ETA {
    /// Number of inputs that have not been completed yet.
    pub left: u64,
    /// Estimated time remaining for the inputs that are left, in nanoseconds.
    pub time: u64,
    /// Average runtime of a completed input, in nanoseconds.
    pub average: u64,
}
impl ETA {
    /// Writes a single-line progress report to standard error, in the form
    /// `ETA: <secs>s Left: <n> AVG: <secs>.<hundredths>s Completed: <n>`.
    ///
    /// `completed` is the number of units that have finished so far. Write errors are
    /// deliberately ignored: the report is best-effort diagnostics and must never abort
    /// the caller.
    pub fn write_to_stderr(&self, completed: usize) {
        // Lock once so the whole line is emitted without interleaving from other writers.
        let stderr = io::stderr();
        let mut stderr = stderr.lock();

        // Estimated time remaining, in whole seconds.
        let _ = stderr.write_all(b"ETA: ");
        let _ = itoa::write(&mut stderr, self.time / 1_000_000_000);

        // Number of inputs left to process.
        let _ = stderr.write_all(b"s Left: ");
        let _ = itoa::write(&mut stderr, self.left);

        // Average runtime with two decimal places: whole seconds, a dot, then hundredths.
        let _ = stderr.write_all(b" AVG: ");
        let _ = itoa::write(&mut stderr, self.average / 1_000_000_000);
        let _ = stderr.write_all(b".");
        let hundredths = (self.average % 1_000_000_000) / 10_000_000;
        // The padding zero has to come *before* a single-digit value; otherwise
        // 1.05 seconds would be rendered as "1.50".
        if hundredths < 10 {
            let _ = stderr.write_all(b"0");
        }
        let _ = itoa::write(&mut stderr, hundredths);

        // Number of units completed so far.
        let _ = stderr.write_all(b"s Completed: ");
        let _ = itoa::write(&mut stderr, completed);
        let _ = stderr.write_all(b"\n");
    }
}
/// The `InputIterator` tracks the total number of arguments, the current argument counter, and
/// takes ownership of an `InputBuffer` which buffers input arguments from the disk when arguments
/// stored in memory are depleted.
pub struct InputIterator<IO: Read> {
/// Total number of arguments to yield; iteration stops once `curr_argument` reaches this value.
pub total_arguments: usize,
/// Number of arguments that have been handed out so far.
pub curr_argument: usize,
/// Number of completed inputs, used for the ETA and the running average. Nothing in this
/// file increments it — presumably the caller updates it as jobs finish (TODO confirm).
pub completed: usize,
/// Value of `time::precise_time_ns()` captured when the iterator was created.
start_time: u64,
/// Elapsed nanoseconds since `start_time` divided by `completed`; stays 0 until an input completes.
average_time: u64,
/// Buffered view of the arguments currently held in memory.
input_buffer: InputBuffer<IO>,
}
impl<IO: Read> InputIterator<IO> {
    /// Creates an iterator over the `args` arguments stored in `file`.
    ///
    /// `path` is the location of `file` and is only used to build error values. The first
    /// chunk of the file is buffered immediately.
    ///
    /// # Errors
    ///
    /// Returns the `FileErr` produced by `InputBuffer::new` if the initial read fails.
    pub fn new(path: &Path, file: IO, args: usize) -> Result<InputIterator<IO>, FileErr> {
        // Create an `InputBuffer` from the unprocessed file.
        let disk_buffer = DiskBufferReader::new(path, file);
        let input_buffer = InputBuffer::new(disk_buffer)?;

        Ok(InputIterator {
            total_arguments: args,
            curr_argument: 0,
            completed: 0,
            input_buffer: input_buffer,
            start_time: time::precise_time_ns(),
            average_time: 0,
        })
    }

    /// Refills the internal buffer with the next chunk of arguments and re-indexes it.
    ///
    /// # Errors
    ///
    /// Returns `InputIteratorErr::FileRead` carrying the file's path if the read fails.
    fn buffer(&mut self) -> Result<(), InputIteratorErr> {
        // Read the next set of arguments from the unprocessed file, but only read as many bytes
        // as the buffer can hold without overwriting the unused bytes that were shifted to the
        // left.
        self.input_buffer.disk_buffer.buffer(self.input_buffer.capacity).map_err(|why| {
            InputIteratorErr::FileRead(PathBuf::from(self.input_buffer.disk_buffer.path.clone()), why)
        })?;
        let bytes_read = self.input_buffer.disk_buffer.capacity;

        // Update the recorded number of arguments and indices.
        self.input_buffer.start = self.input_buffer.end + 1;
        count_arguments(&mut self.input_buffer, bytes_read);
        self.input_buffer.index = 0;
        Ok(())
    }

    /// Returns the current progress estimate: inputs left, estimated time remaining and the
    /// average runtime per completed input.
    ///
    /// Saturating arithmetic is used so that a `completed` count larger than
    /// `total_arguments`, or a very large estimate, cannot underflow or overflow.
    pub fn eta(&self) -> ETA {
        let left = (self.total_arguments as u64).saturating_sub(self.completed as u64);
        ETA {
            left: left,
            time: left.saturating_mul(self.average_time),
            average: self.average_time,
        }
    }

    /// Writes the next argument into `buffer`, replacing its previous contents.
    ///
    /// Returns `None` once every argument has been handed out, `Some(Err(_))` if refilling
    /// the internal buffer failed, and `Some(Ok(()))` when `buffer` holds the next argument.
    /// Bytes that are not valid UTF-8 are replaced with U+FFFD, which matches what the
    /// `Iterator` implementation does.
    pub fn next_value(&mut self, buffer: &mut String) -> Option<Result<(), InputIteratorErr>> {
        if self.curr_argument == self.total_arguments {
            // If all arguments have been depleted, return `None`.
            return None;
        } else if self.curr_argument == self.input_buffer.end {
            // If the next argument is not stored in the internal buffer, update the buffer.
            if let Err(err) = self.buffer() {
                return Some(Err(err));
            }
        }

        // Obtain the start and end indices to know where to find the input in the array.
        // Every argument except the first one in a chunk starts one byte past the previous
        // delimiter.
        let end = self.input_buffer.indices[self.input_buffer.index + 1];
        let start = if self.input_buffer.index == 0 {
            self.input_buffer.indices[self.input_buffer.index]
        } else {
            self.input_buffer.indices[self.input_buffer.index] + 1
        };

        // Update the running average once at least one input has completed.
        if self.completed != 0 {
            let elapsed = time::precise_time_ns() - self.start_time;
            self.average_time = elapsed / self.completed as u64;
        }

        // Increment the iterator's state.
        self.curr_argument += 1;
        self.input_buffer.index += 1;

        // Copy the input from the buffer into the caller's `String`. The bytes come straight
        // from a file, so they must be validated: skipping the check would be undefined
        // behaviour on invalid UTF-8.
        buffer.clear();
        let bytes = &self.input_buffer.disk_buffer.data[start..end];
        match str::from_utf8(bytes) {
            Ok(text) => buffer.push_str(text),
            Err(_) => buffer.push_str(&String::from_utf8_lossy(bytes)),
        }
        Some(Ok(()))
    }
}
// Implementing `Iterator` lets `InputIterator` be used with every standard iterator adaptor.
impl<IO: Read> Iterator for InputIterator<IO> {
    type Item = Result<String, InputIteratorErr>;

    /// Yields the next argument as an owned `String`, `None` once all arguments have been
    /// handed out, or an error if refilling the internal buffer failed. Invalid UTF-8 is
    /// replaced lossily.
    fn next(&mut self) -> Option<Result<String, InputIteratorErr>> {
        // Nothing left to hand out.
        if self.curr_argument == self.total_arguments {
            return None;
        }

        // The in-memory chunk is exhausted, so pull in the next one from disk.
        if self.curr_argument == self.input_buffer.end {
            match self.buffer() {
                Ok(()) => (),
                Err(why) => return Some(Err(why)),
            }
        }

        // Locate the argument's bytes. Only the first argument of a chunk starts exactly at
        // the recorded index; every later one starts one byte past the preceding delimiter.
        let slot = self.input_buffer.index;
        let end = self.input_buffer.indices[slot + 1];
        let start = if slot == 0 {
            self.input_buffer.indices[slot]
        } else {
            self.input_buffer.indices[slot] + 1
        };

        // Refresh the running average once at least one input has completed.
        if self.completed != 0 {
            let elapsed = time::precise_time_ns() - self.start_time;
            self.average_time = elapsed / self.completed as u64;
        }

        // Advance the iterator's state.
        self.curr_argument += 1;
        self.input_buffer.index += 1;

        // Copy the argument out of the buffer into a new `String`.
        let bytes = &self.input_buffer.disk_buffer.data[start..end];
        let argument = String::from_utf8_lossy(bytes).into_owned();
        Some(Ok(argument))
    }
}
/// Higher level buffer implementation which keeps track of how many inputs are currently
/// stored in the buffer, where all of the indices of the input delimiters are, and which
/// segment of the complete set of arguments are currently buffered.
struct InputBuffer<IO: Read> {
/// Position within `indices` of the next argument to hand out; reset to 0 after each refill.
index: usize,
/// Set to 0 on creation and to the previous `end + 1` on each refill; presumably the number
/// of the first argument in the current chunk (TODO confirm — it is only read by the test).
start: usize,
/// Running total of arguments buffered so far; a refill is triggered when the iterator's
/// argument counter reaches this value.
end: usize,
/// Byte offset of the last newline recorded by `count_arguments`; passed to the disk
/// buffer on the next refill.
capacity: usize,
/// The underlying reader that owns the raw bytes (`data`) and the file path.
disk_buffer: DiskBufferReader<IO>,
/// Byte offsets of the newline delimiters found in the current chunk. Slot 0 is never
/// written by `count_arguments` and therefore stays 0.
indices: [usize; BUFFER_SIZE / 2],
}
impl<IO: Read> InputBuffer<IO> {
    /// Wraps a `DiskBufferReader` in an `InputBuffer`, performing the first read and
    /// indexing the arguments found in it.
    ///
    /// # Errors
    ///
    /// Returns `FileErr::Read` with the reader's path if the initial read fails.
    fn new(mut unprocessed: DiskBufferReader<IO>) -> Result<InputBuffer<IO>, FileErr> {
        // Fill the disk buffer for the first time; nothing has been consumed yet.
        if let Err(why) = unprocessed.buffer(0) {
            return Err(FileErr::Read(unprocessed.path.clone(), why));
        }
        let bytes_read = unprocessed.capacity;

        // Start from an empty index table and let `count_arguments` populate it.
        let mut input_buffer = InputBuffer {
            index: 0,
            start: 0,
            end: 0,
            capacity: 0,
            disk_buffer: unprocessed,
            indices: [0usize; BUFFER_SIZE / 2],
        };
        count_arguments(&mut input_buffer, bytes_read);

        Ok(input_buffer)
    }
}
/// Counts the number of arguments that are stored in the buffer, marking the location of
/// the indices and the actual capacity of the buffer's useful information.
///
/// Only the first `bytes_read` bytes of the disk buffer are scanned. The offset of every
/// newline found is stored in `buffer.indices` starting at slot 1 (slot 0 is left untouched),
/// `buffer.capacity` is set to the offset of the last recorded newline, and `buffer.end` is
/// advanced by the number of newlines recorded.
///
/// At most `buffer.indices.len() - 1` newlines are recorded. A chunk made mostly of empty
/// lines can contain more newlines than there are slots; without the cap that input would
/// index past the end of `indices` and panic. Bytes after the last recorded newline lie
/// beyond `capacity`, so they are left for the next refill.
fn count_arguments<IO: Read>(buffer: &mut InputBuffer<IO>, bytes_read: usize) {
    let max_delimiters = buffer.indices.len() - 1;
    let mut newlines = 0;

    for (position, _) in buffer.disk_buffer.data.iter()
        .take(bytes_read)
        .enumerate()
        .filter(|&(_, byte)| *byte == b'\n')
        .take(max_delimiters)
    {
        newlines += 1;
        buffer.indices[newlines] = position;
    }

    // With no newline at all this reads slot 0, which `count_arguments` never writes.
    buffer.capacity = buffer.indices[newlines];
    buffer.end += newlines;
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;

    /// Checks the state of the buffer right after construction and that the values read
    /// from `tests/buffer.dat` match the decimal numbers counting up from 1.
    #[test]
    fn test_input_iterator() {
        let source = File::open("tests/buffer.dat").unwrap();
        let inputs = InputIterator::new(Path::new("tests/buffer.dat"), source, 4096).unwrap();

        // State after the first chunk has been buffered and indexed.
        assert_eq!(0, inputs.input_buffer.start);
        assert_eq!(1859, inputs.input_buffer.end);

        // Each yielded argument must equal its position, rendered as text.
        let numbers = 1..4096;
        for (actual, expected) in inputs.zip(numbers) {
            assert_eq!(actual.unwrap(), expected.to_string());
        }
    }
}