Skip to content

Commit

Permalink
♻️ Improve stability of journal file reader
Browse files Browse the repository at this point in the history
  • Loading branch information
rster2002 committed May 7, 2024
1 parent 9a93a85 commit 0b511a8
Showing 1 changed file with 177 additions and 31 deletions.
208 changes: 177 additions & 31 deletions ed-journals/src/models/journal_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ where
source: T,
position: usize,
file_read_buffer: String,
entry_buffer: VecDeque<JournalEvent>,
entry_buffer: VecDeque<Result<JournalEvent, JournalReaderError>>,
failing: bool,
}

#[derive(Debug, Error)]
Expand All @@ -61,8 +62,8 @@ pub enum JournalReaderError {
#[error(transparent)]
Utf8Error(#[from] FromUtf8Error),

#[error(transparent)]
SerdeJsonError(#[from] serde_json::Error),
#[error("Failed to parse log line: {0}")]
FailedToParseLine(#[from] serde_json::Error),
}

impl<T> JournalFileReader<T>
Expand All @@ -75,37 +76,32 @@ where

let mut lines = self.file_read_buffer.lines().peekable();

loop {
let Some(line) = lines.next() else {
break;
};

if lines.peek().is_none() {
// if self.file_read_buffer == line {
// break;
// }

// Try to parse the remainder. If it does parse, it also counts processed.
if let Ok(entry) = serde_json::from_str(line) {
self.entry_buffer.push_back(entry);
self.file_read_buffer = String::new();
break;
}

let new_line = format!("{}\n", line);
let _ = mem::replace(&mut self.file_read_buffer, new_line);
break;
while let Some(line) = lines.next() {
// Primarily here to handle the issue that the [position] points to the last character
// of the current line and if you start reading from there again it will include the
// \n that will be added by the new line which causes the first line to always start
// with an \n. This is just a quick fix that won't ever come back to bite my I swear.
if line == "" {
continue;
}

let result = serde_json::from_str(line);
let parse_result = serde_json::from_str(line);

if result.is_err() {
dbg!(&line);
// If the line didn't parse, but the line is the last line that was read, it will not
// error and instead add the current line back into the read buffer to hopefully be
// completed when new lines are added.
if parse_result.is_err() && lines.peek().is_none() {
self.file_read_buffer = line.to_string();
return Ok(());
}

self.entry_buffer.push_back(result?);
self.entry_buffer.push_back(parse_result.map_err(|e| e.into()));
}

// If it reaches this point that means that the whole read buffer has been processed, so it
// can be cleared.
self.file_read_buffer = String::new();

Ok(())
}
}
Expand All @@ -120,6 +116,7 @@ where
position: 0,
file_read_buffer: String::new(),
entry_buffer: VecDeque::new(),
failing: false,
}
}
}
Expand All @@ -131,11 +128,21 @@ where
type Item = Result<JournalEvent, JournalReaderError>;

fn next(&mut self) -> Option<Self::Item> {
if let Err(err) = self.read_next() {
return Some(Err(err));
// If the reader has failed it will not return any new lines.
if self.failing {
return None;
}

let result = self.read_next();

// If an error has been returned at this location that means that it is something that
// cannot be recovered from.
if let Err(error) = result {
self.failing = true;
return Some(Err(error))
}

self.entry_buffer.pop_front().map(Ok)
self.entry_buffer.pop_front()
}
}

Expand All @@ -146,6 +153,7 @@ mod tests {
use std::io::Cursor;

use chrono::{TimeZone, Utc};
use crate::journal_event_content::fss_signal_discovered_event::FSSSignalDiscoveredEvent;

use crate::models::journal_event::JournalEvent;
use crate::models::journal_event_content::{JournalEventContent, JournalEventContentKind};
Expand Down Expand Up @@ -198,7 +206,7 @@ mod tests {
}

#[test]
fn partial_lines_are_read_correctly() {
fn partial_last_lines_are_read_correctly() {
fs::write("a.tmp", "").unwrap();

let file = File::open("a.tmp").unwrap();
Expand Down Expand Up @@ -228,6 +236,144 @@ mod tests {
JournalEventContentKind::Commander
);

assert!(reader.next().is_none());

fs::remove_file("a.tmp").unwrap();
}

#[test]
fn partial_last_lines_are_read_correctly_2() {
fs::write("d.tmp", "").unwrap();

let file = File::open("d.tmp").unwrap();

let mut reader = JournalFileReader::from(file);

assert!(reader.next().is_none());

fs::write(
"d.tmp",
r#"{"timestamp":"2022-10-22T15:10:41Z","event":"Fileheader","part":1,"#,
)
.unwrap();

assert!(reader.next().is_none());

fs::write("d.tmp", r#"{"timestamp":"2022-10-22T15:10:41Z","event":"Fileheader","part":1,"language":"English/UK","#)
.unwrap();

assert!(reader.next().is_none());

fs::write("d.tmp", r#"{"timestamp":"2022-10-22T15:10:41Z","event":"Fileheader","part":1,"language":"English/UK","Odyssey":true,"gameversion":"4.0.0.1450","build":"r286858/r0 "}"#)
.unwrap();

assert_eq!(
reader.next().unwrap().unwrap().content.kind(),
JournalEventContentKind::FileHeader
);

assert!(reader.next().is_none());

fs::remove_file("d.tmp").unwrap();
}

#[test]
fn incorrect_lines_return_an_err_only_when_it_is_expected() {
fs::write("b.tmp", "").unwrap();

let file = File::open("b.tmp").unwrap();

let mut reader = JournalFileReader::from(file);

assert!(reader.next().is_none());

fs::write(
"b.tmp",
r#"{"timestamp":"2022-10-22T15:10:41Z","event":"Fileheader","part":1,"language":"English/UK","Odyssey":true,"gameversion":"4.0.0.1450","build":"r286858/r0 "}
{"timestamp":"2022-10-22T15:12:05Z","event":"Commander","FID":"F123456789","Na BADLY FORMATTED
{"timestamp":"2022-10-22T15:12:33Z","event":"FSSSignalDiscovered","SystemAddress":5031654888146,"SignalName":"HMS CHUCKLE PHUCK J6K-8XT","IsStation":true}
{"timestamp":"2022-10-22T15:12:33Z","event":"FSSSignalDiscovered","SystemAddress":5031654888146,"#,
)
.unwrap();

// The first like should parse like expected
assert_eq!(
reader.next().unwrap().unwrap().content.kind(),
JournalEventContentKind::FileHeader
);

// The second line should return an error. The above example it unlikely to happen and this
// is most likely to happen because of some unknown format.
assert!(reader.next().unwrap().is_err());

// The next like should return like normal.
assert_eq!(
reader.next().unwrap().unwrap().content,
JournalEventContent::FSSSignalDiscovered(FSSSignalDiscoveredEvent {
system_address: 5031654888146,
signal_name: "HMS CHUCKLE PHUCK J6K-8XT".to_string(),
signal_name_localized: None,
is_station: true,
}),
);

// The last line, even though it's not correctly formatted should return None as it's the
// last line and could just be impartial.
assert!(reader.next().is_none());

fs::write(
"b.tmp",
r#"{"timestamp":"2022-10-22T15:10:41Z","event":"Fileheader","part":1,"language":"English/UK","Odyssey":true,"gameversion":"4.0.0.1450","build":"r286858/r0 "}
{"timestamp":"2022-10-22T15:12:05Z","event":"Commander","FID":"F123456789","Na BADLY FORMATTED
{"timestamp":"2022-10-22T15:12:33Z","event":"FSSSignalDiscovered","SystemAddress":5031654888146,"SignalName":"HMS CHUCKLE PHUCK J6K-8XT","IsStation":true}
{"timestamp":"2022-10-22T15:12:33Z","event":"FSSSignalDiscovered","SystemAddress":5031654888146,"SignalName":"BREAK OF DAWN V3T-G0Y","IsStation":true}"#,
)
.unwrap();

// The when the last line is completed, the line should be parsed and returned correctly.
assert_eq!(
reader.next().unwrap().unwrap().content,
JournalEventContent::FSSSignalDiscovered(FSSSignalDiscoveredEvent {
system_address: 5031654888146,
signal_name: "BREAK OF DAWN V3T-G0Y".to_string(),
signal_name_localized: None,
is_station: true,
}),
);

fs::remove_file("b.tmp").unwrap();
}

#[test]
fn last_lines_are_read_correctly() {
fs::write("c.tmp", "").unwrap();

let file = File::open("c.tmp").unwrap();

let mut reader = JournalFileReader::from(file);

assert!(reader.next().is_none());

fs::write(
"c.tmp",
r#"{"timestamp":"2022-10-22T15:10:41Z","event":"Fileheader","part":1,"language":"English/UK","Odyssey":true,"gameversion":"4.0.0.1450","build":"r286858/r0 "}"#,
)
.unwrap();

assert_eq!(
reader.next().unwrap().unwrap().content.kind(),
JournalEventContentKind::FileHeader
);

fs::write("c.tmp", r#"{"timestamp":"2022-10-22T15:10:41Z","event":"Fileheader","part":1,"language":"English/UK","Odyssey":true,"gameversion":"4.0.0.1450","build":"r286858/r0 "}
{"timestamp":"2022-10-22T15:12:05Z","event":"Commander","FID":"F123456789","Name":"TEST"}"#)
.unwrap();

assert_eq!(
reader.next().unwrap().unwrap().content.kind(),
JournalEventContentKind::Commander
);

fs::remove_file("c.tmp").unwrap();
}
}

0 comments on commit 0b511a8

Please sign in to comment.