diff --git a/Dockerfile b/Dockerfile
index 43fa7653a..67e2825d0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,6 +10,7 @@ COPY . /app
 RUN cargo build --release --locked --bin lading
 
 FROM docker.io/debian:bullseye-20240701-slim
+RUN apt-get update && apt-get install -y libfuse3-dev=3.10.3-2 && rm -rf /var/lib/apt/lists/*
 COPY --from=builder /app/target/release/lading /usr/bin/lading
 
 # smoke test
diff --git a/lading/src/generator/file_gen.rs b/lading/src/generator/file_gen.rs
index 9542f8046..40d350dc8 100644
--- a/lading/src/generator/file_gen.rs
+++ b/lading/src/generator/file_gen.rs
@@ -13,7 +13,7 @@
 //!
 
 pub mod logrotate;
-pub mod model;
+pub mod logrotate_fs;
 pub mod traditional;
 
 use std::str;
@@ -31,6 +31,9 @@ pub enum Error {
     /// Wrapper around [`logrotate::Error`].
     #[error(transparent)]
     Logrotate(#[from] logrotate::Error),
+    /// Wrapper around [`logrotate_fs::Error`].
+    #[error(transparent)]
+    LogrotateFs(#[from] logrotate_fs::Error),
 }
 
 /// Configuration of [`FileGen`]
@@ -42,6 +45,8 @@ pub enum Config {
     Traditional(traditional::Config),
     /// See [`logrotate::Config`].
     Logrotate(logrotate::Config),
+    /// See [`logrotate_fs::Config`].
+    LogrotateFs(logrotate_fs::Config),
 }
 
 #[derive(Debug)]
@@ -54,6 +59,8 @@ pub enum FileGen {
    Traditional(traditional::Server),
    /// See [`logrotate::Server`] for details.
    Logrotate(logrotate::Server),
+    /// See [`logrotate_fs::Server`] for details.
+    LogrotateFs(logrotate_fs::Server),
 }
 
 impl FileGen {
@@ -78,6 +85,9 @@ impl FileGen {
                 Self::Traditional(traditional::Server::new(general, c, shutdown)?)
             }
             Config::Logrotate(c) => Self::Logrotate(logrotate::Server::new(general, c, shutdown)?),
+            Config::LogrotateFs(c) => {
+                Self::LogrotateFs(logrotate_fs::Server::new(general, c, shutdown)?)
+            }
         };
         Ok(srv)
     }
@@ -98,6 +108,7 @@ impl FileGen {
         match self {
             Self::Traditional(inner) => inner.spin().await?,
             Self::Logrotate(inner) => inner.spin().await?,
+            Self::LogrotateFs(inner) => inner.spin().await?,
         };
 
         Ok(())
diff --git a/lading/src/bin/logrotate_fs.rs b/lading/src/generator/file_gen/logrotate_fs.rs
similarity index 58%
rename from lading/src/bin/logrotate_fs.rs
rename to lading/src/generator/file_gen/logrotate_fs.rs
index ef2f3bbe7..e72cf4143 100644
--- a/lading/src/bin/logrotate_fs.rs
+++ b/lading/src/generator/file_gen/logrotate_fs.rs
@@ -1,16 +1,25 @@
-use byte_unit::Byte;
-use clap::Parser;
+//!
A filesystem that mimics logs with rotation + +#![allow(clippy::cast_sign_loss)] // TODO remove these clippy allows +#![allow(clippy::cast_possible_truncation)] +#![allow(clippy::cast_possible_wrap)] +#![allow(missing_docs)] +#![allow(clippy::missing_panics_doc)] +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::needless_pass_by_value)] + use fuser::{ - FileAttr, Filesystem, MountOption, ReplyAttr, ReplyData, ReplyDirectory, ReplyEntry, Request, + spawn_mount2, BackgroundSession, FileAttr, Filesystem, MountOption, ReplyAttr, ReplyData, + ReplyDirectory, ReplyEntry, Request, }; -use lading::generator::file_gen::model; use lading_payload::block; use rand::{rngs::SmallRng, SeedableRng}; +use tokio::task; use tracing::{error, info}; -use tracing_subscriber::{fmt::format::FmtSpan, util::SubscriberInitExt}; // use lading_payload::block; +use crate::generator; use nix::libc::{self, ENOENT}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, ffi::OsStr, @@ -20,66 +29,134 @@ use std::{ time::Duration, }; -// fn default_config_path() -> String { -// "/etc/lading/logrotate_fs.yaml".to_string() -// } - -#[derive(Parser, Debug)] -#[clap(author, version, about, long_about = None)] -struct Args { - // /// path on disk to the configuration file - // #[clap(long, default_value_t = default_config_path())] - // config_path: String, - #[clap(long)] - bytes_per_second: Byte, - #[clap(long)] - mount_point: PathBuf, -} +mod model; + +const TTL: Duration = Duration::from_secs(1); // Attribute cache timeout -#[derive(Debug, Deserialize, PartialEq)] +#[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(deny_unknown_fields)] /// Configuration of [`FileGen`] -struct Config { - // // /// The seed for random operations against this target - // // pub seed: [u8; 32], - // /// Total number of concurrent logs. - // concurrent_logs: u16, - // /// The **soft** maximum byte size of each log. - // maximum_bytes_per_log: Byte, - // /// The number of rotations per log file. - // total_rotations: u8, - // /// The maximum directory depth allowed below the root path. If 0 all log - // /// files will be present in the root path. - // max_depth: u8, - // /// Sets the [`crate::payload::Config`] of this template. - // variant: lading_payload::Config, +pub struct Config { + /// The seed for random operations against this target + pub seed: [u8; 32], + /// Total number of concurrent logs. + concurrent_logs: u16, + /// The maximum byte size of each log. + maximum_bytes_per_log: byte_unit::Byte, + /// The number of rotations per log file. + total_rotations: u8, + /// The maximum directory depth allowed below the root path. If 0 all log + /// files will be present in the root path. + max_depth: u8, + /// Sets the [`crate::payload::Config`] of this template. + variant: lading_payload::Config, /// Defines the number of bytes that written in each log file. - bytes_per_second: Byte, - // /// Defines the maximum internal cache of this log target. `file_gen` will - // /// pre-build its outputs up to the byte capacity specified here. - // maximum_prebuild_cache_size_bytes: Byte, - // /// The maximum size in bytes of the largest block in the prebuild cache. 
- // #[serde(default = "lading_payload::block::default_maximum_block_size")] - // maximum_block_size: byte_unit::Byte, - // /// Whether to use a fixed or streaming block cache - // #[serde(default = "lading_payload::block::default_cache_method")] - // block_cache_method: block::CacheMethod, - // /// The load throttle configuration - // #[serde(default)] - // throttle: lading_throttle::Config, + bytes_per_second: byte_unit::Byte, + /// Defines the maximum internal cache of this log target. `file_gen` will + /// pre-build its outputs up to the byte capacity specified here. + maximum_prebuild_cache_size_bytes: byte_unit::Byte, + /// The maximum size in bytes of the largest block in the prebuild cache. + #[serde(default = "lading_payload::block::default_maximum_block_size")] + maximum_block_size: byte_unit::Byte, /// The mount-point for this filesystem mount_point: PathBuf, } #[derive(thiserror::Error, Debug)] +/// Error for `LogrotateFs` pub enum Error { #[error(transparent)] + /// IO error Io(#[from] std::io::Error), - #[error("Failed to deserialize configuration: {0}")] - SerdeYaml(#[from] serde_yaml::Error), + /// Creation of payload blocks failed. + #[error("Block creation error: {0}")] + Block(#[from] block::Error), + /// Failed to convert, value is 0 + #[error("Value provided must not be zero")] + Zero, } -const TTL: Duration = Duration::from_secs(1); // Attribute cache timeout +#[derive(Debug)] +/// The logrotate filesystem server. +/// +/// This generator manages a FUSE filesystem which "writes" files to a mounted +/// filesystem, rotating them as appropriate. It does this without coordination +/// to the target _but_ keeps track of how many bytes are written and read +/// during operation. +pub struct Server { + // handles: Vec>>, + shutdown: lading_signal::Watcher, + background_session: BackgroundSession, +} + +impl Server { + pub fn new( + _: generator::General, + config: Config, + shutdown: lading_signal::Watcher, + ) -> Result { + // TODO spawn a filesystem thread here and not in spin but bubble up any + // errors and make it so that spin waits for the FS thread and also for + // the shutdown signal. We have a synchronous try_recv on the Watcher + // but the way that fuser runs it just blocks the thread and there's no + // place to poll. 
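The TODO above sketches an alternative structure in which the mount happens inside `spin` rather than in `new`. Purely as an illustration of that idea (not what this patch does), and assuming `fs`, `mount_point`, and `options` are built exactly as in the constructor that follows, the shape would be roughly:

```rust
// Sketch only. `spin_alternative` is a hypothetical name; `LogrotateFS`,
// `Error`, and `lading_signal::Watcher` are the types from this module.
async fn spin_alternative(
    shutdown: lading_signal::Watcher,
    fs: LogrotateFS,
    mount_point: std::path::PathBuf,
    options: Vec<fuser::MountOption>,
) -> Result<(), Error> {
    // `fuser::mount2` blocks its thread, so it must run on the blocking pool.
    let mount = tokio::task::spawn_blocking(move || fuser::mount2(fs, &mount_point, &options));
    tokio::select! {
        // The filesystem exited on its own; surface any I/O error.
        res = mount => res.expect("mount task panicked")?,
        // Shutdown was signalled; a real implementation would unmount here,
        // as `spin` below does via `BackgroundSession::join`.
        _ = shutdown.recv() => {}
    }
    Ok(())
}
```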
+ let mut rng = SmallRng::from_seed(config.seed); + + let total_bytes = + NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as u32) + .ok_or(Error::Zero)?; + let block_cache = block::Cache::fixed( + &mut rng, + total_bytes, + config.maximum_block_size.get_bytes(), + &config.variant, + )?; + + let state = model::State::new( + &mut rng, + config.bytes_per_second.get_bytes() as u64, + config.total_rotations, + config.maximum_bytes_per_log.get_bytes() as u64, + block_cache, + config.max_depth, + config.concurrent_logs, + ); + + // Initialize the FUSE filesystem + let fs = LogrotateFS { + state: Arc::new(Mutex::new(state)), + open_files: Arc::new(Mutex::new(HashMap::new())), + start_time: std::time::Instant::now(), + start_time_system: std::time::SystemTime::now(), + }; + + let options = vec![ + MountOption::FSName("lading_logrotate_fs".to_string()), + MountOption::AutoUnmount, + MountOption::AllowOther, + ]; + + // Mount the filesystem in the background + let background_session = spawn_mount2(fs, config.mount_point, &options) + .expect("Failed to mount FUSE filesystem"); + + Ok(Self { + shutdown, + background_session, + }) + } + + #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_possible_truncation)] + pub async fn spin(self) -> Result<(), Error> { + self.shutdown.recv().await; + + let handle = task::spawn_blocking(|| self.background_session.join()); + let _ = handle.await; + + Ok(()) + } +} #[derive(Debug)] struct LogrotateFS { @@ -111,12 +188,13 @@ fn getattr_helper( let access_duration = Duration::from_secs(attr.access_tick); let modified_duration = Duration::from_secs(attr.modified_tick); let status_duration = Duration::from_secs(attr.status_tick); + let created_duration = Duration::from_secs(attr.created_tick); // Calculate SystemTime instances let atime = start_time_system + access_duration; let mtime = start_time_system + modified_duration; let ctime = start_time_system + status_duration; - let crtime = start_time_system; // Assume creation time is when the filesystem started + let crtime = start_time_system + created_duration; FileAttr { ino: attr.inode as u64, @@ -146,17 +224,13 @@ fn getattr_helper( } impl Filesystem for LogrotateFS { - #[tracing::instrument(skip(self, _req, _config))] - fn init( - &mut self, - _req: &Request, - _config: &mut fuser::KernelConfig, - ) -> Result<(), libc::c_int> { + #[tracing::instrument(skip(self))] + fn init(&mut self, _: &Request, _: &mut fuser::KernelConfig) -> Result<(), libc::c_int> { Ok(()) } - #[tracing::instrument(skip(self, _req, reply))] - fn lookup(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEntry) { + #[tracing::instrument(skip(self, reply))] + fn lookup(&mut self, _: &Request, parent: u64, name: &OsStr, reply: ReplyEntry) { let tick = self.get_current_tick(); let mut state = self.state.lock().expect("lock poisoned"); @@ -166,17 +240,16 @@ impl Filesystem for LogrotateFS { info!("lookup: returning attr for inode {}: {:?}", ino, attr); reply.entry(&TTL, &attr, 0); return; - } else { - error!("lookup: getattr_helper returned None for inode {}", ino); } + error!("lookup: getattr_helper returned None for inode {}", ino); } else { error!("lookup: state.lookup returned None for name {}", name_str); } reply.error(ENOENT); } - #[tracing::instrument(skip(self, _req, reply))] - fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) { + #[tracing::instrument(skip(self, reply))] + fn getattr(&mut self, _: &Request, ino: u64, reply: ReplyAttr) { let tick = self.get_current_tick(); let mut 
state = self.state.lock().expect("lock poisoned"); @@ -187,16 +260,16 @@ impl Filesystem for LogrotateFS { } } - #[tracing::instrument(skip(self, _req, reply))] + #[tracing::instrument(skip(self, reply))] fn read( &mut self, - _req: &Request, + _: &Request, ino: u64, fh: u64, offset: i64, size: u32, - _flags: i32, - _lock_owner: Option, + _: i32, + _: Option, reply: ReplyData, ) { let tick = self.get_current_tick(); @@ -205,7 +278,7 @@ impl Filesystem for LogrotateFS { // Get the FileHandle from fh let file_handle = { let open_files = self.open_files.lock().expect("lock poisoned"); - open_files.get(&fh).cloned() + open_files.get(&fh).copied() }; if let Some(file_handle) = file_handle { @@ -223,15 +296,15 @@ impl Filesystem for LogrotateFS { } } - #[tracing::instrument(skip(self, _req, reply))] + #[tracing::instrument(skip(self, reply))] fn release( &mut self, - _req: &Request, - _ino: u64, + _: &Request, + _: u64, fh: u64, - _flags: i32, - _lock_owner: Option, - _flush: bool, + _: i32, + _: Option, + _: bool, reply: fuser::ReplyEmpty, ) { let tick = self.get_current_tick(); @@ -252,15 +325,8 @@ impl Filesystem for LogrotateFS { } } - #[tracing::instrument(skip(self, _req, reply))] - fn readdir( - &mut self, - _req: &Request, - ino: u64, - _fh: u64, - offset: i64, - mut reply: ReplyDirectory, - ) { + #[tracing::instrument(skip(self, reply))] + fn readdir(&mut self, _: &Request, ino: u64, _: u64, offset: i64, mut reply: ReplyDirectory) { let tick = self.get_current_tick(); let mut state = self.state.lock().expect("lock poisoned"); state.advance_time(tick); @@ -332,64 +398,3 @@ impl Filesystem for LogrotateFS { } } } - -#[tracing::instrument] -fn main() -> Result<(), Error> { - tracing_subscriber::fmt() - .with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE) - .with_ansi(false) - .finish() - .init(); - - info!("Hello, welcome. 
I hope things are well with you.");
-
-    let args = Args::parse();
-    // let config_contents = std::fs::read_to_string(&args.config_path)?;
-    // let config: Config = serde_yaml::from_str(&config_contents)?;
-
-    let primes: [u8; 32] = [
-        2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89,
-        97, 101, 103, 107, 109, 113, 127, 131,
-    ];
-    let mut rng = SmallRng::from_seed(primes);
-
-    let block_cache = block::Cache::fixed(
-        &mut rng,
-        NonZeroU32::new(100_000_000).expect("zero value"), // TODO make this an Error
-        10_000,                                            // 10 KiB
-        &lading_payload::Config::Ascii,
-    )
-    .expect("block construction"); // TODO make this an Error
-
-    let state = model::State::new(
-        &mut rng,
-        args.bytes_per_second.get_bytes() as u64, // Adjust units accordingly
-        5,         // TODO make an argument
-        1_000_000, // 1MiB
-        block_cache,
-        10, // max_depth
-        8,  // concurrent_logs
-    );
-
-    // Initialize the FUSE filesystem
-    let fs = LogrotateFS {
-        state: Arc::new(Mutex::new(state)),
-        open_files: Arc::new(Mutex::new(HashMap::new())),
-        start_time: std::time::Instant::now(),
-        start_time_system: std::time::SystemTime::now(),
-    };
-
-    // Mount the filesystem
-    fuser::mount2(
-        fs,
-        &args.mount_point,
-        &[
-            MountOption::FSName("logrotate_fs".to_string()),
-            MountOption::AutoUnmount,
-            MountOption::AllowOther,
-        ],
-    )
-    .expect("Failed to mount FUSE filesystem");
-
-    Ok(())
-}
diff --git a/lading/src/generator/file_gen/model.rs b/lading/src/generator/file_gen/logrotate_fs/model.rs
similarity index 89%
rename from lading/src/generator/file_gen/model.rs
rename to lading/src/generator/file_gen/logrotate_fs/model.rs
index fae600a6a..045544171 100644
--- a/lading/src/generator/file_gen/model.rs
+++ b/lading/src/generator/file_gen/logrotate_fs/model.rs
@@ -4,18 +4,18 @@
 use std::collections::{HashMap, HashSet};
 
 use bytes::Bytes;
 use lading_payload::block;
+use metrics::counter;
 use rand::Rng;
-use tracing::info;
 
 /// Time representation of the model
-pub type Tick = u64;
+pub(crate) type Tick = u64;
 /// The identification node number
-pub type Inode = usize;
+pub(crate) type Inode = usize;
 
 /// Model representation of a `File`. Does not actually contain any bytes but
 /// stores sufficient metadata to determine access patterns over time.
 #[derive(Debug, Clone, Copy)]
-pub struct File {
+pub(crate) struct File {
     /// The parent `Node` of this `File`.
     parent: Inode,
 
@@ -30,6 +30,8 @@ pub struct File {
     /// Property: `bytes_written` >= `bytes_read`.
     bytes_read: u64,
 
+    /// The `Tick` on which the `File` was created.
+    created_tick: Tick,
     /// The `Tick` on which the `File` was last accessed. Updated on reads,
     /// opens for reading.
     access_tick: Tick,
@@ -49,6 +51,9 @@ pub struct File {
     /// happen -- or not.
     read_only: bool,
 
+    /// When the file became read-only. Will only be `Some` if `read_only` is true.
+    read_only_since: Option<Tick>,
+
     /// The peer of this file, the next in line in rotation. So, if this file is
     /// foo.log the peer will be foo.log.1 and its peer foo.log.2 etc.
     peer: Option<Inode>,
@@ -67,11 +72,14 @@ pub struct File {
     /// Indicates that the `File` no longer has a name but is not removed from
     /// the filesystem.
     unlinked: bool,
+
+    /// The maximum offset observed, maintained by `State`.
+    max_offset_observed: u64,
 }
 
 /// Represents an open file handle.
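The fields added above (`created_tick`, `read_only_since`, `max_offset_observed`) make the byte accounting purely tick-based. A worked illustration with assumed numbers, mirroring the arithmetic `expected_bytes_written` performs later in this file:

```rust
// Illustration only: a file created at tick 10 that became read-only at
// tick 60, writing 1 KiB per tick, accounts for 50 KiB no matter how far
// the clock has advanced since.
fn tick_accounting_example() {
    let bytes_per_tick: u64 = 1024;
    let created_tick: u64 = 10;
    let read_only_since: Option<u64> = Some(60);
    let now: u64 = 200;

    let writable_until = read_only_since.unwrap_or(now);
    let expected = bytes_per_tick.saturating_mul(writable_until.saturating_sub(created_tick));
    assert_eq!(expected, 50 * 1024);
}
```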
#[derive(Debug, Clone, Copy)] -pub struct FileHandle { +pub(crate) struct FileHandle { id: u64, inode: Inode, } @@ -79,13 +87,13 @@ pub struct FileHandle { impl FileHandle { /// Return the ID of this file handle #[must_use] - pub fn id(&self) -> u64 { + pub(crate) fn id(&self) -> u64 { self.id } /// Return the inode of this file handle #[must_use] - pub fn inode(&self) -> Inode { + pub(crate) fn inode(&self) -> Inode { self.inode } } @@ -94,7 +102,7 @@ impl File { /// Open a new handle to this file. /// /// TODO these need to modify access time et al - pub fn open(&mut self, now: Tick) { + pub(crate) fn open(&mut self, now: Tick) { self.advance_time(now); if now > self.access_tick { self.access_tick = now; @@ -110,7 +118,7 @@ impl File { /// /// Function will panic if attempt is made to close file with no file /// handles outstanding. - pub fn close(&mut self, now: Tick) { + pub(crate) fn close(&mut self, now: Tick) { self.advance_time(now); assert!( @@ -120,20 +128,8 @@ impl File { self.open_handles -= 1; } - /// Return the number of open file handles to this `File` - #[must_use] - pub fn open_handles(&self) -> usize { - self.open_handles - } - - /// Return whether the file is unlinked or not - #[must_use] - pub fn unlinked(&self) -> bool { - self.unlinked - } - /// Mark the file as unlinked (deleted). - pub fn unlink(&mut self, now: Tick) { + pub(crate) fn unlink(&mut self, now: Tick) { self.advance_time(now); self.unlinked = true; @@ -149,13 +145,14 @@ impl File { /// /// Updates `access_tick` to `now` and adds `request` to `bytes_read`. Time /// will be advanced, meaning `modified_tick` may update. - pub fn read(&mut self, request: u64, now: Tick) { + pub(crate) fn read(&mut self, request: u64, now: Tick) { self.advance_time(now); if now > self.access_tick { self.access_tick = now; self.status_tick = now; } + counter!("bytes_read").increment(request); self.bytes_read = self.bytes_read.saturating_add(request); } @@ -174,6 +171,7 @@ impl File { let diff = now.saturating_sub(self.modified_tick); let bytes_accum = diff.saturating_mul(self.bytes_per_tick); + counter!("bytes_written").increment(bytes_accum); self.bytes_written = self.bytes_written.saturating_add(bytes_accum); self.modified_tick = now; self.status_tick = now; @@ -183,24 +181,25 @@ impl File { /// /// This function flips the internal bool on this `File` stopping any future /// byte accumulations. - pub fn set_read_only(&mut self) { + pub(crate) fn set_read_only(&mut self, now: Tick) { self.read_only = true; + self.read_only_since = Some(now); } /// Return whether the file is read-only or not #[must_use] - pub fn read_only(&self) -> bool { + pub(crate) fn read_only(&self) -> bool { self.read_only } /// Return the ordinal number of this File #[must_use] - pub fn ordinal(&self) -> u8 { + pub(crate) fn ordinal(&self) -> u8 { self.ordinal } /// Increment the ordinal number of this File - pub fn incr_ordinal(&mut self) { + pub(crate) fn incr_ordinal(&mut self) { self.ordinal = self.ordinal.saturating_add(1); } @@ -208,22 +207,31 @@ impl File { /// /// This function does not advance time. #[must_use] - pub fn size(&self) -> u64 { + pub(crate) fn size(&self) -> u64 { self.bytes_written } + + /// Calculate the expected bytes written based on writable duration. 
+ #[cfg(test)] + pub(crate) fn expected_bytes_written(&self, now: Tick) -> u64 { + let start_tick = self.created_tick; + let end_tick = self.read_only_since.unwrap_or(now); + let writable_duration = end_tick.saturating_sub(start_tick); + self.bytes_per_tick.saturating_mul(writable_duration) + } } /// Model representation of a `Directory`. Contains children are `Directory` /// instances or `File` instances. Root directory will not have a `parent`. #[derive(Debug)] -pub struct Directory { +pub(crate) struct Directory { children: HashSet, parent: Option, } /// A filesystem object, either a `File` or a `Directory`. #[derive(Debug)] -pub enum Node { +pub(crate) enum Node { /// A [`File`] File { /// The `File` instance. @@ -238,22 +246,12 @@ pub enum Node { }, } -impl Node { - /// Run the clock forward on this node - pub fn advance_time(&mut self, now: Tick) { - match self { - Node::Directory { .. } => { /* nothing, intentionally */ } - Node::File { file, .. } => file.advance_time(now), - } - } -} - /// The state of the filesystem /// /// This structure is responsible for maintenance of the structure of the /// filesystem. It does not contain any bytes, the caller must maintain this /// themselves. -pub struct State { +pub(crate) struct State { nodes: HashMap, root_inode: Inode, now: Tick, @@ -264,7 +262,6 @@ pub struct State { group_names: Vec>, next_inode: Inode, next_file_handle: u64, - lost_bytes: u64, } impl std::fmt::Debug for State { @@ -284,24 +281,26 @@ impl std::fmt::Debug for State { /// The attributes of a `Node`. #[derive(Debug, Clone, Copy)] -pub struct NodeAttributes { +pub(crate) struct NodeAttributes { /// The id of the node. - pub inode: Inode, + pub(crate) inode: Inode, /// The kind, whether a file or directory. - pub kind: NodeType, + pub(crate) kind: NodeType, /// The size in bytes. - pub size: u64, + pub(crate) size: u64, /// The last access time in ticks. - pub access_tick: Tick, + pub(crate) access_tick: Tick, /// The last modified time in ticks. - pub modified_tick: Tick, + pub(crate) modified_tick: Tick, /// The last status change time in ticks. - pub status_tick: Tick, + pub(crate) status_tick: Tick, + /// The tick on which the file was created. + pub(crate) created_tick: Tick, } /// Describe whether the Node is a File or Directory. #[derive(Debug, Clone, Copy)] -pub enum NodeType { +pub(crate) enum NodeType { /// A [`File`] File, /// A [`Directory`] @@ -311,7 +310,7 @@ pub enum NodeType { impl State { /// Create a new instance of `State`. #[tracing::instrument(skip(rng, block_cache))] - pub fn new( + pub(crate) fn new( rng: &mut R, bytes_per_tick: u64, max_rotations: u8, @@ -348,7 +347,6 @@ impl State { group_names: Vec::new(), next_inode: 2, next_file_handle: 0, - lost_bytes: 0, }; if concurrent_logs == 0 { @@ -449,6 +447,7 @@ impl State { access_tick: state.now, modified_tick: state.now, status_tick: state.now, + created_tick: state.now, bytes_per_tick, read_only: false, ordinal: 0, @@ -456,6 +455,8 @@ impl State { group_id, open_handles: 0, unlinked: false, + max_offset_observed: 0, + read_only_since: None, }; state.nodes.insert(file_inode, Node::File { file }); @@ -471,7 +472,7 @@ impl State { /// Open a file and return a handle. /// /// This function advances time. - pub fn open_file(&mut self, now: Tick, inode: Inode) -> Option { + pub(crate) fn open_file(&mut self, now: Tick, inode: Inode) -> Option { self.advance_time(now); if let Some(Node::File { file, .. 
}) = self.nodes.get_mut(&inode) { @@ -491,7 +492,7 @@ impl State { /// # Panics /// /// Function will panic if `FileHandle` is not valid. - pub fn close_file(&mut self, now: Tick, handle: FileHandle) { + pub(crate) fn close_file(&mut self, now: Tick, handle: FileHandle) { self.advance_time(now); if let Some(Node::File { file, .. }) = self.nodes.get_mut(&handle.inode) { @@ -507,7 +508,8 @@ impl State { /// /// Will panic if passed `now` is less than recorded `now`. Time can only /// advance. - pub fn advance_time(&mut self, now: Tick) { + #[allow(clippy::too_many_lines)] + pub(crate) fn advance_time(&mut self, now: Tick) { assert!(now >= self.now); let mut inodes: Vec = self.nodes.keys().copied().collect(); @@ -532,7 +534,7 @@ impl State { "Expected rotated file to be 0th ordinal, was {ordinal}", ordinal = file.ordinal() ); - file.set_read_only(); + file.set_read_only(now); Some(( inode, file.parent, @@ -566,13 +568,14 @@ impl State { // become the 0th ordinal in the `group_id` and may -- although // we don't know yet -- cause `rotated_inode` to be deleted. let new_file_inode = self.next_inode; - let new_file = File { + let mut new_file = File { parent: parent_inode, bytes_written: 0, bytes_read: 0, - access_tick: now, - modified_tick: now, - status_tick: now, + access_tick: self.now, + modified_tick: self.now, + status_tick: self.now, + created_tick: self.now, bytes_per_tick, read_only: false, ordinal: 0, @@ -580,7 +583,10 @@ impl State { group_id, open_handles: 0, unlinked: false, + max_offset_observed: 0, + read_only_since: None, }; + new_file.advance_time(now); // Insert `new_file` into the node list and make it a member of // its directory's children. @@ -688,9 +694,8 @@ impl State { } for inode in to_remove { if let Some(Node::File { file }) = self.nodes.remove(&inode) { - let lost_bytes = file.bytes_written.saturating_sub(file.bytes_read); - self.lost_bytes += lost_bytes; - info!("TOTAL BYTES LOST: {lost}", lost = self.lost_bytes); + let lost_bytes = file.bytes_written.saturating_sub(file.max_offset_observed); + counter!("lost_bytes").increment(lost_bytes); } } } @@ -701,7 +706,7 @@ impl State { /// returning any inode that happens to match. Time will be advanced to /// `now`. #[tracing::instrument(skip(self))] - pub fn lookup(&mut self, now: Tick, parent_inode: Inode, name: &str) -> Option { + pub(crate) fn lookup(&mut self, now: Tick, parent_inode: Inode, name: &str) -> Option { self.advance_time(now); if let Some(Node::Directory { dir, .. }) = self.nodes.get(&parent_inode) { @@ -726,7 +731,7 @@ impl State { /// /// Time will be advanced to `now`. #[tracing::instrument(skip(self))] - pub fn getattr(&mut self, now: Tick, inode: Inode) -> Option { + pub(crate) fn getattr(&mut self, now: Tick, inode: Inode) -> Option { self.advance_time(now); self.nodes.get(&inode).map(|node| match node { @@ -737,6 +742,7 @@ impl State { access_tick: file.access_tick, modified_tick: file.modified_tick, status_tick: file.status_tick, + created_tick: file.created_tick, }, Node::Directory { .. } => NodeAttributes { inode, @@ -745,6 +751,7 @@ impl State { access_tick: self.now, modified_tick: self.now, status_tick: self.now, + created_tick: self.now, }, }) } @@ -755,7 +762,7 @@ impl State { /// be advanced -- and a slice up to `size` bytes will be returned or `None` /// if no bytes are available to be read. 
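A sketch of the clamping the `read` implementation below applies, with assumed values; a request can never return bytes past what the model says has been written:

```rust
// Illustration only: mirrors `available.min(size)` inside `State::read`.
fn clamp_read(bytes_written: usize, offset: usize, size: usize) -> usize {
    let available = bytes_written.saturating_sub(offset);
    available.min(size)
}

#[test]
fn clamp_read_example() {
    // 300 bytes written so far: a 200-byte read at offset 250 yields 50 bytes,
    // and a read at or past offset 300 yields nothing.
    assert_eq!(clamp_read(300, 250, 200), 50);
    assert_eq!(clamp_read(300, 300, 200), 0);
}
```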
#[tracing::instrument(skip(self))] - pub fn read( + pub(crate) fn read( &mut self, file_handle: FileHandle, offset: usize, @@ -778,8 +785,12 @@ impl State { let available = bytes_written.saturating_sub(offset); let to_read = available.min(size); + let end_offset = offset as u64 + to_read as u64; + file.max_offset_observed = file.max_offset_observed.max(end_offset); + // Get data from block_cache without worrying about blocks let data = self.block_cache.read_at(offset as u64, to_read); + assert!(data.len() == to_read, "Data returned from block_cache is distinct from the read size: {l} != {to_read}", l = data.len()); file.read(to_read as u64, now); @@ -796,7 +807,7 @@ impl State { /// /// Function does not advance time in the model. #[tracing::instrument(skip(self))] - pub fn readdir(&self, inode: Inode) -> Option<&HashSet> { + pub(crate) fn readdir(&self, inode: Inode) -> Option<&HashSet> { if let Some(Node::Directory { dir, .. }) = self.nodes.get(&inode) { Some(&dir.children) } else { @@ -806,7 +817,7 @@ impl State { /// Get the fuser file type of an inode if it exists #[tracing::instrument(skip(self))] - pub fn get_file_type(&self, inode: Inode) -> Option { + pub(crate) fn get_file_type(&self, inode: Inode) -> Option { self.nodes.get(&inode).map(|node| match node { Node::Directory { .. } => fuser::FileType::Directory, Node::File { .. } => fuser::FileType::RegularFile, @@ -815,7 +826,7 @@ impl State { /// Return the name of the inode if it exists #[tracing::instrument(skip(self))] - pub fn get_name(&self, inode: Inode) -> Option<&str> { + pub(crate) fn get_name(&self, inode: Inode) -> Option<&str> { self.nodes .get(&inode) .map(|node| match node { @@ -829,7 +840,7 @@ impl State { /// Return the parent inode of an inode, if it exists #[tracing::instrument(skip(self))] - pub fn get_parent_inode(&self, inode: Inode) -> Option { + pub(crate) fn get_parent_inode(&self, inode: Inode) -> Option { if inode == self.root_inode { Some(self.root_inode) } else { @@ -842,13 +853,13 @@ impl State { /// Return the root inode of this state #[must_use] - pub fn root_inode(&self) -> Inode { + pub(crate) fn root_inode(&self) -> Inode { self.root_inode } /// Return the number of links for the inode. #[must_use] - pub fn nlink(&self, inode: Inode) -> usize { + pub(crate) fn nlink(&self, inode: Inode) -> usize { if let Some(Node::Directory { dir, .. }) = self.nodes.get(&inode) { let subdirectory_count = dir .children @@ -872,9 +883,7 @@ mod test { num::NonZeroU32, }; - use crate::generator::file_gen::model::FileHandle; - - use super::{Inode, Node, State}; + use super::{FileHandle, Inode, Node, State}; use lading_payload::block; use proptest::collection::vec; use proptest::prelude::*; @@ -995,14 +1004,17 @@ mod test { } fn assert_state_properties(state: &State) { - // Property 1: bytes_written >= bytes_read + // Property 1: bytes_written >= max_offset_observed + // + // While a caller can read the same bytes multiple times they cannot + // read past the maximum bytes available. 
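Related to Property 1, which the loop below checks: on node removal the model charges the `lost_bytes` counter with whatever was written but never reached by any reader, which is only meaningful because `max_offset_observed` can never exceed `bytes_written`. A small illustration with assumed values:

```rust
// Illustration only: the accounting behind `counter!("lost_bytes")` above.
fn lost_bytes(bytes_written: u64, max_offset_observed: u64) -> u64 {
    bytes_written.saturating_sub(max_offset_observed)
}

#[test]
fn lost_bytes_example() {
    // 4 KiB modeled as written, but readers only ever reached offset 1 KiB.
    assert_eq!(lost_bytes(4096, 1024), 3072);
}
```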
for node in state.nodes.values() { if let Node::File { file } = node { assert!( - file.bytes_written >= file.bytes_read, - "bytes_written ({}) < bytes_read ({})", + file.bytes_written >= file.max_offset_observed, + "bytes_written ({}) < max_offset_observed ({})", file.bytes_written, - file.bytes_read + file.max_offset_observed, ); } } @@ -1040,10 +1052,7 @@ mod test { } if let Some(Node::File { file: peer_file }) = state.nodes.get(&peer_inode) { - assert!( - !peer_file.unlinked(), - "File was found in peer chain unlinked" - ); + assert!(!peer_file.unlinked, "File was found in peer chain unlinked"); expected_ordinal += 1; assert_eq!( peer_file.ordinal, @@ -1065,7 +1074,7 @@ mod test { // No ordinal should exeed max_rotations, so long as the file is linked. for node in state.nodes.values() { if let Node::File { file } = node { - if file.unlinked() { + if file.unlinked { continue; } assert!( @@ -1087,7 +1096,7 @@ mod test { // holds linked and unlinked files. for (&inode, node) in &state.nodes { if let Node::File { file } = node { - if file.unlinked() && file.open_handles() == 0 { + if file.unlinked && file.open_handles == 0 { panic!( "Found orphaned file inode {} (unlinked with zero open handles)", inode @@ -1102,7 +1111,7 @@ mod test { // the correct name. for (&inode, node) in &state.nodes { if let Node::File { file } = node { - if file.unlinked() { + if file.unlinked { continue; } if let Some(names) = state.group_names.get(file.group_id as usize) { @@ -1127,6 +1136,21 @@ mod test { } } } + + // Property 7: bytes_written are tick accurate + for node in state.nodes.values() { + if let Node::File { file } = node { + let expected_bytes = file.expected_bytes_written(state.now); + assert_eq!( + file.bytes_written, + expected_bytes, + "bytes_written ({}) does not match expected_bytes_written ({}) for file with inode {}", + file.bytes_written, + expected_bytes, + file.parent + ); + } + } } proptest! {
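The `proptest!` body itself is elided by the hunk above. Purely as an illustration of the pattern these invariant checks support (the property name and parameters here are assumptions, not the real test), a property in this style could look like:

```rust
proptest! {
    // Hypothetical property: drive the model to a random tick from a random
    // seed, then re-check every invariant in `assert_state_properties`.
    #[test]
    fn invariants_hold_after_advancing_time(
        seed in any::<[u8; 32]>(),
        tick in 1u64..1_000u64,
    ) {
        use rand::{rngs::SmallRng, SeedableRng};

        let mut rng = SmallRng::from_seed(seed);
        let block_cache = block::Cache::fixed(
            &mut rng,
            std::num::NonZeroU32::new(1_000_000).expect("non-zero"),
            10_000,
            &lading_payload::Config::Ascii,
        )
        .expect("block cache");
        let mut state = State::new(&mut rng, 1024, 3, 100_000, block_cache, 2, 4);

        state.advance_time(tick);
        assert_state_properties(&state);
    }
}
```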