remove more unwraps (#6)
omerbenamram authored Apr 1, 2019
1 parent dba7398 commit 6803cad
Showing 3 changed files with 58 additions and 47 deletions.
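The pattern is the same across all three files: functions that previously panicked through `.unwrap()` or `.expect()` now return a `Result` and hand failures back to the caller with `?`. A minimal before/after sketch of that pattern (illustrative names and a plain `std::io::Error`, not the crate's actual API):

```rust
use std::io::{Cursor, Error, Seek, SeekFrom};

// Before: a seek failure aborts the whole process.
fn cursor_at_panicking(data: &[u8], offset: u64) -> Cursor<&[u8]> {
    let mut cursor = Cursor::new(data);
    cursor.seek(SeekFrom::Start(offset)).unwrap();
    cursor
}

// After: the failure is returned to the caller instead.
fn cursor_at(data: &[u8], offset: u64) -> Result<Cursor<&[u8]>, Error> {
    let mut cursor = Cursor::new(data);
    cursor.seek(SeekFrom::Start(offset))?;
    Ok(cursor)
}
```

Call sites then decide whether to bubble the error further with `?` or handle it locally, which is what the hunks below do.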
17 changes: 10 additions & 7 deletions src/binxml/deserializer.rs
@@ -109,7 +109,7 @@ impl<'c> BinXmlDeserializer<'c> {
};

let mut tokens = vec![];
let mut iterator = de.iter_tokens(data_size);
let mut iterator = de.iter_tokens(data_size)?;

loop {
let token = iterator.next();
@@ -133,17 +133,17 @@ impl<'c> BinXmlDeserializer<'c> {
}

/// Reads `data_size` bytes of binary xml, or until EOF marker.
pub fn iter_tokens(self, data_size: Option<u32>) -> IterTokens<'c> {
pub fn iter_tokens(self, data_size: Option<u32>) -> Result<IterTokens<'c>, Error> {
let mut cursor = Cursor::new(self.data);
cursor.seek(SeekFrom::Start(self.offset)).unwrap();
cursor.seek(SeekFrom::Start(self.offset))?;

IterTokens {
Ok(IterTokens {
cursor,
ctx: Rc::clone(&self.ctx),
data_size,
data_read_so_far: 0,
eof: false,
}
})
}
}

@@ -338,7 +338,10 @@ mod tests {
(3872_usize + EVTX_RECORD_HEADER_SIZE) as u64,
);

for token in deser.iter_tokens(Some(record_header.data_size - 4 - 4 - 4 - 8 - 8)) {
for token in deser
.iter_tokens(Some(record_header.data_size - 4 - 4 - 4 - 8 - 8))
.unwrap()
{
if let Err(e) = token {
let mut cursor = Cursor::new(chunk.data.as_slice());
println!("{}", e);
@@ -357,7 +360,7 @@

let chunk = EvtxChunkData::new(from_start_of_chunk.to_vec()).unwrap();

for record in chunk.parse().into_iter().take(1) {
for record in chunk.parse().unwrap().into_iter().take(1) {
assert!(record.is_ok(), record.unwrap())
}
}
14 changes: 9 additions & 5 deletions src/evtx_chunk.rs
@@ -65,12 +65,12 @@ impl EvtxChunkData {

Ok(chunk)
}
pub fn into_records(self) -> Vec<Result<EvtxRecord, failure::Error>> {
self.parse().into_iter().collect()
pub fn into_records(self) -> Result<Vec<Result<EvtxRecord, failure::Error>>, failure::Error> {
Ok(self.parse()?.into_iter().collect())
}

pub fn parse(&self) -> EvtxChunk {
EvtxChunk::new(&self.data, &self.header).unwrap()
pub fn parse(&self) -> Result<EvtxChunk, failure::Error> {
EvtxChunk::new(&self.data, &self.header)
}

pub fn validate_data_checksum(&self) -> bool {
@@ -188,8 +188,12 @@ impl<'a> Iterator for IterChunkRecords<'a> {
let mut output_builder = XMLOutput::with_writer(record_buffer);

let mut tokens = vec![];
let iter = match deserializer.iter_tokens(Some(binxml_data_size)) {
Ok(iter) => iter,
Err(e) => return Some(Err(format_err!("{}", e))),
};

for token in deserializer.iter_tokens(Some(binxml_data_size)) {
for token in iter {
match token {
Ok(token) => {
trace!("successfully read {:?}", token);
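The `IterChunkRecords` change shows the constraint that drives this shape: inside `Iterator::next` the `?` operator cannot bail out, because `next` returns an `Option`, so a failure from the fallible setup call is wrapped into the yielded item instead. A minimal sketch of that shape, with placeholder types rather than the crate's own:

```rust
// Sketch only: placeholder types; the real code yields EvtxRecord values
// and failure::Error errors.
struct Token;

// Stands in for a fallible setup call such as `deserializer.iter_tokens(..)`.
fn make_tokens(n: usize) -> Result<Vec<Token>, String> {
    if n == 0 {
        Err("empty chunk".to_string())
    } else {
        Ok((0..n).map(|_| Token).collect())
    }
}

struct ChunkRecords {
    sizes: std::vec::IntoIter<usize>,
}

impl Iterator for ChunkRecords {
    type Item = Result<Vec<Token>, String>;

    fn next(&mut self) -> Option<Self::Item> {
        let size = self.sizes.next()?;
        // `?` cannot propagate the error out of `next`, so a setup failure
        // is handed to the caller as `Some(Err(..))` instead of panicking.
        let tokens = match make_tokens(size) {
            Ok(tokens) => tokens,
            Err(e) => return Some(Err(e)),
        };
        Some(Ok(tokens))
    }
}
```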
74 changes: 39 additions & 35 deletions src/evtx_parser.rs
@@ -73,86 +73,88 @@ impl<T: ReadSeek> EvtxParser<T> {
}

#[cfg(feature = "multithreading")]
pub fn parallel_records(mut self) -> IterRecords<T> {
let first_chunk = Self::allocate_chunk(&mut self.data, 0).expect("Invalid chunk");
let iterators = vec![first_chunk.into_records().into_iter()];

pub fn parallel_records(self) -> IterRecords<T> {
IterRecords {
header: self.header,
data: self.data,
current_chunk_number: 1,
chunk_records: iterators.into_iter().flatten(),
current_chunk_number: 0,
chunk_records: None,
num_threads: current_num_threads(),
}
}

pub fn records(mut self) -> IterRecords<T> {
let first_chunk = Self::allocate_chunk(&mut self.data, 0).expect("Invalid chunk");
let iterators = vec![first_chunk.into_records().into_iter()];

pub fn records(self) -> IterRecords<T> {
IterRecords {
header: self.header,
data: self.data,
current_chunk_number: 1,
chunk_records: iterators.into_iter().flatten(),
current_chunk_number: 0,
chunk_records: None,
num_threads: 1,
}
}
}

impl<T: ReadSeek> IterRecords<T> {
fn allocate_chunk(&mut self) {
fn allocate_chunks(&mut self) -> Result<(), Error> {
let mut chunks = vec![];
for _ in 0..self.num_threads {
if self.current_chunk_number + 1 == self.header.chunk_count {
break;
}

info!("Allocating new chunk {}", self.current_chunk_number);
let chunk = EvtxParser::allocate_chunk(&mut self.data, self.current_chunk_number)
.expect("Invalid chunk");
let chunk = EvtxParser::allocate_chunk(&mut self.data, self.current_chunk_number)?;

chunks.push(chunk);
self.current_chunk_number += 1;
}

#[cfg(feature = "multithreading")]
let iterators: Vec<IntoIter<Result<EvtxRecord, failure::Error>>> = {
let iterators: Result<
Vec<Vec<Result<EvtxRecord, failure::Error>>>,
failure::Error,
> = {
if self.num_threads > 1 {
chunks
.into_par_iter()
.map(|c| c.into_records().into_iter())
.collect()
chunks.into_par_iter().map(|c| c.into_records()).collect()
} else {
chunks
.into_iter()
.map(|c| c.into_records().into_iter())
.collect()
chunks.into_iter().map(|c| c.into_records()).collect()
}
};

#[cfg(not(feature = "multithreading"))]
let iterators: Vec<IntoIter<Result<EvtxRecord, failure::Error>>> = chunks
.into_iter()
.map(|c| c.into_records().into_iter())
.collect();

self.chunk_records = iterators.into_iter().flatten();
let mut iterators: Result<
Vec<Vec<Result<EvtxRecord, failure::Error>>>,
failure::Error,
> = chunks.into_iter().map(|c| c.into_records()).collect();

match iterators {
Ok(inner) => {
self.chunk_records = Some(inner.into_iter().flatten());
Ok(())
}
Err(e) => Err(e),
}
}
}

pub struct IterRecords<T: ReadSeek> {
header: EvtxFileHeader,
data: T,
current_chunk_number: u16,
chunk_records: Flatten<IntoIter<IntoIter<Result<EvtxRecord, failure::Error>>>>,
chunk_records: Option<Flatten<IntoIter<Vec<Result<EvtxRecord, failure::Error>>>>>,
num_threads: usize,
}

impl<T: ReadSeek> Iterator for IterRecords<T> {
type Item = Result<EvtxRecord, Error>;
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
let mut next = self.chunk_records.next();
if self.chunk_records.is_none() {
if let Err(e) = self.allocate_chunks() {
return Some(Err(e));
}
}

let mut next = self.chunk_records.as_mut().unwrap().next();

// Need to load a new chunk.
if next.is_none() {
@@ -161,8 +163,10 @@ impl<T: ReadSeek> Iterator for IterRecords<T> {
return None;
}

self.allocate_chunk();
next = self.chunk_records.next()
if let Err(e) = self.allocate_chunks() {
return Some(Err(e));
}
next = self.chunk_records.as_mut().unwrap().next()
}

next
@@ -304,7 +308,7 @@ mod tests {

assert!(chunk.validate_checksum());

for record in chunk.parse().into_iter() {
for record in chunk.parse().unwrap().into_iter() {
if let Err(e) = record {
println!("{}", e);
panic!();
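In `evtx_parser.rs` the first chunk is no longer allocated eagerly (and unwrapped) inside `records()` and `parallel_records()`; `chunk_records` becomes an `Option` that the first call to `next()` fills via the fallible `allocate_chunks`, and any allocation error is surfaced as an item. A rough sketch of that lazy-initialization pattern, using simplified stand-in types rather than the crate's real ones:

```rust
// Simplified stand-ins for the crate's EvtxRecord and failure::Error.
type Record = u32;
type Error = String;

struct Records {
    next_chunk: u16,
    chunk_count: u16,
    // Stays None until the first call to `next()` allocates chunks.
    buffered: Option<std::vec::IntoIter<Result<Record, Error>>>,
}

impl Records {
    // Stands in for `allocate_chunks`: fallible, refills the buffer.
    fn allocate_chunks(&mut self) -> Result<(), Error> {
        let records: Vec<Result<Record, Error>> = if self.next_chunk < self.chunk_count {
            let r = vec![Ok(Record::from(self.next_chunk))];
            self.next_chunk += 1;
            r
        } else {
            Vec::new()
        };
        self.buffered = Some(records.into_iter());
        Ok(())
    }
}

impl Iterator for Records {
    type Item = Result<Record, Error>;

    fn next(&mut self) -> Option<Self::Item> {
        // Lazy initialization: allocate on first use; report failure as an item.
        if self.buffered.is_none() {
            if let Err(e) = self.allocate_chunks() {
                return Some(Err(e));
            }
        }

        let mut item = self.buffered.as_mut().unwrap().next();

        // Buffer exhausted: move to the next chunk, or stop at end of file.
        if item.is_none() {
            if self.next_chunk >= self.chunk_count {
                return None;
            }
            if let Err(e) = self.allocate_chunks() {
                return Some(Err(e));
            }
            item = self.buffered.as_mut().unwrap().next();
        }

        item
    }
}
```

Under the `multithreading` feature the real code collects each chunk's records with `into_par_iter()` first; the error-handling shape is the same.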
