diff --git a/tensorboard/data/server/logdir.rs b/tensorboard/data/server/logdir.rs
index 10cab8e93e..dfac6fb760 100644
--- a/tensorboard/data/server/logdir.rs
+++ b/tensorboard/data/server/logdir.rs
@@ -214,7 +214,7 @@ impl<'a> LogdirLoader<'a> {
                     // first relpath.
                     relpath: event_files[0].run_relpath.clone(),
                     loader: {
-                        let mut loader = RunLoader::new();
+                        let mut loader = RunLoader::new(run_name.clone());
                         loader.checksum(checksum);
                         loader
                     },
diff --git a/tensorboard/data/server/run.rs b/tensorboard/data/server/run.rs
index f3e07de4cf..db68bfa715 100644
--- a/tensorboard/data/server/run.rs
+++ b/tensorboard/data/server/run.rs
@@ -15,19 +15,20 @@ limitations under the License.
 
 //! Loader for a single run, with one or more event files.
 
-use log::warn;
+use log::{debug, warn};
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::fs::File;
 use std::io::BufReader;
 use std::path::PathBuf;
 use std::sync::RwLock;
+use std::time::{Duration, Instant};
 
 use crate::commit;
 use crate::data_compat::{EventValue, GraphDefValue, SummaryValue, TaggedRunMetadataValue};
 use crate::event_file::EventFileReader;
 use crate::proto::tensorboard as pb;
 use crate::reservoir::StageReservoir;
-use crate::types::{Step, Tag, WallTime};
+use crate::types::{Run, Step, Tag, WallTime};
 
 /// A loader to accumulate reservoir-sampled events in a single TensorBoard run.
 ///
@@ -35,13 +36,9 @@ use crate::types::{Step, Tag, WallTime};
 /// parameterized over a filesystem interface.
 #[derive(Debug)]
 pub struct RunLoader {
-    /// The earliest event `wall_time` seen in any event file in this run.
-    ///
-    /// This is `None` if and only if no events have been seen. Its value may decrease as new
-    /// events are read, but in practice this is expected to be the wall time of the first
-    /// `file_version` event in the first event file.
-    start_time: Option<WallTime>,
-
+    /// The run name associated with this loader. Used primarily for logging; the run name is
+    /// canonically defined by the map key under which this `RunLoader` is stored in `LogdirLoader`.
+    run: Run,
     /// The event files in this run.
     ///
     /// Event files are sorted and read lexicographically by name, which is designed to coincide
@@ -50,11 +47,13 @@ pub struct RunLoader {
     /// be removed entirely. This way, we know not to just re-open it again at the next load cycle.
     files: BTreeMap<PathBuf, EventFile<BufReader<File>>>,
 
-    /// Reservoir-sampled data and metadata for each time series.
-    time_series: HashMap<Tag, StageTimeSeries>,
-
     /// Whether to compute CRCs for records before parsing as protos.
     checksum: bool,
+
+    /// The data staged by this `RunLoader`. This is encapsulated in a sub-struct so that these
+    /// fields can be reborrowed within `reload_files` in a context that already has an exclusive
+    /// reference into `self.files`, and hence can't call methods on the whole of `&mut self`.
+    data: RunLoaderData,
 }
 
 #[derive(Debug)]
@@ -69,6 +68,20 @@ enum EventFile<R> {
     Dead,
 }
 
+/// Holds data staged by a `RunLoader` that will be committed to the `Commit`.
+#[derive(Debug, Default)]
+struct RunLoaderData {
+    /// The earliest event `wall_time` seen in any event file in this run.
+    ///
+    /// This is `None` if and only if no events have been seen. Its value may decrease as new
+    /// events are read, but in practice this is expected to be the wall time of the first
+    /// `file_version` event in the first event file.
+    start_time: Option<WallTime>,
+
+    /// Reservoir-sampled data and metadata for each time series.
+    time_series: HashMap<Tag, StageTimeSeries>,
+}
+
 #[derive(Debug)]
 struct StageTimeSeries {
     data_class: pb::DataClass,
@@ -145,13 +158,16 @@ impl StageTimeSeries {
     }
 }
 
+/// Minimum time to wait between committing while a run is still loading.
+const COMMIT_INTERVAL: Duration = Duration::from_secs(5);
+
 impl RunLoader {
-    pub fn new() -> Self {
+    pub fn new(run: Run) -> Self {
         Self {
-            start_time: None,
+            run,
             files: BTreeMap::new(),
-            time_series: HashMap::new(),
             checksum: true,
+            data: RunLoaderData::default(),
         }
     }
 
@@ -171,9 +187,33 @@ impl RunLoader {
     ///
     /// If we need to access `run_data` but the lock is poisoned.
     pub fn reload(&mut self, filenames: Vec<PathBuf>, run_data: &RwLock<commit::RunData>) {
+        let run_name = self.run.0.clone();
+        debug!("Starting load for run {:?}", run_name);
+        let start = Instant::now();
         self.update_file_set(filenames);
-        self.reload_files();
-        self.commit_all(run_data);
+        let mut n = 0;
+        let mut last_commit_time = Instant::now();
+        self.reload_files(|run_loader_data, event| {
+            run_loader_data.read_event(event);
+            n += 1;
+            // Reduce overhead of checking elapsed time by only doing it every 100 events.
+            if n % 100 == 0 && last_commit_time.elapsed() >= COMMIT_INTERVAL {
+                debug!(
+                    "Loaded {} events for run {:?} after {:?}",
+                    n,
+                    run_name,
+                    start.elapsed()
+                );
+                run_loader_data.commit_all(run_data);
+                last_commit_time = Instant::now();
+            }
+        });
+        self.data.commit_all(run_data);
+        debug!(
+            "Finished load for run {:?} ({:?})",
+            run_name,
+            start.elapsed()
+        );
     }
 
     /// Updates the active key set of `self.files` to match the given filenames.
@@ -213,8 +253,8 @@ impl RunLoader {
         }
     }
 
-    /// Reads data from all active event files.
-    fn reload_files(&mut self) {
+    /// Reads data from all active event files, and calls a handler for each event.
+    fn reload_files<F: FnMut(&mut RunLoaderData, pb::Event)>(&mut self, mut handle_event: F) {
         for (filename, ef) in self.files.iter_mut() {
             let reader = match ef {
                 EventFile::Dead => continue,
@@ -234,11 +274,14 @@ impl RunLoader {
                         break;
                     }
                 };
-                read_event(&mut self.time_series, &mut self.start_time, event);
+                handle_event(&mut self.data, event);
             }
         }
     }
+}
 
+impl RunLoaderData {
+    /// Commits all staged data into the given run of the commit.
     fn commit_all(&mut self, run_data: &RwLock<commit::RunData>) {
         let mut run = run_data.write().expect("acquiring tags lock");
         run.start_time = self.start_time;
@@ -246,92 +289,82 @@ impl RunLoader {
             ts.commit(tag, &mut *run);
         }
     }
-}
 
-/// Reads a single event into the structures of a run loader.
-///
-/// This is a standalone function because it's called from `reload_files` in a context that already
-/// has an exclusive reference into `self.files`, and so can't call methods on `&mut self`.
-fn read_event(
-    time_series: &mut HashMap<Tag, StageTimeSeries>,
-    start_time: &mut Option<WallTime>,
-    e: pb::Event,
-) {
-    let step = Step(e.step);
-    let wall_time = match WallTime::new(e.wall_time) {
-        None => {
-            // TODO(@wchargin): Improve error handling.
-            warn!(
-                "Dropping event at step {} with invalid wall time {}",
-                e.step, e.wall_time
-            );
-            return;
-        }
-        Some(wt) => wt,
-    };
-    if start_time.map(|start| wall_time < start).unwrap_or(true) {
-        *start_time = Some(wall_time);
-    }
-    match e.what {
-        Some(pb::event::What::GraphDef(graph_bytes)) => {
-            let sv = StageValue {
-                wall_time,
-                payload: EventValue::GraphDef(GraphDefValue(graph_bytes)),
-            };
-            use std::collections::hash_map::Entry;
-            let ts = match time_series.entry(Tag(GraphDefValue::TAG_NAME.to_string())) {
-                Entry::Occupied(o) => o.into_mut(),
-                Entry::Vacant(v) => {
-                    v.insert(StageTimeSeries::new(GraphDefValue::initial_metadata()))
-                }
-            };
-            ts.rsv.offer(step, sv);
-        }
-        Some(pb::event::What::TaggedRunMetadata(trm_proto)) => {
-            let sv = StageValue {
-                wall_time,
-                payload: EventValue::GraphDef(GraphDefValue(trm_proto.run_metadata)),
-            };
-            use std::collections::hash_map::Entry;
-            let ts = match time_series.entry(Tag(trm_proto.tag)) {
-                Entry::Occupied(o) => o.into_mut(),
-                Entry::Vacant(v) => {
-                    let metadata = TaggedRunMetadataValue::initial_metadata();
-                    v.insert(StageTimeSeries::new(metadata))
-                }
-            };
-            ts.rsv.offer(step, sv);
+    /// Reads a single event and stages it for future committing.
+    fn read_event(&mut self, e: pb::Event) {
+        let step = Step(e.step);
+        let wall_time = match WallTime::new(e.wall_time) {
+            None => {
+                // TODO(@wchargin): Improve error handling.
+                warn!(
+                    "Dropping event at step {} with invalid wall time {}",
+                    e.step, e.wall_time
+                );
+                return;
+            }
+            Some(wt) => wt,
+        };
+        if self.start_time.map_or(true, |start| wall_time < start) {
+            self.start_time = Some(wall_time);
         }
-        Some(pb::event::What::Summary(sum)) => {
-            for mut summary_pb_value in sum.value {
-                let summary_value = match summary_pb_value.value {
-                    None => continue,
-                    Some(v) => SummaryValue(Box::new(v)),
+        match e.what {
+            Some(pb::event::What::GraphDef(graph_bytes)) => {
+                let sv = StageValue {
+                    wall_time,
+                    payload: EventValue::GraphDef(GraphDefValue(graph_bytes)),
                 };
-                use std::collections::hash_map::Entry;
-                let ts = match time_series.entry(Tag(summary_pb_value.tag)) {
+                let ts = match self
+                    .time_series
+                    .entry(Tag(GraphDefValue::TAG_NAME.to_string()))
+                {
                     Entry::Occupied(o) => o.into_mut(),
                     Entry::Vacant(v) => {
-                        let metadata =
-                            summary_value.initial_metadata(summary_pb_value.metadata.take());
-                        v.insert(StageTimeSeries::new(metadata))
+                        v.insert(StageTimeSeries::new(GraphDefValue::initial_metadata()))
                     }
                 };
+                ts.rsv.offer(step, sv);
+            }
+            Some(pb::event::What::TaggedRunMetadata(trm_proto)) => {
                 let sv = StageValue {
                     wall_time,
-                    payload: EventValue::Summary(summary_value),
+                    payload: EventValue::GraphDef(GraphDefValue(trm_proto.run_metadata)),
+                };
+                use std::collections::hash_map::Entry;
+                let ts = match self.time_series.entry(Tag(trm_proto.tag)) {
+                    Entry::Occupied(o) => o.into_mut(),
+                    Entry::Vacant(v) => {
+                        let metadata = TaggedRunMetadataValue::initial_metadata();
+                        v.insert(StageTimeSeries::new(metadata))
+                    }
                 };
                 ts.rsv.offer(step, sv);
             }
-        }
-        _ => {}
-    }
-}
+            Some(pb::event::What::Summary(sum)) => {
+                for mut summary_pb_value in sum.value {
+                    let summary_value = match summary_pb_value.value {
+                        None => continue,
+                        Some(v) => SummaryValue(Box::new(v)),
+                    };
 
-impl Default for RunLoader {
-    fn default() -> Self {
-        Self::new()
+                    use std::collections::hash_map::Entry;
+                    let ts = match self.time_series.entry(Tag(summary_pb_value.tag)) {
+                        Entry::Occupied(o) => o.into_mut(),
+                        Entry::Vacant(v) => {
+                            let metadata =
+                                summary_value.initial_metadata(summary_pb_value.metadata.take());
+                            v.insert(StageTimeSeries::new(metadata))
+                        }
+                    };
+                    let sv = StageValue {
+                        wall_time,
+                        payload: EventValue::Summary(summary_value),
+                    };
+                    ts.rsv.offer(step, sv);
+                }
+            }
+            _ => {}
+        }
     }
 }
@@ -390,7 +423,7 @@ mod test {
         f1.into_inner()?.sync_all()?;
         f2.into_inner()?.sync_all()?;
 
-        let mut loader = RunLoader::new();
+        let mut loader = RunLoader::new(run.clone());
         let commit = Commit::new();
         commit
             .runs
@@ -401,7 +434,7 @@
 
         // Start time should be that of the file version event, even though that didn't correspond
        // to any time series.
-        assert_eq!(loader.start_time, Some(WallTime::new(1234.0).unwrap()));
+        assert_eq!(loader.data.start_time, Some(WallTime::new(1234.0).unwrap()));
 
         let runs = commit.runs.read().expect("read-locking runs map");
         let run_data: &commit::RunData = &*runs
diff --git a/tensorboard/data/server_ingester.py b/tensorboard/data/server_ingester.py
index 18664fef38..28341e9fee 100644
--- a/tensorboard/data/server_ingester.py
+++ b/tensorboard/data/server_ingester.py
@@ -86,6 +86,8 @@ def start(self):
         ]
         if logger.isEnabledFor(logging.INFO):
             args.append("--verbose")
+        if logger.isEnabledFor(logging.DEBUG):
+            args.append("--verbose")  # Repeat arg to increase verbosity.
         logger.info("Spawning data server: %r", args)
         popen = subprocess.Popen(args, stdin=subprocess.PIPE)
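Editorial note on the `RunLoaderData` split above: the doc comment explains that `reload_files` already holds an exclusive borrow into `self.files` while iterating, so it cannot call a `&mut self` method, but it can still reborrow the disjoint `data` field and hand it to the event handler. The following is a minimal standalone sketch of that borrow-splitting pattern, not code from the patch; `Loader`, `Data`, and the string "events" are simplified stand-ins for the real types.

use std::collections::BTreeMap;

#[derive(Default)]
struct Data {
    events_seen: u64,
}

impl Data {
    fn read_event(&mut self, _event: &str) {
        self.events_seen += 1;
    }
}

#[derive(Default)]
struct Loader {
    files: BTreeMap<String, Vec<String>>, // stand-in for the real event-file map
    data: Data,                           // stand-in for the staged RunLoaderData
}

impl Loader {
    fn reload_files<F: FnMut(&mut Data, &str)>(&mut self, mut handle_event: F) {
        for (_filename, events) in self.files.iter_mut() {
            for event in events.iter() {
                // `self.files` is exclusively borrowed by the outer loop, but `self.data`
                // is a disjoint field, so reborrowing it here is accepted by the compiler.
                handle_event(&mut self.data, event.as_str());
            }
        }
    }
}

fn main() {
    let mut loader = Loader::default();
    loader.files.insert(
        "events.out.tfevents.123".to_string(),
        vec!["event 1".to_string(), "event 2".to_string()],
    );
    loader.reload_files(|data, event| data.read_event(event));
    println!("events seen: {}", loader.data.events_seen);
}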
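The new `reload` body throttles intermediate commits: staged data is committed at most once per `COMMIT_INTERVAL`, the clock is consulted only every 100 events to keep per-event overhead low, and a final unconditional commit runs at the end of the load. Below is a minimal standalone sketch of that loop under those assumptions; `commit` and the numeric event source are hypothetical stand-ins for the real reservoir commit and event-file reader.

use std::time::{Duration, Instant};

/// Minimum time to wait between intermediate commits (mirrors COMMIT_INTERVAL above).
const COMMIT_INTERVAL: Duration = Duration::from_secs(5);

/// Stand-in for publishing staged data to the shared commit.
fn commit(staged: &mut Vec<u64>) {
    println!("committing {} staged events", staged.len());
    staged.clear();
}

fn main() {
    let events = 0u64..1_000_000; // stand-in for events read from a run's event files
    let mut staged = Vec::new();
    let mut n = 0u64;
    let mut last_commit_time = Instant::now();
    for event in events {
        staged.push(event);
        n += 1;
        // Checking elapsed time is cheap but not free; amortize it over 100 events.
        if n % 100 == 0 && last_commit_time.elapsed() >= COMMIT_INTERVAL {
            commit(&mut staged);
            last_commit_time = Instant::now();
        }
    }
    // Always commit once more at the end so readers see the fully loaded run.
    commit(&mut staged);
}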