2 changes: 1 addition & 1 deletion tensorboard/data/server/logdir.rs
@@ -214,7 +214,7 @@ impl<'a> LogdirLoader<'a> {
// first relpath.
relpath: event_files[0].run_relpath.clone(),
loader: {
let mut loader = RunLoader::new();
let mut loader = RunLoader::new(run_name.clone());
loader.checksum(checksum);
loader
},
221 changes: 127 additions & 94 deletions tensorboard/data/server/run.rs
@@ -15,33 +15,30 @@ limitations under the License.

//! Loader for a single run, with one or more event files.

use log::warn;
use log::{debug, warn};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::RwLock;
use std::time::{Duration, Instant};

use crate::commit;
use crate::data_compat::{EventValue, GraphDefValue, SummaryValue, TaggedRunMetadataValue};
use crate::event_file::EventFileReader;
use crate::proto::tensorboard as pb;
use crate::reservoir::StageReservoir;
use crate::types::{Step, Tag, WallTime};
use crate::types::{Run, Step, Tag, WallTime};

/// A loader to accumulate reservoir-sampled events in a single TensorBoard run.
///
/// For now, a run loader always reads from [`File`]s on disk. In the future, this may be
/// parameterized over a filesystem interface.
#[derive(Debug)]
pub struct RunLoader {
/// The earliest event `wall_time` seen in any event file in this run.
///
/// This is `None` if and only if no events have been seen. Its value may decrease as new
/// events are read, but in practice this is expected to be the wall time of the first
/// `file_version` event in the first event file.
start_time: Option<WallTime>,

/// The run name associated with this loader. Used primarily for logging; the run name is
/// canonically defined by the map key under which this `RunLoader` is stored in `LogdirLoader`.
run: Run,
/// The event files in this run.
///
/// Event files are sorted and read lexicographically by name, which is designed to coincide
@@ -50,11 +47,13 @@ pub struct RunLoader {
/// be removed entirely. This way, we know not to just re-open it again at the next load cycle.
files: BTreeMap<PathBuf, EventFile<BufReader<File>>>,

/// Reservoir-sampled data and metadata for each time series.
time_series: HashMap<Tag, StageTimeSeries>,

/// Whether to compute CRCs for records before parsing as protos.
checksum: bool,

/// The data staged by this `RunLoader`. This is encapsulated in a sub-struct so that these
/// fields can be reborrowed within `reload_files` in a context that already has an exclusive
/// reference into `self.files`, and hence can't call methods on the whole of `&mut self`.
data: RunLoaderData,
}

#[derive(Debug)]
Expand All @@ -69,6 +68,20 @@ enum EventFile<R> {
Dead,
}

/// Holds data staged by a `RunLoader` that will be committed to the `Commit`.
#[derive(Debug, Default)]
struct RunLoaderData {
Contributor

I like the idea of extracting this type.

When first writing this module, I considered separating RunLoader into
files and data and putting methods on data, to cut down on
free-floating functions taking a bunch of &mut arguments. It seems
like that’s about what you’re doing here, so, some observations:

  • You’ve written this semantically as “the data that will be
    committed”, which makes sense. You could also write it more
    structurally as “the data that isn’t in files, since that’s the
    field into which we like to take long-lived mutable references”.
    I.e.: you could include self.run and self.checksum here.

    Doing so avoids the clone of run_name, which obviously isn’t a big
    performance hit, but makes me wonder whether the structure might be
    nice. It also simplifies the callback structure a bit (see below).

  • The free-standing fn commit_all and fn read_event functions are
    nicely set up to be &mut self methods on RunLoaderData, which I
    viewed as one of the benefits of this structure in the first place.

Thoughts?

Contributor Author

Good point about how the functions could be methods, I hadn't thought that far. Can do.

Re: the scope of the data structure, I did briefly think about which fields to include and it seemed like a more semantically coherent boundary to define it as the data that will be committed. Defining as just "everything except files" felt odd to me even if functionally it's the most convenient. In particular even though cloning run is a bit ugly, semantically it just feels quite weird to make run a field on self.data instead of the top-level RunLoader, so I guess I preferred putting the weirdness inside the method bodies versus into the struct layout. Similarly, it seems like semantically self.checksum would be more naturally located alongside self.files since it only affects the file-reading half of RunLoader, not the data-buffering half.

I guess in the purest sense we want something like a tripartite structure in which we have "effectively constant metadata" like self.run and self.checksum grouped in one substructure, and then two different mutable substructures, self.files and self.data, so that all three can be reborrowed independently from the parent struct, but that seems... overcomplicated? Would be nice to have some way to help the borrow checker resolve this without as much reshuffling of the actual struct fields themselves.

Thoughts?

Contributor (@wchargin, Jan 26, 2021)

Yep, that makes sense and seems reasonable to me. No strong opinion;
feel free to proceed as desired.

Would be nice to have some way to help the borrow checker resolve this without as much reshuffling of the actual struct fields themselves.

You may be interested in @nikomatsakis’s article on interprocedural
conflicts. (Context: “NLL” = “non-lexical lifetimes”, referring to a
set of changes to the borrow checker that taught it to accept more
kinds of valid code, making it much easier to write programs that
compile.)

Contributor Author

Converted the free functions into methods.

Yeah, that article pretty much sums up the understanding I had come to as well. The "View" thing is interesting, I could see that being fairly effective for a more complex situation but it definitely isn't lightweight. Thanks for sharing!
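To make the borrow-checker point in this thread concrete, here is a minimal, self-contained sketch. It is not the TensorBoard code: the `Loader`, `Data`, and `read_event` names and the string-based "events" are invented purely for illustration. The idea is that while iterating `self.files` we already hold a mutable borrow of one field, so a `&mut self` method on the whole loader is rejected, whereas borrowing the disjoint `self.data` field (or calling a method on it) is accepted.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Default)]
struct Data {
    events_seen: u64,
}

impl Data {
    /// Stages one event; stands in for something like `RunLoaderData::read_event`.
    fn read_event(&mut self, _event: &str) {
        self.events_seen += 1;
    }
}

#[derive(Debug, Default)]
struct Loader {
    files: BTreeMap<String, Vec<String>>,
    data: Data,
}

impl Loader {
    fn reload_files(&mut self) {
        // `iter_mut` holds a mutable borrow of `self.files` for the whole loop,
        // so a `&mut self` method call here would conflict with it; borrowing
        // only the disjoint field `self.data` is fine.
        for (_filename, events) in self.files.iter_mut() {
            for event in events.drain(..) {
                self.data.read_event(&event);
            }
        }
    }
}

fn main() {
    let mut loader = Loader::default();
    loader.files.insert(
        "events.out.tfevents.0".to_string(),
        vec!["e1".to_string(), "e2".to_string()],
    );
    loader.reload_files();
    assert_eq!(loader.data.events_seen, 2);
}
```

This is the same split-borrow pattern the PR leans on: field projections on `&mut self` can be borrowed independently, but a method taking `&mut self` borrows everything at once.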

/// The earliest event `wall_time` seen in any event file in this run.
///
/// This is `None` if and only if no events have been seen. Its value may decrease as new
/// events are read, but in practice this is expected to be the wall time of the first
/// `file_version` event in the first event file.
start_time: Option<WallTime>,

/// Reservoir-sampled data and metadata for each time series.
time_series: HashMap<Tag, StageTimeSeries>,
}

#[derive(Debug)]
struct StageTimeSeries {
data_class: pb::DataClass,
@@ -145,13 +158,16 @@ impl StageTimeSeries {
}
}

/// Minimum time to wait between committing while a run is still loading.
const COMMIT_INTERVAL: Duration = Duration::from_secs(5);

impl RunLoader {
pub fn new() -> Self {
pub fn new(run: Run) -> Self {
Self {
start_time: None,
run,
files: BTreeMap::new(),
time_series: HashMap::new(),
checksum: true,
data: RunLoaderData::default(),
}
}

@@ -171,9 +187,33 @@ impl RunLoader {
///
/// If we need to access `run_data` but the lock is poisoned.
pub fn reload(&mut self, filenames: Vec<PathBuf>, run_data: &RwLock<commit::RunData>) {
let run_name = self.run.0.clone();
debug!("Starting load for run {:?}", run_name);
let start = Instant::now();
self.update_file_set(filenames);
self.reload_files();
self.commit_all(run_data);
let mut n = 0;
let mut last_commit_time = Instant::now();
self.reload_files(|run_loader_data, event| {
Contributor

Thoughts on inlining this callback into reload_files? I don’t mind
callbacks in general, but they are a bit more complicated than not using
them. If self.run moves to self.data.run, then you only need to pass
in self.reload_files(run_data, &start), which feels pretty reasonable
to me: it’s natural that reload_files should take the destination, and
start is just for instrumentation.

Then reload reads like:

let start = Instant::now();

self.update_file_set(filenames);
self.reload_files(run_data, &run_name, &start);
self.data.commit_all(run_data);
// ^ maybe move this into `reload_files`, too? maybe; could go either way

debug!(
    "Finished load for run {:?} ({:?})",
    self.data.run.0,
    start.elapsed()
);

Contributor Author

Hmm. So I arrived at this because I liked reload_files() better with the business logic of what to do with the event and how often to commit abstracted out of it. In particular part of my reasoning was that if we wanted to parameterize the 5 second interval or do something fancier than a fixed interval (like triggering based on the event details itself), that configuration would all need to be plumbed into reload_files() either as their own parameters or as fields on self, and encapsulating it via the callback felt prettier.

As a halfway solution, I originally did have commit_all() called in reload_files() and was trying to pass in some kind of smaller "check if I need to commit yet" predicate instead, but then I remembered that abstractions are supposed to be free in Rust so I figured hey maybe I can just get away with calling a callback closure on every event :)

I'll grant it makes reload() a little messier; but I liked being able to read the sequence of commit steps and the timing details all in one function. So if you aren't particularly opposed I think I'm happier with it this way.

Contributor (@wchargin, Jan 26, 2021)

This also seems reasonable to me. Sold.

abstractions are supposed to be free in Rust so I figured hey maybe I can just get away with calling a callback closure on every event :)

@chandlerc might contest that there are no zero-cost abstractions,
and that (Rust) closures have no run-time cost but do have human cost.
:-) But yeah, I’m okay with them here.

Contributor Author

No such thing as a free lunch after all huh 😿
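As a hypothetical illustration of the flexibility being argued for in this thread (not code from this PR; `Event`, `Data`, and both commit policies are invented), the callback keeps the commit policy entirely at the call site, so swapping the fixed interval for an event-driven trigger requires no change to the file-walking function:

```rust
use std::time::{Duration, Instant};

struct Event {
    step: i64,
}

#[derive(Default)]
struct Data {
    staged: usize,
}

impl Data {
    fn read_event(&mut self, _e: Event) {
        self.staged += 1;
    }
    fn commit_all(&mut self) {
        self.staged = 0;
    }
}

/// The file-walking side only hands events to the callback; it knows nothing
/// about when or why the caller commits.
fn reload_files<F: FnMut(&mut Data, Event)>(data: &mut Data, mut handle_event: F) {
    for step in 0..1_000 {
        handle_event(data, Event { step });
    }
}

fn main() {
    let mut data = Data::default();

    // Policy 1: commit on a fixed interval, checked only every 100 events.
    let interval = Duration::from_secs(5);
    let mut last_commit = Instant::now();
    let mut n: u64 = 0;
    reload_files(&mut data, |data, event| {
        data.read_event(event);
        n += 1;
        if n % 100 == 0 && last_commit.elapsed() >= interval {
            data.commit_all();
            last_commit = Instant::now();
        }
    });

    // Policy 2: commit at step boundaries instead; `reload_files` is unchanged.
    reload_files(&mut data, |data, event| {
        let at_boundary = event.step % 500 == 0;
        data.read_event(event);
        if at_boundary {
            data.commit_all();
        }
    });
}
```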

run_loader_data.read_event(event);
n += 1;
// Reduce overhead of checking elapsed time by only doing it every 100 events.
if n % 100 == 0 && last_commit_time.elapsed() >= COMMIT_INTERVAL {
debug!(
"Loaded {} events for run {:?} after {:?}",
n,
run_name,
start.elapsed()
);
run_loader_data.commit_all(run_data);
last_commit_time = Instant::now();
}
});
self.data.commit_all(run_data);
debug!(
"Finished load for run {:?} ({:?})",
run_name,
start.elapsed()
);
}

/// Updates the active key set of `self.files` to match the given filenames.
@@ -213,8 +253,8 @@ impl RunLoader {
}
}

/// Reads data from all active event files.
fn reload_files(&mut self) {
/// Reads data from all active event files, and calls a handler for each event.
fn reload_files<F: FnMut(&mut RunLoaderData, pb::Event)>(&mut self, mut handle_event: F) {
for (filename, ef) in self.files.iter_mut() {
let reader = match ef {
EventFile::Dead => continue,
@@ -234,104 +274,97 @@
break;
}
};
read_event(&mut self.time_series, &mut self.start_time, event);
handle_event(&mut self.data, event);
}
}
}
}

impl RunLoaderData {
/// Commits all staged data into the given run of the commit.
fn commit_all(&mut self, run_data: &RwLock<commit::RunData>) {
let mut run = run_data.write().expect("acquiring tags lock");
run.start_time = self.start_time;
for (tag, ts) in &mut self.time_series {
ts.commit(tag, &mut *run);
}
}
}

/// Reads a single event into the structures of a run loader.
///
/// This is a standalone function because it's called from `reload_files` in a context that already
/// has an exclusive reference into `self.files`, and so can't call methods on `&mut self`.
fn read_event(
time_series: &mut HashMap<Tag, StageTimeSeries>,
start_time: &mut Option<WallTime>,
e: pb::Event,
) {
let step = Step(e.step);
let wall_time = match WallTime::new(e.wall_time) {
None => {
// TODO(@wchargin): Improve error handling.
warn!(
"Dropping event at step {} with invalid wall time {}",
e.step, e.wall_time
);
return;
}
Some(wt) => wt,
};
if start_time.map(|start| wall_time < start).unwrap_or(true) {
*start_time = Some(wall_time);
}
match e.what {
Some(pb::event::What::GraphDef(graph_bytes)) => {
let sv = StageValue {
wall_time,
payload: EventValue::GraphDef(GraphDefValue(graph_bytes)),
};
use std::collections::hash_map::Entry;
let ts = match time_series.entry(Tag(GraphDefValue::TAG_NAME.to_string())) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => {
v.insert(StageTimeSeries::new(GraphDefValue::initial_metadata()))
}
};
ts.rsv.offer(step, sv);
}
Some(pb::event::What::TaggedRunMetadata(trm_proto)) => {
let sv = StageValue {
wall_time,
payload: EventValue::GraphDef(GraphDefValue(trm_proto.run_metadata)),
};
use std::collections::hash_map::Entry;
let ts = match time_series.entry(Tag(trm_proto.tag)) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => {
let metadata = TaggedRunMetadataValue::initial_metadata();
v.insert(StageTimeSeries::new(metadata))
}
};
ts.rsv.offer(step, sv);
/// Reads a single event and stages it for future committing.
fn read_event(&mut self, e: pb::Event) {
let step = Step(e.step);
let wall_time = match WallTime::new(e.wall_time) {
None => {
// TODO(@wchargin): Improve error handling.
warn!(
"Dropping event at step {} with invalid wall time {}",
e.step, e.wall_time
);
return;
}
Some(wt) => wt,
};
if self.start_time.map_or(true, |start| wall_time < start) {
self.start_time = Some(wall_time);
}
Some(pb::event::What::Summary(sum)) => {
for mut summary_pb_value in sum.value {
let summary_value = match summary_pb_value.value {
None => continue,
Some(v) => SummaryValue(Box::new(v)),
match e.what {
Some(pb::event::What::GraphDef(graph_bytes)) => {
let sv = StageValue {
wall_time,
payload: EventValue::GraphDef(GraphDefValue(graph_bytes)),
};

use std::collections::hash_map::Entry;
let ts = match time_series.entry(Tag(summary_pb_value.tag)) {
let ts = match self
.time_series
.entry(Tag(GraphDefValue::TAG_NAME.to_string()))
{
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => {
let metadata =
summary_value.initial_metadata(summary_pb_value.metadata.take());
v.insert(StageTimeSeries::new(metadata))
v.insert(StageTimeSeries::new(GraphDefValue::initial_metadata()))
}
};
ts.rsv.offer(step, sv);
}
Some(pb::event::What::TaggedRunMetadata(trm_proto)) => {
let sv = StageValue {
wall_time,
payload: EventValue::Summary(summary_value),
payload: EventValue::GraphDef(GraphDefValue(trm_proto.run_metadata)),
};
use std::collections::hash_map::Entry;
let ts = match self.time_series.entry(Tag(trm_proto.tag)) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => {
let metadata = TaggedRunMetadataValue::initial_metadata();
v.insert(StageTimeSeries::new(metadata))
}
};
ts.rsv.offer(step, sv);
}
}
_ => {}
}
}
Some(pb::event::What::Summary(sum)) => {
for mut summary_pb_value in sum.value {
let summary_value = match summary_pb_value.value {
None => continue,
Some(v) => SummaryValue(Box::new(v)),
};

impl Default for RunLoader {
fn default() -> Self {
Self::new()
use std::collections::hash_map::Entry;
let ts = match self.time_series.entry(Tag(summary_pb_value.tag)) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => {
let metadata =
summary_value.initial_metadata(summary_pb_value.metadata.take());
v.insert(StageTimeSeries::new(metadata))
}
};
let sv = StageValue {
wall_time,
payload: EventValue::Summary(summary_value),
};
ts.rsv.offer(step, sv);
}
}
_ => {}
}
}
}

@@ -390,7 +423,7 @@ mod test {
f1.into_inner()?.sync_all()?;
f2.into_inner()?.sync_all()?;

let mut loader = RunLoader::new();
let mut loader = RunLoader::new(run.clone());
let commit = Commit::new();
commit
.runs
Expand All @@ -401,7 +434,7 @@ mod test {

// Start time should be that of the file version event, even though that didn't correspond
// to any time series.
assert_eq!(loader.start_time, Some(WallTime::new(1234.0).unwrap()));
assert_eq!(loader.data.start_time, Some(WallTime::new(1234.0).unwrap()));

let runs = commit.runs.read().expect("read-locking runs map");
let run_data: &commit::RunData = &*runs
2 changes: 2 additions & 0 deletions tensorboard/data/server_ingester.py
@@ -86,6 +86,8 @@ def start(self):
]
if logger.isEnabledFor(logging.INFO):
args.append("--verbose")
if logger.isEnabledFor(logging.DEBUG):
args.append("--verbose") # Repeat arg to increase verbosity.

logger.info("Spawning data server: %r", args)
popen = subprocess.Popen(args, stdin=subprocess.PIPE)
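The Python side repeats `--verbose` when debug logging is enabled, so the data server is presumably expected to treat repeated flags as increasing verbosity. A minimal standard-library sketch of that general pattern (the level names and the counting convention are assumptions, not the server's actual flag-parsing code):

```rust
use std::env;

fn main() {
    // Count repeated `--verbose` flags; each occurrence bumps the log level
    // one step. The mapping mirrors the Python side's intent (INFO for one
    // flag, DEBUG for two) but is an assumption for illustration only.
    let verbosity = env::args().filter(|a| a.as_str() == "--verbose").count();
    let level = match verbosity {
        0 => "warn",
        1 => "info",
        _ => "debug",
    };
    println!("log level: {} ({} --verbose flag(s))", level, verbosity);
}
```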