From bef52ef177c0b6427d57d57b79c08316f40317ec Mon Sep 17 00:00:00 2001
From: "Brian L. Troutwine"
Date: Mon, 15 Mar 2021 21:06:03 -0700
Subject: [PATCH] chore(performance): `file-source` testing and benchmarks
 (#6742)

* Begin rework of `file-source`

This commit is the start of a process to address #6730. The major
changes introduced in this commit so far are the application of clippy
suggestions and model checks of the `read_until_with_max_size` test. I
have a pretty good idea of how that function works now and will be
introducing benchmarks.

Signed-off-by: Brian L. Troutwine

* Clippy fixes

Signed-off-by: Brian L. Troutwine

* Introduce benchmark

In this commit I have introduced a benchmark for the
`read_until_with_max_size` function. To do this I've had to make it
part of the public API of the crate, but since the crate sits inside a
larger project I'm less fussed about this. I have fiddled with the
test layout some as well.

Signed-off-by: Brian L. Troutwine

* End Cargo.toml with a newline

Signed-off-by: Brian L. Troutwine

* Slim down the feature list slightly, relax bounds

Signed-off-by: Brian L. Troutwine

* Revert 'bytes' upgrade

This crate looks to be challenging to upgrade. Best done once tokio is
updated in this project.

Signed-off-by: Brian L. Troutwine

* Use libc on Windows

Signed-off-by: Brian L. Troutwine

* Remove unused file

Signed-off-by: Brian L. Troutwine
---
 Cargo.lock                                            |   2 +-
 lib/file-source/Cargo.toml                            |  77 +++-
 lib/file-source/benches/buffer.rs                     |  76 ++++
 lib/file-source/src/buffer.rs                         | 231 ++++++++++
 lib/file-source/src/checkpointer.rs                   |  93 ++--
 lib/file-source/src/file_server.rs                    |  17 +-
 .../{file_watcher.rs => file_watcher/mod.rs}          | 158 +------
 .../src/file_watcher/tests/experiment.rs              | 133 ++++++
 .../tests/experiment_no_truncations.rs                |  99 +++++
 lib/file-source/src/file_watcher/tests/mod.rs         | 189 ++++++++
 lib/file-source/src/fingerprinter.rs                  |  10 +-
 lib/file-source/src/lib.rs                            | 418 +-----------------
 12 files changed, 874 insertions(+), 629 deletions(-)
 create mode 100644 lib/file-source/benches/buffer.rs
 create mode 100644 lib/file-source/src/buffer.rs
 rename lib/file-source/src/{file_watcher.rs => file_watcher/mod.rs} (62%)
 create mode 100644 lib/file-source/src/file_watcher/tests/experiment.rs
 create mode 100644 lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs
 create mode 100644 lib/file-source/src/file_watcher/tests/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4b3930fe0a6ea..26781b34be898 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2166,12 +2166,12 @@ dependencies = [
  "bytes 0.5.6",
  "chrono",
  "crc",
+ "criterion",
  "dashmap 4.0.2",
  "flate2",
  "futures 0.3.13",
  "glob 0.3.0",
  "indexmap",
- "libc",
  "quickcheck",
  "scan_fmt",
  "serde",
diff --git a/lib/file-source/Cargo.toml b/lib/file-source/Cargo.toml
index f0ffc95a9bcf6..0880cf40454ce 100644
--- a/lib/file-source/Cargo.toml
+++ b/lib/file-source/Cargo.toml
@@ -6,24 +6,75 @@ edition = "2018"
 publish = false
 license = "MIT"
 
+[target.'cfg(windows)'.dependencies]
+libc = "0.2"
+winapi = { version = "0.3", features = ["winioctl"] }
+
 [dependencies]
-bstr = "0.2"
-bytes = "0.5"
-chrono = { version = "0.4.19", features = ["serde"] }
 crc = "1.8.1"
-dashmap = "4.0.2"
-flate2 = "1.0.19"
-futures = { version = "0.3", default-features = false, features = ["executor"] }
 glob = "0.3.0"
-indexmap = {version = "1.6.2", features = ["serde"]}
-libc = "0.2"
 scan_fmt = "0.2.6"
-serde = { version = "1.0.117", features = ["derive"] }
-serde_json = "1.0.33"
-tokio = { version = "0.2.13", features = ["rt-core", "blocking", "time"] }
-tracing = "0.1.15" -winapi = { version = "0.3", features = ["winioctl"] } + +[dependencies.bstr] +version = "0.2" +default-features = false +features = [] + +[dependencies.bytes] +version = "0.5" +default-features = false +features = [] + +[dependencies.chrono] +version = "0.4" +default-features = false +features = ["clock", "serde"] + +[dependencies.dashmap] +version = "4.0" +default-features = false +features = [] + +[dependencies.indexmap] +version = "1.6" +default-features = false +features = ["serde"] + +[dependencies.flate2] +version = "1.0" +default-features = false +features = ["rust_backend"] + +[dependencies.futures] +version = "0.3" +default-features = false +features = ["executor"] + +[dependencies.serde] +version = "1.0" +default-features = false +features = ["derive"] + +[dependencies.serde_json] +version = "1.0" +default-features = false +features = [] + +[dependencies.tracing] +version = "0.1" +default-features = false +features = [] + +[dependencies.tokio] +version = "0.2" +default-features = false +features = ["rt-core", "blocking", "time"] [dev-dependencies] +criterion = "0.3" quickcheck = "1" tempfile = "3.1.0" + +[[bench]] +name = "buffer" +harness = false diff --git a/lib/file-source/benches/buffer.rs b/lib/file-source/benches/buffer.rs new file mode 100644 index 0000000000000..c0895db1ab977 --- /dev/null +++ b/lib/file-source/benches/buffer.rs @@ -0,0 +1,76 @@ +use bytes::BytesMut; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use file_source::buffer::read_until_with_max_size; +use std::fmt; +use std::io::Cursor; + +struct Parameters { + bytes: Vec, + delim_offsets: Vec, + delim: u8, + bytes_before_first_delim: usize, + max_size: u8, +} + +impl fmt::Display for Parameters { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "bytes_before_first_delim: {}", + self.bytes_before_first_delim, + ) + } +} + +fn read_until_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("read_until"); + + let mut parameters = vec![ + Parameters { + bytes: vec![0; 1024], + delim_offsets: vec![100, 500, 502], + delim: 1, + bytes_before_first_delim: 501, + max_size: 1, + }, + Parameters { + bytes: vec![0; 1024], + delim_offsets: vec![900, 999, 1004, 1021, 1023], + delim: 1, + bytes_before_first_delim: 1022, + max_size: 1, + }, + ]; + + for param in &mut parameters { + for offset in ¶m.delim_offsets { + param.bytes[*offset] = param.delim; + } + } + + for param in ¶meters { + group.throughput(Throughput::Bytes(param.bytes_before_first_delim as u64)); + + let mut position = 0; + let mut buffer = BytesMut::with_capacity(param.max_size as usize); + let mut reader = Cursor::new(¶m.bytes); + let delimiter: [u8; 1] = [param.delim]; + group.bench_with_input(BenchmarkId::from_parameter(¶m), ¶m, |b, _| { + b.iter(|| { + let _ = read_until_with_max_size( + &mut reader, + &mut position, + &delimiter, + &mut buffer, + param.max_size as usize, + ); + reader.set_position(0); + }) + }); + } +} + +criterion_group!(name = benches; + config = Criterion::default(); + targets = read_until_bench); +criterion_main!(benches); diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs new file mode 100644 index 0000000000000..ab1edb2639e54 --- /dev/null +++ b/lib/file-source/src/buffer.rs @@ -0,0 +1,231 @@ +use crate::FilePosition; +use bstr::Finder; +use bytes::BytesMut; +use std::io::{self, BufRead}; +use tracing::warn; + +/// Read up to `max_size` bytes from `reader`, splitting by `delim` +/// +/// The function reads up 
+/// `max_size` bytes from `reader`, splitting the input by `delim`. If a
+/// `delim` is not found in `reader` before `max_size` bytes are read then the
+/// reader is polled until `delim` is found and the results are discarded.
+/// Else, the result is written into `buf`.
+///
+/// The return value is unusual. In the Err case this function has not written
+/// into `buf` and the caller should not examine its contents. In the Ok case,
+/// if the inner value is None the caller should retry the call, as the
+/// buffering read hit the end of the buffer but did not find a `delim` yet,
+/// indicating that we've sheared a write or that there were no bytes available
+/// in the `reader` and the `reader` was very sure about it. If the inner value
+/// is Some, the interior `usize` is the number of bytes written into `buf`.
+///
+/// Tweak of
+/// https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471
+///
+/// # Performance
+///
+/// Benchmarks indicate that this function processes in the high single-digit
+/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
+/// the overhead of setup dominates our benchmarks.
+pub fn read_until_with_max_size<R: BufRead + ?Sized>(
+    reader: &mut R,
+    position: &mut FilePosition,
+    delim: &[u8],
+    buf: &mut BytesMut,
+    max_size: usize,
+) -> io::Result<Option<usize>> {
+    let mut total_read = 0;
+    let mut discarding = false;
+    let delim_finder = Finder::new(delim);
+    let delim_len = delim.len();
+    loop {
+        let available: &[u8] = match reader.fill_buf() {
+            Ok(n) => n,
+            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
+            Err(e) => return Err(e),
+        };
+
+        let (done, used) = {
+            match delim_finder.find(available) {
+                Some(i) => {
+                    if !discarding {
+                        buf.extend_from_slice(&available[..i]);
+                    }
+                    (true, i + delim_len)
+                }
+                None => {
+                    if !discarding {
+                        buf.extend_from_slice(available);
+                    }
+                    (false, available.len())
+                }
+            }
+        };
+        reader.consume(used);
+        *position += used as u64; // do this at exactly the same time
+        total_read += used;
+
+        if !discarding && buf.len() > max_size {
+            warn!(
+                message = "Found line that exceeds max_line_bytes; discarding.",
+                internal_log_rate_secs = 30
+            );
+            discarding = true;
+        }
+
+        if done {
+            if !discarding {
+                return Ok(Some(total_read));
+            } else {
+                discarding = false;
+                buf.clear();
+            }
+        } else if used == 0 {
+            // We've hit EOF but not yet seen a newline. This can happen when
+            // unlucky timing causes us to observe an incomplete write. We
+            // return None here and let the loop continue next time the method
+            // is called. This is safe because the buffer is specific to this
+            // FileWatcher.
+            return Ok(None);
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::read_until_with_max_size;
+    use bytes::{BufMut, BytesMut};
+    use quickcheck::{QuickCheck, TestResult};
+    use std::io::Cursor;
+    use std::num::NonZeroU8;
+    use std::ops::Range;
+
+    fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
+        // The `global_data` is the view of `chunks` as a single contiguous
+        // block of memory. Where `chunks` simulates what happens when bytes
+        // are fitfully available, `global_data` is the view of all chunks
+        // assembled after every byte is available.
+        let mut global_data = BytesMut::new();
+
+        // `DelimDetails` describes the nature of each delimiter found in the
+        // `chunks`.
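+        //
+        // As a concrete instance of the model (a hypothetical input, not
+        // drawn from the quickcheck corpus): with
+        // `chunks = vec![vec![0, 1, 0, 0, 1]]`, `delim = 1` and
+        // `max_size = 2`, the first delimiter sits at global index 1 and
+        // bounds the byte range 0..1; the second sits at global index 4,
+        // bounds the range 2..4, and is likewise within `max_size` of its
+        // predecessor.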
+        #[derive(Clone)]
+        struct DelimDetails {
+            /// Index in `global_data`, absolute offset
+            global_index: usize,
+            /// Index in each `chunk`, relative offset
+            interior_index: usize,
+            /// Whether this delimiter was within `max_size` of its previous
+            /// peer
+            within_max_size: bool,
+            /// Which chunk in `chunks` this delimiter was found in
+            chunk_index: usize,
+            /// The range of bytes that this delimiter bounds with its
+            /// previous peer
+            byte_range: Range<usize>,
+        }
+
+        // Move through the `chunks` and discover at what positions an
+        // instance of `delim` exists in the chunk stream and whether that
+        // `delim` is more than `max_size` bytes away from the previous
+        // `delim`. This loop constructs the `facts` vector that holds
+        // `DelimDetails` instances and also populates `global_data`.
+        let mut facts: Vec<DelimDetails> = Vec::new();
+        let mut global_index: usize = 0;
+        let mut previous_offset: usize = 0;
+        for (i, chunk) in chunks.iter().enumerate() {
+            for (interior_index, byte) in chunk.iter().enumerate() {
+                global_data.put_u8(*byte);
+                if *byte == delim {
+                    let span = global_index - previous_offset;
+                    let within_max_size = span <= max_size.get() as usize;
+                    facts.push(DelimDetails {
+                        global_index,
+                        within_max_size,
+                        chunk_index: i,
+                        interior_index,
+                        byte_range: (previous_offset..global_index),
+                    });
+                    previous_offset = global_index + 1;
+                }
+                global_index += 1;
+            }
+        }
+
+        // Our model is only concerned with the first valid delimiter in the
+        // chunk stream. As such, discover that first valid delimiter and
+        // record it specially.
+        let mut first_delim: Option<DelimDetails> = None;
+        for fact in &facts {
+            if fact.within_max_size {
+                first_delim = Some(fact.clone());
+                break;
+            }
+        }
+
+        let mut position = 0;
+        let mut buffer = BytesMut::with_capacity(max_size.get() as usize);
+        // NOTE: The delimiter may be multiple bytes wide but for the purpose
+        // of this model a single byte does well enough.
+        let delimiter: [u8; 1] = [delim];
+        for (idx, chunk) in chunks.iter().enumerate() {
+            let mut reader = Cursor::new(&chunk);
+
+            match read_until_with_max_size(
+                &mut reader,
+                &mut position,
+                &delimiter,
+                &mut buffer,
+                max_size.get() as usize,
+            )
+            .unwrap()
+            {
+                None => {
+                    // Subject only returns None if this is the last chunk
+                    // _and_ the chunk did not contain a delimiter _or_ the
+                    // delimiter was outside the max_size range _or_ the
+                    // current chunk is empty.
+                    let has_valid_delimiter = facts
+                        .iter()
+                        .any(|details| ((details.chunk_index == idx) && details.within_max_size));
+                    assert!(chunk.is_empty() || !has_valid_delimiter)
+                }
+                Some(total_read) => {
+                    // Now that the function has returned we confirm that the
+                    // returned details match our `first_delim` and also that
+                    // the `buffer` is populated correctly.
+                    assert!(first_delim.is_some());
+                    assert_eq!(
+                        first_delim.clone().unwrap().global_index + 1,
+                        position as usize
+                    );
+                    assert_eq!(first_delim.clone().unwrap().interior_index + 1, total_read);
+                    assert_eq!(
+                        buffer.get(..),
+                        global_data.get(first_delim.unwrap().byte_range)
+                    );
+                    break;
+                }
+            }
+        }
+
+        TestResult::passed()
+    }
+
+    #[test]
+    fn qc_read_until_with_max_size() {
+        // The `read_until_with_max_size` function is intended to be called
+        // multiple times until it returns Ok(Some(usize)). The function
+        // should never return an error in this test. If the return is None
+        // we will call until it is not.
+        // Once the return is Some, the interior value should be the position
+        // of the first delim in the buffer, unless that delim is past the
+        // max_size barrier, in which case it will be the position of the
+        // first delim that is within some multiple of max_size.
+        //
+        // I think I will adjust the function to have a richer return
+        // type. This will help in the transition to async.
+        QuickCheck::new()
+            .tests(1_000)
+            .max_tests(2_000)
+            .quickcheck(qc_inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult);
+    }
+}
diff --git a/lib/file-source/src/checkpointer.rs b/lib/file-source/src/checkpointer.rs
index 640023047b85b..aede8b30bca44 100644
--- a/lib/file-source/src/checkpointer.rs
+++ b/lib/file-source/src/checkpointer.rs
@@ -8,13 +8,15 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
+use tracing::{error, info, warn};
 
 const TMP_FILE_NAME: &str = "checkpoints.new.json";
 const STABLE_FILE_NAME: &str = "checkpoints.json";
 
-/// This enum represents the file format of checkpoints persisted to disk. Right now there is only
-/// one variant, but any incompatible changes will require and additional variant to be added here
-/// and handled anywhere that we transit this format.
+/// This enum represents the file format of checkpoints persisted to disk. Right
+/// now there is only one variant, but any incompatible changes will require an
+/// additional variant to be added here and handled anywhere that we transit
+/// this format.
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(tag = "version", rename_all = "snake_case")]
 enum State {
@@ -22,8 +24,8 @@ enum State {
     V1 { checkpoints: Vec<Checkpoint> },
 }
 
-/// A simple JSON-friendly struct of the fingerprint/position pair, since fingerprints as objects
-/// cannot be keys in a plain JSON map.
+/// A simple JSON-friendly struct of the fingerprint/position pair, since
+/// fingerprints as objects cannot be keys in a plain JSON map.
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "snake_case")]
 struct Checkpoint {
@@ -40,7 +42,8 @@ pub struct Checkpointer {
     checkpoints: Arc<CheckpointsView>,
 }
 
-/// A thread-safe handle for reading and writing checkpoints in-memory across multiple threads.
+/// A thread-safe handle for reading and writing checkpoints in-memory across
+/// multiple threads.
 #[derive(Debug, Default)]
 pub struct CheckpointsView {
     checkpoints: DashMap<FileFingerprint, FilePosition>,
@@ -86,8 +89,9 @@ impl CheckpointsView {
     pub fn remove_expired(&self) {
         let now = Utc::now();
 
-        // Collect all of the expired keys. Removing them while iterating can lead to deadlocks,
-        // the set should be small, and this is not a performance-sensitive path.
+        // Collect all of the expired keys. Removing them while iterating can
+        // lead to deadlocks, the set should be small, and this is not a
+        // performance-sensitive path.
         let to_remove = self
             .removed_times
             .iter()
@@ -184,9 +188,9 @@ impl Checkpointer {
     /// Encode a fingerprint to a file name, including legacy Unknown values
     ///
-    /// For each of the non-legacy variants, prepend an identifier byte that falls outside of the
-    /// hex range used by the legacy implementation. This allows them to be differentiated by
-    /// simply peeking at the first byte.
+    /// For each of the non-legacy variants, prepend an identifier byte that
+    /// falls outside of the hex range used by the legacy implementation. This
+    /// allows them to be differentiated by simply peeking at the first byte.
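+    ///
+    /// For instance (an illustration of the idea, not the literal scheme): a
+    /// legacy fingerprint encodes as pure hex plus a position, say
+    /// `3e9f2a….12345`, so a newer variant can lead with a byte like `g` --
+    /// impossible in hex -- and a reader can dispatch on that first byte
+    /// alone.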
     #[cfg(test)]
     fn encode(&self, fng: FileFingerprint, pos: FilePosition) -> PathBuf {
         use FileFingerprint::*;
@@ -203,9 +207,10 @@ impl Checkpointer {
     /// Decode a fingerprint from a file name, accounting for unknowns due to the legacy
     /// implementation.
     ///
-    /// The trick here is to rely on the hex encoding of the legacy format. Because hex encoding
-    /// only allows [0-9a-f], we can use any character outside of that range as a magic byte
-    /// identifier for the newer formats.
+    /// The trick here is to rely on the hex encoding of the legacy
+    /// format. Because hex encoding only allows [0-9a-f], we can use any
+    /// character outside of that range as a magic byte identifier for the newer
+    /// formats.
     fn decode(&self, path: &Path) -> (FileFingerprint, FilePosition) {
         use FileFingerprint::*;
 
@@ -242,37 +247,43 @@ impl Checkpointer {
         self.checkpoints.get(fng)
     }
 
-    /// Scan through a given list of fresh fingerprints (i.e. not legacy Unknown) to see if any
-    /// match an existing legacy fingerprint. If so, upgrade the existing fingerprint.
+    /// Scan through a given list of fresh fingerprints (i.e. not legacy
+    /// Unknown) to see if any match an existing legacy fingerprint. If so,
+    /// upgrade the existing fingerprint.
     pub fn maybe_upgrade(&mut self, fresh: impl Iterator<Item = FileFingerprint>) {
         self.checkpoints.maybe_upgrade(fresh)
     }
 
-    /// Persist the current checkpoints state to disk, making our best effort to do so in an atomic
-    /// way that allow for recovering the previous state in the event of a crash.
+    /// Persist the current checkpoints state to disk, making our best effort to
+    /// do so in an atomic way that allows for recovering the previous state in
+    /// the event of a crash.
     pub fn write_checkpoints(&self) -> Result<usize, io::Error> {
-        // First drop any checkpoints for files that were removed more than 60 seconds ago. This
-        // keeps our working set as small as possible and makes sure we don't spend time and IO
-        // writing checkpoints that don't matter anymore.
+        // First drop any checkpoints for files that were removed more than 60
+        // seconds ago. This keeps our working set as small as possible and
+        // makes sure we don't spend time and IO writing checkpoints that don't
+        // matter anymore.
        self.checkpoints.remove_expired();
 
-        // Write the new checkpoints to a tmp file and flush it fully to disk. If vector
-        // dies anywhere during this section, the existing stable file will still be in its current
-        // valid state and we'll be able to recover.
+        // Write the new checkpoints to a tmp file and flush it fully to
+        // disk. If vector dies anywhere during this section, the existing
+        // stable file will still be in its current valid state and we'll be
+        // able to recover.
         let mut f = io::BufWriter::new(fs::File::create(&self.tmp_file_path)?);
         serde_json::to_writer(&mut f, &self.checkpoints.get_state())?;
         f.into_inner()?.sync_all()?;
 
-        // Once the temp file is fully flushed, rename the tmp file to replace the previous stable
-        // file. This is an atomic operation on POSIX systems (and the stdlib claims to provide
-        // equivalent behavior on Windows), which should prevent scenarios where we don't have at
-        // least one full valid file to recover from.
+        // Once the temp file is fully flushed, rename the tmp file to replace
+        // the previous stable file. This is an atomic operation on POSIX
+        // systems (and the stdlib claims to provide equivalent behavior on
+        // Windows), which should prevent scenarios where we don't have at least
+        // one full valid file to recover from.
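+        //
+        // An isolated sketch of this write-then-rename pattern (illustrative
+        // only; `atomic_write` is a hypothetical helper, not part of this
+        // crate):
+        //
+        //     use std::{fs, io::{self, Write}, path::Path};
+        //
+        //     fn atomic_write(tmp: &Path, stable: &Path, bytes: &[u8]) -> io::Result<()> {
+        //         let mut f = fs::File::create(tmp)?;
+        //         f.write_all(bytes)?;      // stage the full payload in the tmp file
+        //         f.sync_all()?;            // force contents to disk before the swap
+        //         fs::rename(tmp, stable)   // atomically replace the stable file
+        //     }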
         fs::rename(&self.tmp_file_path, &self.stable_file_path)?;
 
         Ok(self.checkpoints.checkpoints.len())
     }
 
-    /// Write checkpoints to disk in the legacy format. Used for compatibility testing only.
+    /// Write checkpoints to disk in the legacy format. Used for compatibility
+    /// testing only.
     #[cfg(test)]
     pub fn write_legacy_checkpoints(&mut self) -> Result<usize, io::Error> {
         fs::remove_dir_all(&self.directory).ok();
@@ -283,19 +294,21 @@ impl Checkpointer {
         Ok(self.checkpoints.checkpoints.len())
     }
 
-    /// Read persisted checkpoints from disk, preferring the new JSON file format but falling back
-    /// to the legacy system when those files are found instead.
+    /// Read persisted checkpoints from disk, preferring the new JSON file
+    /// format but falling back to the legacy system when those files are found
+    /// instead.
     pub fn read_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
-        // First try reading from the tmp file location. If this works, it means that the previous
-        // process was interrupted in the process of checkpointing and the tmp file should contain
-        // more recent data that should be preferred.
+        // First try reading from the tmp file location. If this works, it means
+        // that the previous process was interrupted in the process of
+        // checkpointing and the tmp file should contain more recent data that
+        // should be preferred.
         match self.read_checkpoints_file(&self.tmp_file_path) {
             Ok(state) => {
                 warn!(message = "Recovered checkpoint data from interrupted process.");
                 self.checkpoints.set_state(state, ignore_before);
 
-                // Try to move this tmp file to the stable location so we don't immediately overwrite
-                // it when we next persist checkpoints.
+                // Try to move this tmp file to the stable location so we don't
+                // immediately overwrite it when we next persist checkpoints.
                 if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path) {
                     warn!(message = "Error persisting recovered checkpoint file.", %error);
                 }
@@ -309,8 +322,9 @@ impl Checkpointer {
             }
         }
 
-        // Next, attempt to read checkpoints from the stable file location. This is the
-        // expected location, so warn more aggressively if something goes wrong.
+        // Next, attempt to read checkpoints from the stable file location. This
+        // is the expected location, so warn more aggressively if something goes
+        // wrong.
         match self.read_checkpoints_file(&self.stable_file_path) {
             Ok(state) => {
                 info!(message = "Loaded checkpoint data.");
@@ -326,7 +340,8 @@ impl Checkpointer {
             }
         }
 
-        // If we haven't returned yet, go ahead and look for the legacy files and try to read them.
+        // If we haven't returned yet, go ahead and look for the legacy files
+        // and try to read them.
info!("Attempting to read legacy checkpoint files."); self.read_legacy_checkpoints(ignore_before); diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 434cc99d896b9..062eb8f4b80bf 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -1,3 +1,4 @@ +use crate::paths_provider::PathsProvider; use crate::{ checkpointer::{Checkpointer, CheckpointsView}, file_watcher::FileWatcher, @@ -13,6 +14,7 @@ use futures::{ }; use indexmap::IndexMap; use std::{ + cmp, collections::{BTreeMap, HashSet}, fs::{self, remove_file}, path::PathBuf, @@ -20,8 +22,7 @@ use std::{ time::{self, Duration}, }; use tokio::time::delay_for; - -use crate::paths_provider::PathsProvider; +use tracing::{debug, error, info, trace}; /// `FileServer` is a Source which cooperatively schedules reads over files, /// converting the lines of said files into `LogLine` structures. As @@ -136,8 +137,9 @@ where // Spawn the checkpoint writer task // - // We have to do a lot of cloning here to convince the compiler that we aren't going to get - // away with anything, but none of it should have any perf impact. + // We have to do a lot of cloning here to convince the compiler that we + // aren't going to get away with anything, but none of it should have + // any perf impact. let mut shutdown = shutdown.shared(); let shutdown2 = shutdown.clone(); let emitter = self.emitter.clone(); @@ -354,12 +356,7 @@ where // minimum on the assumption that next time through there will be // more lines to read promptly. if global_bytes_read == 0 { - let lim = backoff_cap.saturating_mul(2); - if lim > 2_048 { - backoff_cap = 2_048; - } else { - backoff_cap = lim; - } + backoff_cap = cmp::min(2_048, backoff_cap.saturating_mul(2)); } else { backoff_cap = 1; } diff --git a/lib/file-source/src/file_watcher.rs b/lib/file-source/src/file_watcher/mod.rs similarity index 62% rename from lib/file-source/src/file_watcher.rs rename to lib/file-source/src/file_watcher/mod.rs index 3f07d216df4eb..91bc909a04f7a 100644 --- a/lib/file-source/src/file_watcher.rs +++ b/lib/file-source/src/file_watcher/mod.rs @@ -1,5 +1,6 @@ +use crate::buffer::read_until_with_max_size; +use crate::metadata_ext::PortableFileExt; use crate::{FilePosition, ReadFrom}; -use bstr::Finder; use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use flate2::bufread::MultiGzDecoder; @@ -9,8 +10,9 @@ use std::{ path::PathBuf, time::{Duration, Instant}, }; - -use crate::metadata_ext::PortableFileExt; +use tracing::debug; +#[cfg(test)] +mod tests; /// The `FileWatcher` struct defines the polling based state machine which reads /// from a file path, transparently updating the underlying file descriptor when @@ -220,18 +222,22 @@ impl FileWatcher { } } + #[inline] fn track_read_attempt(&mut self) { self.last_read_attempt = Instant::now(); } + #[inline] fn track_read_success(&mut self) { self.last_read_success = Instant::now(); } + #[inline] pub fn last_read_success(&self) -> Instant { self.last_read_success } + #[inline] pub fn should_read(&self) -> bool { self.last_read_success.elapsed() < Duration::from_secs(10) || self.last_read_attempt.elapsed() > Duration::from_secs(10) @@ -240,153 +246,11 @@ impl FileWatcher { fn is_gzipped(r: &mut io::BufReader) -> io::Result { let header_bytes = r.fill_buf()?; + // WARN: The paired `BufReader::consume` is not called intentionally. If we + // do we'll chop a decent part of the potential gzip stream off. 
Ok(header_bytes.starts_with(&[0x1f, 0x8b])) } fn null_reader() -> impl BufRead { io::Cursor::new(Vec::new()) } - -// Tweak of https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471 -// After more than max_size bytes are read as part of a single line, this discard the remaining bytes -// in that line, and then starts again on the next line. -fn read_until_with_max_size( - r: &mut R, - p: &mut FilePosition, - delim: &[u8], - buf: &mut BytesMut, - max_size: usize, -) -> io::Result> { - let mut total_read = 0; - let mut discarding = false; - let delim_finder = Finder::new(delim); - let delim_len = delim.len(); - loop { - let available = match r.fill_buf() { - Ok(n) => n, - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, - Err(e) => return Err(e), - }; - - let (done, used) = { - match delim_finder.find(available) { - Some(i) => { - if !discarding { - buf.extend_from_slice(&available[..i]); - } - (true, i + delim_len) - } - None => { - if !discarding { - buf.extend_from_slice(available); - } - (false, available.len()) - } - } - }; - r.consume(used); - *p += used as u64; // do this at exactly same time - total_read += used; - - if !discarding && buf.len() > max_size { - warn!( - message = "Found line that exceeds max_line_bytes; discarding.", - internal_log_rate_secs = 30 - ); - discarding = true; - } - - if done { - if !discarding { - return Ok(Some(total_read)); - } else { - discarding = false; - buf.clear(); - } - } else if used == 0 { - // We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes - // us to observe an incomplete write. We return None here and let the loop continue - // next time the method is called. This is safe because the buffer is specific to this - // FileWatcher. 
- return Ok(None); - } - } -} - -#[cfg(test)] -mod test { - use super::read_until_with_max_size; - use bytes::BytesMut; - use std::io::Cursor; - - #[test] - fn test_read_until_with_max_size() { - let mut buf = Cursor::new(&b"12"[..]); - let mut pos = 0; - let mut v = BytesMut::new(); - let p = read_until_with_max_size(&mut buf, &mut pos, b"3", &mut v, 1000).unwrap(); - assert_eq!(pos, 2); - assert_eq!(p, None); - assert_eq!(&*v, b"12"); - let mut buf = Cursor::new(&b"34"[..]); - let p = read_until_with_max_size(&mut buf, &mut pos, b"3", &mut v, 1000).unwrap(); - assert_eq!(pos, 3); - assert_eq!(p, Some(1)); - assert_eq!(&*v, b"12"); - - let mut buf = Cursor::new(&b"1233"[..]); - let mut pos = 0; - let mut v = BytesMut::new(); - let p = read_until_with_max_size(&mut buf, &mut pos, b"3", &mut v, 1000).unwrap(); - assert_eq!(pos, 3); - assert_eq!(p, Some(3)); - assert_eq!(&*v, b"12"); - v.truncate(0); - let p = read_until_with_max_size(&mut buf, &mut pos, b"3", &mut v, 1000).unwrap(); - assert_eq!(pos, 4); - assert_eq!(p, Some(1)); - assert_eq!(&*v, b""); - v.truncate(0); - let p = read_until_with_max_size(&mut buf, &mut pos, b"3", &mut v, 1000).unwrap(); - assert_eq!(pos, 4); - assert_eq!(p, None); - assert_eq!(&*v, [0; 0]); - - let mut buf = Cursor::new(&b"short\nthis is too long\nexact size\n11 eleven11\n"[..]); - let mut pos = 0; - let mut v = BytesMut::new(); - let p = read_until_with_max_size(&mut buf, &mut pos, b"\n", &mut v, 10).unwrap(); - assert_eq!(pos, 6); - assert_eq!(p, Some(6)); - assert_eq!(&*v, b"short"); - v.truncate(0); - let p = read_until_with_max_size(&mut buf, &mut pos, b"\n", &mut v, 10).unwrap(); - assert_eq!(pos, 34); - assert_eq!(p, Some(28)); - assert_eq!(&*v, b"exact size"); - v.truncate(0); - let p = read_until_with_max_size(&mut buf, &mut pos, b"\n", &mut v, 10).unwrap(); - assert_eq!(pos, 46); - assert_eq!(p, None); - assert_eq!(&*v, [0; 0]); - - let mut buf = - Cursor::new(&b"short\r\nthis is too long\r\nexact size\r\n11 eleven11\r\n"[..]); - let mut pos = 0; - let mut v = BytesMut::new(); - let p = read_until_with_max_size(&mut buf, &mut pos, b"\r\n", &mut v, 10).unwrap(); - assert_eq!(pos, 7); - assert_eq!(p, Some(7)); - assert_eq!(&*v, b"short"); - v.truncate(0); - let p = read_until_with_max_size(&mut buf, &mut pos, b"\r\n", &mut v, 10).unwrap(); - assert_eq!(pos, 37); - assert_eq!(p, Some(30)); - assert_eq!(&*v, b"exact size"); - v.truncate(0); - let p = read_until_with_max_size(&mut buf, &mut pos, b"\r\n", &mut v, 10).unwrap(); - assert_eq!(pos, 50); - assert_eq!(p, None); - assert_eq!(&*v, [0; 0]); - } -} diff --git a/lib/file-source/src/file_watcher/tests/experiment.rs b/lib/file-source/src/file_watcher/tests/experiment.rs new file mode 100644 index 0000000000000..3d0c3c8012397 --- /dev/null +++ b/lib/file-source/src/file_watcher/tests/experiment.rs @@ -0,0 +1,133 @@ +use crate::file_watcher::tests::*; +use crate::file_watcher::FileWatcher; +use crate::ReadFrom; +use bytes::Bytes; +use quickcheck::{QuickCheck, TestResult}; +use std::fs; +use std::io::Write; +#[cfg(unix)] +use std::os::unix::fs::MetadataExt; +#[cfg(windows)] +use std::os::windows::fs::MetadataExt; + +// Interpret all FWActions, including truncation +// +// In the presence of truncation we cannot accurately determine which writes +// will eventually be read. This is because of the presence of buffered +// reading in file_watcher which pulls an unknown amount from the underlying +// disk. 
This is _good_ in the sense that we reduce the total number of file +// system reads and potentially retain data that would otherwise be lost +// during a truncation but is bad on account of we cannot guarantee _which_ +// writes are lost in the presence of truncation. +// +// What we can do, though, is drive our FWFile model and the SUT at the same +// time, recording the total number of reads/writes. The SUT reads should be +// bounded below by the model reads, bounded above by the writes. +fn experiment(actions: Vec) { + let dir = tempfile::TempDir::new().expect("could not create tempdir"); + let path = dir.path().join("a_file.log"); + let mut fp = fs::File::create(&path).expect("could not create"); + let mut rotation_count = 0; + let mut fw = FileWatcher::new( + path.clone(), + ReadFrom::Beginning, + None, + 100_000, + Bytes::from("\n"), + ) + .expect("must be able to create"); + + let mut writes = 0; + let mut sut_reads = 0; + let mut model_reads = 0; + + let mut fwfiles: Vec = vec![]; + fwfiles.push(FWFile::new()); + let mut read_index = 0; + for action in actions.iter() { + match *action { + FWAction::DeleteFile => { + let _ = fs::remove_file(&path); + assert!(!path.exists()); + fwfiles[0].reset(); + break; + } + FWAction::TruncateFile => { + fwfiles[0].truncate(); + fp = fs::OpenOptions::new() + .read(true) + .write(true) + .truncate(true) + .open(&path) + .expect("could not truncate"); + #[cfg(unix)] + assert_eq!(fp.metadata().expect("could not get metadata").size(), 0); + #[cfg(windows)] + assert_eq!( + fp.metadata().expect("could not get metadata").file_size(), + 0 + ); + assert!(path.exists()); + } + FWAction::Pause(ps) => delay(ps), + FWAction::Exit => break, + FWAction::WriteLine(ref s) => { + fwfiles[0].write_line(s); + assert!(fp.write(s.as_bytes()).is_ok()); + assert!(fp.write(b"\n").is_ok()); + assert!(fp.flush().is_ok()); + writes += 1; + } + FWAction::RotateFile => { + let mut new_path = path.clone(); + new_path.set_extension(format!("log.{}", rotation_count)); + rotation_count += 1; + fs::rename(&path, &new_path).expect("could not rename"); + fp = fs::File::create(&path).expect("could not create"); + fwfiles.insert(0, FWFile::new()); + read_index += 1; + } + FWAction::Read => { + let mut attempts = 10; + while attempts > 0 { + match fw.read_line() { + Err(_) => { + unreachable!(); + } + Ok(Some(line)) if line.is_empty() => { + attempts -= 1; + continue; + } + Ok(None) => { + attempts -= 1; + continue; + } + Ok(_) => { + sut_reads += 1; + let psv = fwfiles[read_index].read_line(); + if psv.is_some() { + model_reads += 1; + break; + } + break; + } + } + } + } + } + } + assert!(writes >= sut_reads); + assert!(sut_reads >= model_reads); +} + +#[test] +fn file_watcher_with_truncation() { + fn inner(actions: Vec) -> TestResult { + experiment(actions); + TestResult::passed() + } + QuickCheck::new() + .tests(10000) + .max_tests(100000) + .quickcheck(inner as fn(Vec) -> TestResult); +} diff --git a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs new file mode 100644 index 0000000000000..d5a801c317a63 --- /dev/null +++ b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs @@ -0,0 +1,99 @@ +use crate::file_watcher::tests::*; +use crate::file_watcher::FileWatcher; +use crate::ReadFrom; +use bytes::Bytes; +use quickcheck::{QuickCheck, TestResult}; +use std::fs; +use std::io::Write; +#[cfg(windows)] +use std::os::windows::fs::MetadataExt; + +// Interpret all FWActions, excluding 
truncation +// +// This interpretation is the happy case. When there are no truncations our +// model and SUT should agree exactly. To that end, we confirm that every +// read from SUT exactly matches the reads from the model. +fn experiment_no_truncations(actions: Vec) { + let dir = tempfile::TempDir::new().expect("could not create tempdir"); + let path = dir.path().join("a_file.log"); + let mut fp = fs::File::create(&path).expect("could not create"); + let mut rotation_count = 0; + let mut fw = FileWatcher::new( + path.clone(), + ReadFrom::Beginning, + None, + 100_000, + Bytes::from("\n"), + ) + .expect("must be able to create"); + + let mut fwfiles: Vec = vec![]; + fwfiles.push(FWFile::new()); + let mut read_index = 0; + for action in actions.iter() { + match *action { + FWAction::DeleteFile => { + let _ = fs::remove_file(&path); + assert!(!path.exists()); + fwfiles[0].reset(); + break; + } + FWAction::TruncateFile => {} + FWAction::Pause(ps) => delay(ps), + FWAction::Exit => break, + FWAction::WriteLine(ref s) => { + fwfiles[0].write_line(s); + assert!(fp.write(s.as_bytes()).is_ok()); + assert!(fp.write(b"\n").is_ok()); + assert!(fp.flush().is_ok()); + } + FWAction::RotateFile => { + let mut new_path = path.clone(); + new_path.set_extension(format!("log.{}", rotation_count)); + rotation_count += 1; + fs::rename(&path, &new_path).expect("could not rename"); + fp = fs::File::create(&path).expect("could not create"); + fwfiles.insert(0, FWFile::new()); + read_index += 1; + } + FWAction::Read => { + let mut attempts = 10; + while attempts > 0 { + match fw.read_line() { + Err(_) => { + unreachable!(); + } + Ok(Some(line)) if line.is_empty() => { + attempts -= 1; + assert!(fwfiles[read_index].read_line().is_none()); + continue; + } + Ok(None) => { + attempts -= 1; + assert!(fwfiles[read_index].read_line().is_none()); + continue; + } + Ok(Some(line)) => { + let exp = fwfiles[read_index].read_line().expect("could not readline"); + assert_eq!(exp.into_bytes(), line); + // assert_eq!(sz, buf.len() + 1); + break; + } + } + } + } + } + } +} + +#[test] +fn file_watcher_no_truncation() { + fn inner(actions: Vec) -> TestResult { + experiment_no_truncations(actions); + TestResult::passed() + } + QuickCheck::new() + .tests(10000) + .max_tests(100000) + .quickcheck(inner as fn(Vec) -> TestResult); +} diff --git a/lib/file-source/src/file_watcher/tests/mod.rs b/lib/file-source/src/file_watcher/tests/mod.rs new file mode 100644 index 0000000000000..aae5b8563bd0f --- /dev/null +++ b/lib/file-source/src/file_watcher/tests/mod.rs @@ -0,0 +1,189 @@ +mod experiment; +mod experiment_no_truncations; + +use quickcheck::{Arbitrary, Gen}; +use std::str; + +// Welcome. +// +// This suite of tests is structured as an interpreter of file system +// actions. You'll find two interpreters here, `experiment` and +// `experiment_no_truncations`. These differ in one key respect: the later +// does not interpret the 'truncation' instruction. +// +// What do I mean by all this? Well, what we're trying to do is validate the +// behaviour of the file_watcher in the presence of arbitrary file-system +// actions. These actions we call `FWAction`. +#[derive(Clone, Debug)] +pub enum FWAction { + WriteLine(String), + RotateFile, + DeleteFile, + TruncateFile, + Read, + Pause(u32), + Exit, +} +// WriteLine writes an arbitrary line of text -- plus newline -- RotateFile +// rotates the file as a log rotator might etc etc. 
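+//
+// For instance (a hypothetical sequence, not one drawn from quickcheck):
+// interpreting
+//
+//     vec![FWAction::WriteLine("hello".to_string()), FWAction::Read,
+//          FWAction::TruncateFile, FWAction::Read]
+//
+// appends "hello\n" to the watched file, expects the first Read to surface
+// "hello", truncates the file, and expects the second Read to come up empty.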
Our interpreter +// functions take these instructions and apply them to the system under test +// (SUT), being a file_watcher pointed at a certain directory on-disk. In +// this way we can drive the behaviour of file_watcher. Validation requires +// a model, which we scattered between the interpreters -- as the model +// varies slightly in the presence of truncation vs. not -- and FWFile. +pub struct FWFile { + contents: Vec, + read_idx: usize, + previous_read_size: usize, + reads_available: usize, +} +// FWFile mimics an actual Unix file, at least for our purposes here. The +// operations available on FWFile have to do with reading and writing lines, +// truncation and resets, which mimic a delete/create cycle on the file +// system. The function `FWFile::read_line` is the most complex and you're +// warmly encouraged to read the documentation present there. +impl FWFile { + pub fn new() -> FWFile { + FWFile { + contents: vec![], + read_idx: 0, + previous_read_size: 0, + reads_available: 0, + } + } + + pub fn reset(&mut self) { + self.contents.truncate(0); + self.read_idx = 0; + self.previous_read_size = 0; + self.reads_available = 0; + } + + pub fn truncate(&mut self) { + self.reads_available = 0; + self.contents.truncate(0); + } + + pub fn write_line(&mut self, input: &str) { + self.contents.extend_from_slice(input.as_bytes()); + self.contents.push(b'\n'); + self.reads_available += 1; + } + + /// Read a line from storage, if a line is available to be read. + pub fn read_line(&mut self) -> Option { + // FWFile mimics a unix file being read in a buffered fashion, + // driven by file_watcher. We _have_ to keep on top of where the + // reader's read index -- called read_idx -- is between reads and + // the size of the file -- called previous_read_size -- in the event + // of truncation. + // + // If we detect in file_watcher that a truncation has happened then + // the buffered reader is seeked back to 0. This is performed in + // like kind when we reset read_idx to 0, as in the following case + // where there are no reads available. + if self.contents.is_empty() && self.reads_available == 0 { + self.read_idx = 0; + self.previous_read_size = 0; + return None; + } + // Now, the above is done only when nothing has been written to the + // FWFile or the contents have been totally removed. The trickier + // case is where there are maybe _some_ things to be read but the + // read_idx might be mis-set owing to truncations. + // + // `read_line` is performed in a line-wise fashion. start_idx + // and end_idx are pulled apart from one another to find the + // start and end of the line, if there's a line to be found. + let mut end_idx; + let start_idx; + // Here's where we do truncation detection. When our file has + // shrunk, restart the search at zero index. If the file is the + // same size -- implying that it's either not changed or was + // truncated and then filled back in before a read could occur + // -- we return None. Else, start searching at the present + // read_idx. + let max = self.contents.len(); + if self.previous_read_size > max { + self.read_idx = 0; + start_idx = 0; + end_idx = 0; + } else if self.read_idx == max { + return None; + } else { + start_idx = self.read_idx; + end_idx = self.read_idx; + } + // Seek end_idx forward until we hit the newline character. + while self.contents[end_idx] != b'\n' { + end_idx += 1; + if end_idx == max { + return None; + } + } + // Produce the read string -- minus its newline character -- and + // set the control variables appropriately. 
+ let ret = str::from_utf8(&self.contents[start_idx..end_idx]).unwrap(); + self.read_idx = end_idx + 1; + self.reads_available -= 1; + self.previous_read_size = max; + // There's a trick here. What happens if we _only_ read a + // newline character. Well, that'll happen when truncations + // cause trimmed reads and the only remaining character in the + // line is the newline. Womp womp + if !ret.is_empty() { + Some(ret.to_string()) + } else { + None + } + } +} + +impl Arbitrary for FWAction { + fn arbitrary(g: &mut Gen) -> FWAction { + let i: usize = *g.choose(&(0..100).collect::>()).unwrap(); + match i { + // These weights are more or less arbitrary. 'Pause' maybe + // doesn't have a use but we keep it in place to allow for + // variations in file-system flushes. + 0..=50 => { + const GEN_ASCII_STR_CHARSET: &[u8] = + b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + let ln_sz = *g.choose(&(1..32).collect::>()).unwrap(); + FWAction::WriteLine( + std::iter::repeat_with(|| *g.choose(&GEN_ASCII_STR_CHARSET).unwrap()) + .take(ln_sz) + .map(|v| -> char { v.into() }) + .collect(), + ) + } + 51..=69 => FWAction::Read, + 70..=75 => { + let pause = *g.choose(&(1..3).collect::>()).unwrap(); + FWAction::Pause(pause) + } + 76..=85 => FWAction::RotateFile, + 86..=90 => FWAction::TruncateFile, + 91..=95 => FWAction::DeleteFile, + _ => FWAction::Exit, + } + } +} + +#[inline] +pub fn delay(attempts: u32) { + let delay = match attempts { + 0 => return, + 1 => 1, + 2 => 4, + 3 => 8, + 4 => 16, + 5 => 32, + 6 => 64, + 7 => 128, + 8 => 256, + _ => 512, + }; + let sleep_time = std::time::Duration::from_millis(delay as u64); + std::thread::sleep(sleep_time); +} diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs index 92565ed25a3ba..664d404bea613 100644 --- a/lib/file-source/src/fingerprinter.rs +++ b/lib/file-source/src/fingerprinter.rs @@ -4,7 +4,7 @@ use std::{ collections::HashSet, fs::{self, metadata, File}, io::{self, Read, Seek, SeekFrom, Write}, - path::PathBuf, + path::{Path, PathBuf}, }; #[derive(Clone)] @@ -63,7 +63,7 @@ impl From for FileFingerprint { impl Fingerprinter { pub fn get_fingerprint_of_file( &self, - path: &PathBuf, + path: &Path, buffer: &mut Vec, ) -> Result { use FileFingerprint::*; @@ -94,7 +94,7 @@ impl Fingerprinter { pub fn get_fingerprint_or_log_error( &self, - path: &PathBuf, + path: &Path, buffer: &mut Vec, known_small_files: &mut HashSet, emitter: &impl FileSourceInternalEvents, @@ -111,7 +111,7 @@ impl Fingerprinter { io::ErrorKind::UnexpectedEof => { if !known_small_files.contains(path) { emitter.emit_file_checksum_failed(path); - known_small_files.insert(path.clone()); + known_small_files.insert(path.to_path_buf()); } } io::ErrorKind::NotFound => { @@ -129,7 +129,7 @@ impl Fingerprinter { pub fn get_bytes_checksum( &self, - path: &PathBuf, + path: &Path, buffer: &mut Vec, ) -> Result, io::Error> { match self.strategy { diff --git a/lib/file-source/src/lib.rs b/lib/file-source/src/lib.rs index e04ed367e889f..9bcbc882d0fb8 100644 --- a/lib/file-source/src/lib.rs +++ b/lib/file-source/src/lib.rs @@ -1,8 +1,9 @@ +#![deny(clippy::all)] + #[macro_use] extern crate scan_fmt; -#[macro_use] -extern crate tracing; +pub mod buffer; mod checkpointer; mod file_server; mod file_watcher; @@ -15,7 +16,7 @@ pub use self::file_server::{FileServer, Shutdown as FileServerShutdown}; pub use self::fingerprinter::{FingerprintStrategy, Fingerprinter}; pub use self::internal_events::FileSourceInternalEvents; -type FilePosition = u64; +pub type 
FilePosition = u64; #[derive(Copy, Clone, Debug, PartialEq)] pub enum ReadFrom { @@ -29,414 +30,3 @@ impl Default for ReadFrom { ReadFrom::Beginning } } - -#[cfg(test)] -mod test { - use self::file_watcher::FileWatcher; - use super::*; - use bytes::Bytes; - use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; - use std::fs; - use std::io::Write; - #[cfg(unix)] - use std::os::unix::fs::MetadataExt; - #[cfg(windows)] - use std::os::windows::fs::MetadataExt; - use std::str; - // Welcome. - // - // This suite of tests is structured as an interpreter of file system - // actions. You'll find two interpreters here, `experiment` and - // `experiment_no_truncations`. These differ in one key respect: the later - // does not interpret the 'truncation' instruction. - // - // What do I mean by all this? Well, what we're trying to do is validate the - // behaviour of the file_watcher in the presence of arbitrary file-system - // actions. These actions we call `FWAction`. - #[derive(Clone, Debug)] - enum FWAction { - WriteLine(String), - RotateFile, - DeleteFile, - TruncateFile, - Read, - Pause(u32), - Exit, - } - // WriteLine writes an arbitrary line of text -- plus newline -- RotateFile - // rotates the file as a log rotator might etc etc. Our interpreter - // functions take these instructions and apply them to the system under test - // (SUT), being a file_watcher pointed at a certain directory on-disk. In - // this way we can drive the behaviour of file_watcher. Validation requires - // a model, which we scattered between the interpreters -- as the model - // varies slightly in the presence of truncation vs. not -- and FWFile. - struct FWFile { - contents: Vec, - read_idx: usize, - previous_read_size: usize, - reads_available: usize, - } - // FWFile mimics an actual Unix file, at least for our purposes here. The - // operations available on FWFile have to do with reading and writing lines, - // truncation and resets, which mimic a delete/create cycle on the file - // system. The function `FWFile::read_line` is the most complex and you're - // warmly encouraged to read the documentation present there. - impl FWFile { - pub fn new() -> FWFile { - FWFile { - contents: vec![], - read_idx: 0, - previous_read_size: 0, - reads_available: 0, - } - } - - pub fn reset(&mut self) { - self.contents.truncate(0); - self.read_idx = 0; - self.previous_read_size = 0; - self.reads_available = 0; - } - - pub fn truncate(&mut self) { - self.reads_available = 0; - self.contents.truncate(0); - } - - pub fn write_line(&mut self, input: &str) { - self.contents.extend_from_slice(input.as_bytes()); - self.contents.push(b'\n'); - self.reads_available += 1; - } - - /// Read a line from storage, if a line is available to be read. - pub fn read_line(&mut self) -> Option { - // FWFile mimics a unix file being read in a buffered fashion, - // driven by file_watcher. We _have_ to keep on top of where the - // reader's read index -- called read_idx -- is between reads and - // the size of the file -- called previous_read_size -- in the event - // of truncation. - // - // If we detect in file_watcher that a truncation has happened then - // the buffered reader is seeked back to 0. This is performed in - // like kind when we reset read_idx to 0, as in the following case - // where there are no reads available. 
- if self.contents.is_empty() && self.reads_available == 0 { - self.read_idx = 0; - self.previous_read_size = 0; - return None; - } - // Now, the above is done only when nothing has been written to the - // FWFile or the contents have been totally removed. The trickier - // case is where there are maybe _some_ things to be read but the - // read_idx might be mis-set owing to truncations. - // - // `read_line` is performed in a line-wise fashion. start_idx - // and end_idx are pulled apart from one another to find the - // start and end of the line, if there's a line to be found. - let mut end_idx; - let start_idx; - // Here's where we do truncation detection. When our file has - // shrunk, restart the search at zero index. If the file is the - // same size -- implying that it's either not changed or was - // truncated and then filled back in before a read could occur - // -- we return None. Else, start searching at the present - // read_idx. - let max = self.contents.len(); - if self.previous_read_size > max { - self.read_idx = 0; - start_idx = 0; - end_idx = 0; - } else if self.read_idx == max { - return None; - } else { - start_idx = self.read_idx; - end_idx = self.read_idx; - } - // Seek end_idx forward until we hit the newline character. - while self.contents[end_idx] != b'\n' { - end_idx += 1; - if end_idx == max { - return None; - } - } - // Produce the read string -- minus its newline character -- and - // set the control variables appropriately. - let ret = str::from_utf8(&self.contents[start_idx..end_idx]).unwrap(); - self.read_idx = end_idx + 1; - self.reads_available -= 1; - self.previous_read_size = max; - // There's a trick here. What happens if we _only_ read a - // newline character. Well, that'll happen when truncations - // cause trimmed reads and the only remaining character in the - // line is the newline. Womp womp - if !ret.is_empty() { - Some(ret.to_string()) - } else { - None - } - } - } - - impl Arbitrary for FWAction { - fn arbitrary(g: &mut Gen) -> FWAction { - let i: usize = *g.choose(&(0..100).collect::>()).unwrap(); - match i { - // These weights are more or less arbitrary. 'Pause' maybe - // doesn't have a use but we keep it in place to allow for - // variations in file-system flushes. - 0..=50 => { - const GEN_ASCII_STR_CHARSET: &[u8] = - b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - let ln_sz = *g.choose(&(1..32).collect::>()).unwrap(); - FWAction::WriteLine( - std::iter::repeat_with(|| *g.choose(&GEN_ASCII_STR_CHARSET).unwrap()) - .take(ln_sz) - .map(|v| -> char { v.into() }) - .collect(), - ) - } - 51..=69 => FWAction::Read, - 70..=75 => { - let pause = *g.choose(&(1..3).collect::>()).unwrap(); - FWAction::Pause(pause) - } - 76..=85 => FWAction::RotateFile, - 86..=90 => FWAction::TruncateFile, - 91..=95 => FWAction::DeleteFile, - _ => FWAction::Exit, - } - } - } - - // Interpret all FWActions, including truncation - // - // In the presence of truncation we cannot accurately determine which writes - // will eventually be read. This is because of the presence of buffered - // reading in file_watcher which pulls an unknown amount from the underlying - // disk. This is _good_ in the sense that we reduce the total number of file - // system reads and potentially retain data that would otherwise be lost - // during a truncation but is bad on account of we cannot guarantee _which_ - // writes are lost in the presence of truncation. 
- // - // What we can do, though, is drive our FWFile model and the SUT at the same - // time, recording the total number of reads/writes. The SUT reads should be - // bounded below by the model reads, bounded above by the writes. - fn experiment(actions: Vec) { - let dir = tempfile::TempDir::new().expect("could not create tempdir"); - let path = dir.path().join("a_file.log"); - let mut fp = fs::File::create(&path).expect("could not create"); - let mut rotation_count = 0; - let mut fw = FileWatcher::new( - path.clone(), - ReadFrom::Beginning, - None, - 100_000, - Bytes::from("\n"), - ) - .expect("must be able to create"); - - let mut writes = 0; - let mut sut_reads = 0; - let mut model_reads = 0; - - let mut fwfiles: Vec = vec![]; - fwfiles.push(FWFile::new()); - let mut read_index = 0; - for action in actions.iter() { - match *action { - FWAction::DeleteFile => { - let _ = fs::remove_file(&path); - assert!(!path.exists()); - fwfiles[0].reset(); - break; - } - FWAction::TruncateFile => { - fwfiles[0].truncate(); - fp = fs::OpenOptions::new() - .read(true) - .write(true) - .truncate(true) - .open(&path) - .expect("could not truncate"); - #[cfg(unix)] - assert_eq!(fp.metadata().expect("could not get metadata").size(), 0); - #[cfg(windows)] - assert_eq!( - fp.metadata().expect("could not get metadata").file_size(), - 0 - ); - assert!(path.exists()); - } - FWAction::Pause(ps) => delay(ps), - FWAction::Exit => break, - FWAction::WriteLine(ref s) => { - fwfiles[0].write_line(s); - assert!(fp.write(s.as_bytes()).is_ok()); - assert!(fp.write(b"\n").is_ok()); - assert!(fp.flush().is_ok()); - writes += 1; - } - FWAction::RotateFile => { - let mut new_path = path.clone(); - new_path.set_extension(format!("log.{}", rotation_count)); - rotation_count += 1; - fs::rename(&path, &new_path).expect("could not rename"); - fp = fs::File::create(&path).expect("could not create"); - fwfiles.insert(0, FWFile::new()); - read_index += 1; - } - FWAction::Read => { - let mut attempts = 10; - while attempts > 0 { - match fw.read_line() { - Err(_) => { - unreachable!(); - } - Ok(Some(line)) if line.is_empty() => { - attempts -= 1; - continue; - } - Ok(None) => { - attempts -= 1; - continue; - } - Ok(_) => { - sut_reads += 1; - let psv = fwfiles[read_index].read_line(); - if psv.is_some() { - model_reads += 1; - break; - } - break; - } - } - } - } - } - } - assert!(writes >= sut_reads); - assert!(sut_reads >= model_reads); - } - - // Interpret all FWActions, excluding truncation - // - // This interpretation is the happy case. When there are no truncations our - // model and SUT should agree exactly. To that end, we confirm that every - // read from SUT exactly matches the reads from the model. 
- fn experiment_no_truncations(actions: Vec) { - let dir = tempfile::TempDir::new().expect("could not create tempdir"); - let path = dir.path().join("a_file.log"); - let mut fp = fs::File::create(&path).expect("could not create"); - let mut rotation_count = 0; - let mut fw = FileWatcher::new( - path.clone(), - ReadFrom::Beginning, - None, - 100_000, - Bytes::from("\n"), - ) - .expect("must be able to create"); - - let mut fwfiles: Vec = vec![]; - fwfiles.push(FWFile::new()); - let mut read_index = 0; - for action in actions.iter() { - match *action { - FWAction::DeleteFile => { - let _ = fs::remove_file(&path); - assert!(!path.exists()); - fwfiles[0].reset(); - break; - } - FWAction::TruncateFile => {} - FWAction::Pause(ps) => delay(ps), - FWAction::Exit => break, - FWAction::WriteLine(ref s) => { - fwfiles[0].write_line(s); - assert!(fp.write(s.as_bytes()).is_ok()); - assert!(fp.write(b"\n").is_ok()); - assert!(fp.flush().is_ok()); - } - FWAction::RotateFile => { - let mut new_path = path.clone(); - new_path.set_extension(format!("log.{}", rotation_count)); - rotation_count += 1; - fs::rename(&path, &new_path).expect("could not rename"); - fp = fs::File::create(&path).expect("could not create"); - fwfiles.insert(0, FWFile::new()); - read_index += 1; - } - FWAction::Read => { - let mut attempts = 10; - while attempts > 0 { - match fw.read_line() { - Err(_) => { - unreachable!(); - } - Ok(Some(line)) if line.is_empty() => { - attempts -= 1; - assert!(fwfiles[read_index].read_line().is_none()); - continue; - } - Ok(None) => { - attempts -= 1; - assert!(fwfiles[read_index].read_line().is_none()); - continue; - } - Ok(Some(line)) => { - let exp = - fwfiles[read_index].read_line().expect("could not readline"); - assert_eq!(exp.into_bytes(), line); - // assert_eq!(sz, buf.len() + 1); - break; - } - } - } - } - } - } - } - - #[test] - fn file_watcher_no_truncation() { - fn inner(actions: Vec) -> TestResult { - experiment_no_truncations(actions); - TestResult::passed() - } - QuickCheck::new() - .tests(10000) - .max_tests(100000) - .quickcheck(inner as fn(Vec) -> TestResult); - } - - #[test] - fn file_watcher_with_truncation() { - fn inner(actions: Vec) -> TestResult { - experiment(actions); - TestResult::passed() - } - QuickCheck::new() - .tests(10000) - .max_tests(100000) - .quickcheck(inner as fn(Vec) -> TestResult); - } - - #[inline] - pub fn delay(attempts: u32) { - let delay = match attempts { - 0 => return, - 1 => 1, - 2 => 4, - 3 => 8, - 4 => 16, - 5 => 32, - 6 => 64, - 7 => 128, - 8 => 256, - _ => 512, - }; - let sleep_time = std::time::Duration::from_millis(delay as u64); - std::thread::sleep(sleep_time); - } -}
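
A minimal sketch of the `read_until_with_max_size` contract exported by the
new `buffer` module, for readers who want to try it outside the test suite
(nothing here beyond what the patch itself provides; the input bytes are
arbitrary):

    use bytes::BytesMut;
    use file_source::buffer::read_until_with_max_size;
    use std::io::Cursor;

    fn main() -> std::io::Result<()> {
        let mut reader = Cursor::new(&b"hello\nworld"[..]);
        let mut position = 0; // a FilePosition, i.e. a u64 byte offset
        let mut buf = BytesMut::new();

        // A complete line is available: Ok(Some(6)) -- six bytes consumed,
        // delimiter included -- and `buf` holds the line minus the delimiter.
        let read = read_until_with_max_size(&mut reader, &mut position, b"\n", &mut buf, 1024)?;
        assert_eq!(read, Some(6));
        assert_eq!(&buf[..], b"hello");

        // "world" has no trailing delimiter yet: Ok(None) tells the caller to
        // retry once more bytes have landed in the underlying file.
        buf.clear();
        let read = read_until_with_max_size(&mut reader, &mut position, b"\n", &mut buf, 1024)?;
        assert_eq!(read, None);
        Ok(())
    }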
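
As for exercising the new benchmark: with `harness = false` and the `[[bench]]`
target declared above, the Criterion suite should run under the stock cargo
workflow, something like `cargo bench -p file-source` from the workspace root.
Because the benchmark declares `Throughput::Bytes`, Criterion reports a
bytes-per-second figure alongside the timings, which is where the GiB/s numbers
cited in the module documentation come from.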