diff --git a/tensorboard/data/server/BUILD b/tensorboard/data/server/BUILD index 41d3cdf994..c626daba0f 100644 --- a/tensorboard/data/server/BUILD +++ b/tensorboard/data/server/BUILD @@ -30,6 +30,7 @@ rust_library( "cli.rs", "commit.rs", "data_compat.rs", + "disk_logdir.rs", "downsample.rs", "event_file.rs", "logdir.rs", diff --git a/tensorboard/data/server/bench.rs b/tensorboard/data/server/bench.rs index dd409b2cb8..7ad7ba57a4 100644 --- a/tensorboard/data/server/bench.rs +++ b/tensorboard/data/server/bench.rs @@ -21,6 +21,7 @@ use std::path::PathBuf; use std::time::Instant; use rustboard_core::commit::Commit; +use rustboard_core::disk_logdir::DiskLogdir; use rustboard_core::logdir::LogdirLoader; #[derive(Clap)] @@ -44,7 +45,11 @@ fn main() { init_logging(&opts); let commit = Commit::new(); - let mut loader = LogdirLoader::new(&commit, opts.logdir, opts.reload_threads.unwrap_or(0)); + let mut loader = LogdirLoader::new( + &commit, + DiskLogdir::new(opts.logdir), + opts.reload_threads.unwrap_or(0), + ); loader.checksum(opts.checksum); // if neither `--[no-]checksum` given, defaults to false info!("Starting load cycle"); diff --git a/tensorboard/data/server/cli.rs b/tensorboard/data/server/cli.rs index 8c23e95852..fd988d9cf3 100644 --- a/tensorboard/data/server/cli.rs +++ b/tensorboard/data/server/cli.rs @@ -29,6 +29,7 @@ use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::Server; use crate::commit::Commit; +use crate::disk_logdir::DiskLogdir; use crate::logdir::LogdirLoader; use crate::proto::tensorboard::data; use crate::server::DataProviderHandler; @@ -175,9 +176,8 @@ pub async fn main() -> Result<(), Box> { thread::Builder::new() .name("Reloader".to_string()) .spawn({ - let logdir = opts.logdir; let reload_strategy = opts.reload; - let mut loader = LogdirLoader::new(commit, logdir, 0); + let mut loader = LogdirLoader::new(commit, DiskLogdir::new(opts.logdir), 0); // Checksum only if `--checksum` given (i.e., off by default). loader.checksum(opts.checksum); move || loop { diff --git a/tensorboard/data/server/disk_logdir.rs b/tensorboard/data/server/disk_logdir.rs new file mode 100644 index 0000000000..b1c68c0425 --- /dev/null +++ b/tensorboard/data/server/disk_logdir.rs @@ -0,0 +1,102 @@ +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +//! Log directories on local disk. + +use log::{error, warn}; +use std::collections::HashMap; +use std::fs::File; +use std::io::{self, BufReader}; +use std::path::{Path, PathBuf}; +use walkdir::WalkDir; + +use crate::logdir::{EventFileBuf, Logdir, EVENT_FILE_BASENAME_INFIX}; +use crate::types::Run; + +/// A log directory on local disk. +pub struct DiskLogdir { + root: PathBuf, +} + +impl DiskLogdir { + /// Creates a `DiskLogdir` with the given root directory. + pub fn new(root: PathBuf) -> Self { + DiskLogdir { root } + } +} + +impl Logdir for DiskLogdir { + type File = BufReader; + + fn discover(&self) -> io::Result>> { + let mut run_map: HashMap> = HashMap::new(); + let walker = WalkDir::new(&self.root) + .sort_by(|a, b| a.file_name().cmp(b.file_name())) + .follow_links(true); + for walkdir_item in walker { + let dirent = match walkdir_item { + Ok(dirent) => dirent, + Err(e) => { + error!("While walking log directory: {}", e); + continue; + } + }; + if !dirent.file_type().is_file() { + continue; + } + let filename = dirent.file_name().to_string_lossy(); + if !filename.contains(EVENT_FILE_BASENAME_INFIX) { + continue; + } + let run_dir = match dirent.path().parent() { + Some(parent) => parent, + None => { + // I don't know of any circumstance where this can happen, but I would believe + // that some weird filesystem can hit it, so just proceed. + warn!( + "Path {} is a file but has no parent", + dirent.path().display() + ); + continue; + } + }; + let mut run_relpath = match run_dir.strip_prefix(&self.root) { + Ok(rp) => rp.to_path_buf(), + Err(_) => { + error!( + "Log directory {} is not a prefix of run directory {}", + &self.root.display(), + &run_dir.display(), + ); + continue; + } + }; + // Render the root run as ".", not "". + if run_relpath == Path::new("") { + run_relpath.push("."); + } + let run = Run(run_relpath.display().to_string()); + run_map + .entry(run) + .or_default() + .push(EventFileBuf(dirent.into_path())); + } + Ok(run_map) + } + + fn open(&self, path: &EventFileBuf) -> io::Result { + File::open(self.root.join(&path.0)).map(BufReader::new) + } +} diff --git a/tensorboard/data/server/lib.rs b/tensorboard/data/server/lib.rs index 38db533e32..5defae22e9 100644 --- a/tensorboard/data/server/lib.rs +++ b/tensorboard/data/server/lib.rs @@ -21,6 +21,7 @@ pub mod blob_key; pub mod cli; pub mod commit; pub mod data_compat; +pub mod disk_logdir; pub mod downsample; pub mod event_file; pub mod logdir; diff --git a/tensorboard/data/server/logdir.rs b/tensorboard/data/server/logdir.rs index 23ff658933..3e99d15ced 100644 --- a/tensorboard/data/server/logdir.rs +++ b/tensorboard/data/server/logdir.rs @@ -15,68 +15,75 @@ limitations under the License. //! Loader for many runs under a directory. -use log::{error, warn}; +use log::warn; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; -use std::collections::{HashMap, HashSet}; -use std::path::{Path, PathBuf}; -use walkdir::WalkDir; +use std::collections::HashMap; +use std::io::{self, Read}; +use std::path::PathBuf; use crate::commit::Commit; use crate::run::RunLoader; use crate::types::Run; -/// A loader for a log directory, connecting a filesystem to a [`Commit`] via [`RunLoader`]s. -pub struct LogdirLoader<'a> { - thread_pool: rayon::ThreadPool, - commit: &'a Commit, - logdir: PathBuf, - runs: HashMap, - checksum: bool, -} +/// A TensorBoard log directory, with event files organized into runs. +pub trait Logdir { + /// Type of output stream for reading event files under this log directory. + type File: Read; -struct RunState { - /// Stateful loader for this run. - loader: RunLoader, - /// Path to this run's directory relative to the root logdir. - relpath: PathBuf, - /// Logdir-relative paths to other directories that normalize to the same name as this run - /// after lossy Unicode conversion. + /// Finds all event files under the log directory. /// - /// Directories on the filesystem need not be valid Unicode. On Unix, for instance, they're - /// arbitrary byte sequences without `\0` or `/`. But TensorBoard wants run names to be strings - /// for human display. Thus, we apply [`OsStr::to_string_lossy`] to convert run directory paths - /// to run names. As the name suggests, this is a lossy conversion: it replaces non-Unicode - /// sequences with U+FFFD. This creates an edge case wherein two different directories may map - /// to the same run name. In such a case, we merge the two directories into one run, and print - /// a warning. + /// Event files within each run should be emitted in chronological order. Canonically, a file + /// is an event file if its basename contains [`EVENT_FILE_BASENAME_INFIX`] as a substring. /// - /// This set tracks the directories other than `self.relpath` that comprise this run. It is - /// non-empty only when we hit this edge case: i.e., when there are multiple filesystem paths - /// that are invalid Unicode and collide. It's used for deduplicating warnings, so that we - /// don't warn about the directory again on every load cycle. + /// A `Run` whose corresponding `Vec` is empty is interpreted as if the run were + /// absent. + fn discover(&self) -> io::Result>>; + + /// Attempts to open an event file for reading. /// - /// [`OsStr::to_string_lossy`]: std::ffi::OsStr::to_string_lossy - collided_relpaths: HashSet, + /// The `path` should be one of the values returned by a previous call to [`Self::discover`]. + fn open(&self, path: &EventFileBuf) -> io::Result; } -/// Record of an event file found under the log directory. -#[derive(Debug, PartialEq, Eq)] -struct EventFileDiscovery { - run_relpath: PathBuf, - event_file: PathBuf, -} -/// Record of all event files found under the log directory, grouped by run. -/// -/// Each value in the map must be non-empty. +/// An opaque reference to an event file within the context of a specific log directory. /// -/// Not all `run_relpaths` within one `Vec<_>` value need be the same due to lossy paths. See the -/// `collided_relpaths` field of [`RunState`] for details. -struct Discoveries(HashMap>); +/// Event files are represented as [`PathBuf`]s, but the precise semantics are at the discretion of +/// the [`Logdir`] implementation. They may or may not represent paths on the user's physical +/// filesystem. Clients of a [`Logdir`] should treat `EventFileBuf`s as opaque: they should be +/// returned from [`Logdir::discover`] and passed verbatim to [`Logdir::open`], without inspection +/// or modification. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +pub struct EventFileBuf(pub PathBuf); /// A file is treated as an event file if its basename contains this substring. -const EVENT_FILE_BASENAME_INFIX: &str = "tfevents"; +pub const EVENT_FILE_BASENAME_INFIX: &str = "tfevents"; + +/// A loader for a [`Logdir`], connecting a filesystem to a [`Commit`] via [`RunLoader`]s. +/// +/// A `LogdirLoader` is a stateful object. Its [`reload`][Self::reload] method polls the underlying +/// logdir for the state of the world, reads in all data, and updates the associated commit. Since +/// a `LogdirLoader` only borrows its commit, other clients may read data from the commit as it's +/// updated, possibly concurrently. +pub struct LogdirLoader<'a, L: Logdir> { + /// Thread pool used for loading runs in parallel. + thread_pool: rayon::ThreadPool, + /// Shared reference to a commit, updated by [`Self::reload`]. + commit: &'a Commit, + /// Log directory providing the source of truth for event file data. + logdir: L, + /// Stateful run loaders for all known runs. + runs: HashMap::File>>, + /// Whether new run loaders should unconditionally verify CRCs (see [`RunLoader::checksum`]). + checksum: bool, +} -impl<'a> LogdirLoader<'a> { +type Discoveries = HashMap>; + +impl<'a, L: Logdir> LogdirLoader<'a, L> +where + L: Sync, + ::File: Sync + Send, +{ /// Creates a new, empty logdir loader. Does not load any data. /// /// This constructor is heavyweight: it builds a new thread-pool. The thread pool will be @@ -87,7 +94,7 @@ impl<'a> LogdirLoader<'a> { /// /// If [`rayon::ThreadPoolBuilder::build`] returns an error; should only happen if there is a /// failure to create threads at the OS level. - pub fn new(commit: &'a Commit, logdir: PathBuf, reload_threads: usize) -> Self { + pub fn new(commit: &'a Commit, logdir: L, reload_threads: usize) -> Self { let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(reload_threads) .thread_name(|i| format!("Reloader-{:03}", i)) @@ -110,8 +117,6 @@ impl<'a> LogdirLoader<'a> { /// Performs a complete load cycle: finds all event files and reads data from all runs, /// updating the shared commit. /// - /// # Panics - /// /// If any of the commit locks is poisoned, or if a run is removed from the commit by another /// client while this reload is in progress (should not happen if the commit is only being /// updated by a single `LogdirLoader`). @@ -123,60 +128,10 @@ impl<'a> LogdirLoader<'a> { /// Finds all event files under the log directory and groups them by run. fn discover(&self) -> Discoveries { - let mut run_map: HashMap> = HashMap::new(); - let walker = WalkDir::new(&self.logdir) - .sort_by(|a, b| a.file_name().cmp(b.file_name())) - .follow_links(true); - for walkdir_item in walker { - let dirent = match walkdir_item { - Ok(dirent) => dirent, - Err(e) => { - error!("While walking log directory: {}", e); - continue; - } - }; - if !dirent.file_type().is_file() { - continue; - } - let filename = dirent.file_name().to_string_lossy(); - if !filename.contains(EVENT_FILE_BASENAME_INFIX) { - continue; - } - let run_dir = match dirent.path().parent() { - Some(parent) => parent, - None => { - // I don't know of any circumstance where this can happen, but I would believe - // that some weird filesystem can hit it, so just proceed. - warn!( - "Path {} is a file but has no parent", - dirent.path().display() - ); - continue; - } - }; - let mut run_relpath = match run_dir.strip_prefix(&self.logdir) { - Ok(rp) => rp.to_path_buf(), - Err(_) => { - error!( - "Log directory {} is not a prefix of run directory {}", - &self.logdir.display(), - &run_dir.display(), - ); - continue; - } - }; - // Render the root run as ".", not "". - if run_relpath == Path::new("") { - run_relpath.push("."); - } - let run_name = run_relpath.display().to_string(); - let discovery = EventFileDiscovery { - run_relpath, - event_file: dirent.into_path(), - }; - run_map.entry(Run(run_name)).or_default().push(discovery); - } - Discoveries(run_map) + self.logdir.discover().unwrap_or_else(|e| { + warn!("While loading log directory: {}", e); + Default::default() + }) } /// Updates `self.runs` by adding new runs and removing runs all of whose event files have been @@ -186,13 +141,11 @@ impl<'a> LogdirLoader<'a> { /// /// Panics if the `commit.runs` lock is poisoned. fn synchronize_runs(&mut self, discoveries: &Discoveries) { - let discoveries = &discoveries.0; - // Remove runs with no event files. (This could be cleaner and more efficient with // `HashMap::drain_filter`, but that's not yet stabilized.) let mut removed: Vec = Vec::new(); self.runs.retain(|run, _| { - if discoveries.contains_key(run) { + if discoveries.get(run).map_or(false, |fs| !fs.is_empty()) { true } else { removed.push(run.clone()); @@ -220,33 +173,14 @@ impl<'a> LogdirLoader<'a> { } } - // Add new runs, and warn on any path collisions for existing runs. - for (run_name, event_files) in discoveries { + // Add new runs. + for run_name in discoveries.keys() { let checksum = self.checksum; - let run = self - .runs - .entry(run_name.clone()) - .or_insert_with(|| RunState { - // Values of `discoveries` are non-empty by construction, so it's safe to take the - // first relpath. - relpath: event_files[0].run_relpath.clone(), - loader: { - let mut loader = RunLoader::new(run_name.clone()); - loader.checksum(checksum); - loader - }, - collided_relpaths: HashSet::new(), - }); - for ef in event_files { - if ef.run_relpath != run.relpath && !run.collided_relpaths.contains(&ef.run_relpath) - { - warn!( - "Merging directories {:?} and {:?}, which both normalize to run {:?}", - run.relpath, ef.run_relpath, run_name.0 - ); - run.collided_relpaths.insert(ef.run_relpath.clone()); // don't warn again - } - } + self.runs.entry(run_name.clone()).or_insert_with(|| { + let mut loader = RunLoader::new(run_name.clone()); + loader.checksum(checksum); + loader + }); } } @@ -256,8 +190,7 @@ impl<'a> LogdirLoader<'a> { /// /// Panics if a run in `self.runs` has no entry in `discoveries`, which should only happen if /// `synchronize_runs(&discoveries)` was not called. Panics if any run loader panics. - fn load_runs(&mut self, discoveries: Discoveries) { - let mut discoveries = discoveries.0; + fn load_runs(&mut self, mut discoveries: Discoveries) { let commit_runs = self .commit .runs @@ -265,11 +198,10 @@ impl<'a> LogdirLoader<'a> { .expect("could not acquire runs.data"); let mut work_items = Vec::new(); - for (run, run_state) in self.runs.iter_mut() { - let event_files = discoveries + for (run, loader) in self.runs.iter_mut() { + let filenames = discoveries .remove(run) .unwrap_or_else(|| panic!("run in self.runs but not discovered: {:?}", run)); - let filenames: Vec = event_files.into_iter().map(|d| d.event_file).collect(); let run_data = commit_runs.get(run).unwrap_or_else(|| { panic!( "run in self.runs but not in commit.runs \ @@ -277,12 +209,15 @@ impl<'a> LogdirLoader<'a> { run ) }); - work_items.push((&mut run_state.loader, filenames, run_data)); + work_items.push((loader, filenames, run_data)); } + let logdir = &self.logdir; self.thread_pool.install(|| { work_items .into_par_iter() - .for_each(|(loader, filenames, run_data)| loader.reload(filenames, run_data)); + .for_each(|(loader, filenames, run_data)| { + loader.reload(logdir, filenames, run_data); + }); }); } } @@ -290,8 +225,10 @@ impl<'a> LogdirLoader<'a> { #[cfg(test)] mod tests { use super::*; + use std::collections::HashSet; use std::fs::{self, File}; + use crate::disk_logdir::DiskLogdir; use crate::types::{Step, Tag, WallTime}; use crate::writer::SummaryWriteExt; @@ -335,13 +272,10 @@ mod tests { let root_run = Run(".".to_string()); let train_run = Run(format!("mnist{}train", std::path::MAIN_SEPARATOR)); let test_run = Run(format!("mnist{}test", std::path::MAIN_SEPARATOR)); - // expected logdir-relative paths - let root_relpath: PathBuf = ["."].iter().collect(); - let train_relpath: PathBuf = ["mnist", "train"].iter().collect(); - let test_relpath: PathBuf = ["mnist", "test"].iter().collect(); let commit = Commit::new(); - let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1); + let logdir = DiskLogdir::new(logdir.path().to_path_buf()); + let mut loader = LogdirLoader::new(&commit, logdir, 1); // Check that we persist the right run states in the loader. loader.reload(); @@ -349,12 +283,6 @@ mod tests { .into_iter() .collect::>(); assert_eq!(loader.runs.keys().collect::>(), expected_runs); - assert_eq!(&loader.runs[&root_run].relpath, &root_relpath); - assert_eq!(&loader.runs[&train_run].relpath, &train_relpath); - assert_eq!(&loader.runs[&test_run].relpath, &test_relpath); - for run_state in loader.runs.values() { - assert!(run_state.collided_relpaths.is_empty()); // no bad Unicode - } // Check that we persist the right run states in the commit. let mut expected_data = HashMap::new(); @@ -401,7 +329,8 @@ mod tests { )?; let commit = Commit::new(); - let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1); + let logdir = DiskLogdir::new(logdir.path().to_path_buf()); + let mut loader = LogdirLoader::new(&commit, logdir, 1); let get_run_names = || { let runs_store = commit.runs.read().unwrap(); @@ -440,44 +369,6 @@ mod tests { Ok(()) } - #[cfg(target_os = "linux")] // macOS seems to sometimes give EILSEQ on non-UTF-8 filenames - #[test] - fn test_bad_unicode_collision() -> Result<(), Box> { - use std::ffi::OsStr; - use std::os::unix::ffi::OsStrExt; - - // Generate a bad-Unicode collision. - let bad1 = Path::new(OsStr::from_bytes(&b"test\x99.run"[..])); - let bad2 = Path::new(OsStr::from_bytes(&b"test\xee.run"[..])); - let run1 = Run(bad1.to_string_lossy().into_owned()); - let run2 = Run(bad2.to_string_lossy().into_owned()); - assert_ne!(bad1, bad2); - assert_eq!(run1, run2); - let run = run1; - drop(run2); - - let logdir = tempfile::tempdir()?; - for &dir_basename in &[bad1, bad2] { - let dir = logdir.path().join(dir_basename); - fs::create_dir(&dir)?; - File::create(dir.join(EVENT_FILE_BASENAME_INFIX))?; - } - - let commit = Commit::new(); - let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1); - loader.reload(); - - assert_eq!(loader.runs.keys().collect::>(), vec![&run]); - let run_state = &loader.runs[&run]; - assert!(bad1 < bad2); - assert_eq!(run_state.relpath, bad1); - let mut expected_relpaths = HashSet::new(); - expected_relpaths.insert(bad2.to_path_buf()); - assert_eq!(run_state.collided_relpaths, expected_relpaths); - - Ok(()) - } - #[cfg(unix)] #[test] fn test_symlink() -> Result<(), Box> { @@ -489,7 +380,8 @@ mod tests { File::create(train_dir.join(EVENT_FILE_BASENAME_INFIX))?; let commit = Commit::new(); - let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1); + let logdir = DiskLogdir::new(logdir.path().to_path_buf()); + let mut loader = LogdirLoader::new(&commit, logdir, 1); loader.reload(); assert_eq!( @@ -511,7 +403,8 @@ mod tests { std::os::unix::fs::symlink(&dir2, &dir1)?; let commit = Commit::new(); - let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1); + let logdir = DiskLogdir::new(logdir.path().to_path_buf()); + let mut loader = LogdirLoader::new(&commit, logdir, 1); loader.reload(); // should not hang Ok(()) } diff --git a/tensorboard/data/server/run.rs b/tensorboard/data/server/run.rs index db68bfa715..b9795d041b 100644 --- a/tensorboard/data/server/run.rs +++ b/tensorboard/data/server/run.rs @@ -17,25 +17,24 @@ limitations under the License. use log::{debug, warn}; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::fs::File; -use std::io::BufReader; -use std::path::PathBuf; +use std::io::Read; use std::sync::RwLock; use std::time::{Duration, Instant}; use crate::commit; use crate::data_compat::{EventValue, GraphDefValue, SummaryValue, TaggedRunMetadataValue}; use crate::event_file::EventFileReader; +use crate::logdir::{EventFileBuf, Logdir}; use crate::proto::tensorboard as pb; use crate::reservoir::StageReservoir; use crate::types::{Run, Step, Tag, WallTime}; /// A loader to accumulate reservoir-sampled events in a single TensorBoard run. /// -/// For now, a run loader always reads from [`File`]s on disk. In the future, this may be -/// parameterized over a filesystem interface. +/// The type `R` represents the types of files read from disk, as in the +/// [`Logdir::File`][crate::logdir::Logdir::File] associated type. #[derive(Debug)] -pub struct RunLoader { +pub struct RunLoader { /// The run name associated with this loader. Used primarily for logging; the run name is /// canonically defined by the map key under which this `RunLoader` is stored in `LogdirLoader`. run: Run, @@ -45,7 +44,7 @@ pub struct RunLoader { /// with actual start time. See [`EventFile::Dead`] for conditions under which an event file /// may be dead. Once an event file is added to this map, it may become dead, but it will not /// be removed entirely. This way, we know not to just re-open it again at the next load cycle. - files: BTreeMap>>, + files: BTreeMap>, /// Whether to compute CRCs for records before parsing as protos. checksum: bool, @@ -161,7 +160,7 @@ impl StageTimeSeries { /// Minimum time to wait between committing while a run is still loading. const COMMIT_INTERVAL: Duration = Duration::from_secs(5); -impl RunLoader { +impl RunLoader { pub fn new(run: Run) -> Self { Self { run, @@ -186,11 +185,16 @@ impl RunLoader { /// # Panics /// /// If we need to access `run_data` but the lock is poisoned. - pub fn reload(&mut self, filenames: Vec, run_data: &RwLock) { + pub fn reload( + &mut self, + logdir: &impl Logdir, + filenames: Vec, + run_data: &RwLock, + ) { let run_name = self.run.0.clone(); debug!("Starting load for run {:?}", run_name); let start = Instant::now(); - self.update_file_set(filenames); + self.update_file_set(logdir, filenames); let mut n = 0; let mut last_commit_time = Instant::now(); self.reload_files(|run_loader_data, event| { @@ -220,9 +224,9 @@ impl RunLoader { /// /// After this function returns, `self.files` may still have keys not in `filenames`, but they /// will all map to [`EventFile::Dead`]. - fn update_file_set(&mut self, filenames: Vec) { + fn update_file_set(&mut self, logdir: &impl Logdir, filenames: Vec) { // Remove any discarded files. - let new_file_set: HashSet<&PathBuf> = filenames.iter().collect(); + let new_file_set: HashSet<&EventFileBuf> = filenames.iter().collect(); for (k, v) in self.files.iter_mut() { if !new_file_set.contains(k) { *v = EventFile::Dead; @@ -235,9 +239,9 @@ impl RunLoader { match self.files.entry(filename) { Entry::Occupied(_) => {} Entry::Vacant(v) => { - let event_file = match File::open(v.key()) { + let event_file = match logdir.open(v.key()) { Ok(file) => { - let mut reader = EventFileReader::new(BufReader::new(file)); + let mut reader = EventFileReader::new(file); reader.checksum(self.checksum); EventFile::Active(reader) } @@ -269,7 +273,7 @@ impl RunLoader { Err(ReadRecordError(Truncated)) => break, Err(e) => { // TODO(@wchargin): Improve error handling? - warn!("Read error in {}: {:?}", filename.display(), e); + warn!("Read error in {}: {:?}", filename.0.display(), e); *ef = EventFile::Dead; break; } @@ -376,6 +380,7 @@ mod test { use crate::commit::Commit; use crate::data_compat::plugin_names; + use crate::disk_logdir::DiskLogdir; use crate::types::Run; use crate::writer::SummaryWriteExt; @@ -424,13 +429,18 @@ mod test { f2.into_inner()?.sync_all()?; let mut loader = RunLoader::new(run.clone()); + let logdir = DiskLogdir::new(logdir.path().to_path_buf()); let commit = Commit::new(); commit .runs .write() .expect("write-locking runs map") .insert(run.clone(), Default::default()); - loader.reload(vec![f1_name, f2_name], &commit.runs.read().unwrap()[&run]); + loader.reload( + &logdir, + vec![EventFileBuf(f1_name), EventFileBuf(f2_name)], + &commit.runs.read().unwrap()[&run], + ); // Start time should be that of the file version event, even though that didn't correspond // to any time series.