Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tensorboard/data/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ rust_library(
"//third_party/rust:prost",
"//third_party/rust:rand",
"//third_party/rust:rand_chacha",
"//third_party/rust:rayon",
"//third_party/rust:serde",
"//third_party/rust:serde_json",
"//third_party/rust:thiserror",
Expand Down Expand Up @@ -82,6 +83,7 @@ rust_binary(
"//third_party/rust:clap",
"//third_party/rust:env_logger",
"//third_party/rust:log",
"//third_party/rust:rayon",
],
)

Expand Down
4 changes: 3 additions & 1 deletion tensorboard/data/server/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ struct Opts {
logdir: PathBuf,
#[clap(long, default_value = "info")]
log_level: String,
#[clap(long)]
reload_threads: Option<usize>,
// Pair of `--no-checksum` and `--checksum` flags, defaulting to "no checksum".
#[clap(long, multiple_occurrences = true, overrides_with = "checksum")]
#[allow(unused)]
Expand All @@ -42,7 +44,7 @@ fn main() {
init_logging(&opts);

let commit = Commit::new();
let mut loader = LogdirLoader::new(&commit, opts.logdir);
let mut loader = LogdirLoader::new(&commit, opts.logdir, opts.reload_threads.unwrap_or(0));
loader.checksum(opts.checksum); // if neither `--[no-]checksum` given, defaults to false

info!("Starting load cycle");
Expand Down
2 changes: 1 addition & 1 deletion tensorboard/data/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
.spawn({
let logdir = opts.logdir;
let reload_strategy = opts.reload;
let mut loader = LogdirLoader::new(commit, logdir);
let mut loader = LogdirLoader::new(commit, logdir, 0);
// Checksum only if `--checksum` given (i.e., off by default).
loader.checksum(opts.checksum);
move || loop {
Expand Down
52 changes: 37 additions & 15 deletions tensorboard/data/server/logdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
//! Loader for many runs under a directory.

use log::{error, warn};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use walkdir::WalkDir;
Expand All @@ -26,6 +27,7 @@ use crate::types::Run;

/// A loader for a log directory, connecting a filesystem to a [`Commit`] via [`RunLoader`]s.
pub struct LogdirLoader<'a> {
thread_pool: rayon::ThreadPool,
commit: &'a Commit,
logdir: PathBuf,
runs: HashMap<Run, RunState>,
Expand Down Expand Up @@ -76,8 +78,23 @@ const EVENT_FILE_BASENAME_INFIX: &str = "tfevents";

impl<'a> LogdirLoader<'a> {
/// Creates a new, empty logdir loader. Does not load any data.
pub fn new(commit: &'a Commit, logdir: PathBuf) -> Self {
///
/// This constructor is heavyweight: it builds a new thread-pool. The thread pool will be
/// reused for all calls to [`Self::reload`]. If `reload_threads` is `0`, the number of threads
/// will be determined automatically, per [`rayon`] semantics.
///
/// # Panics
///
/// If [`rayon::ThreadPoolBuilder::build`] returns an error; should only happen if there is a
/// failure to create threads at the OS level.
pub fn new(commit: &'a Commit, logdir: PathBuf, reload_threads: usize) -> Self {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(reload_threads)
.thread_name(|i| format!("Reloader-{:03}", i))
.build()
.expect("failed to construct Rayon thread pool");
LogdirLoader {
thread_pool,
commit,
logdir,
runs: HashMap::new(),
Expand Down Expand Up @@ -246,22 +263,27 @@ impl<'a> LogdirLoader<'a> {
.runs
.read()
.expect("could not acquire runs.data");

let mut work_items = Vec::new();
for (run, run_state) in self.runs.iter_mut() {
let event_files = discoveries
.remove(run)
.unwrap_or_else(|| panic!("run in self.runs but not discovered: {:?}", run));
let filenames: Vec<PathBuf> = event_files.into_iter().map(|d| d.event_file).collect();
run_state.loader.reload(
filenames,
commit_runs.get(run).unwrap_or_else(|| {
panic!(
"run in self.runs but not in commit.runs \
let run_data = commit_runs.get(run).unwrap_or_else(|| {
panic!(
"run in self.runs but not in commit.runs \
(is another client mutating this commit?): {:?}",
run
)
}),
);
run
)
});
work_items.push((&mut run_state.loader, filenames, run_data));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might just be tying myself in knots here, but do you have any thoughts for how I can better understand the logic the borrow checker uses to handle this kind of case where we mutably borrow each run_state.loader and push them all into another vector for later extraction? I guess the original self.runs lifetime is the outer bound for the individual loaders, and then the Vec inherits their lifetime, but then we take the loaders back out of Vec - and I guess the lifetime follows them through to wherever they go? But presumably it's also somehow attached to the Vec itself now, right, so even if we were to empty the Vec entirely and then try to go refill it without dropping it in between, that would still prevent mutation of self.runs even though the actual loader references are all gone?

Copy link
Contributor Author

@wchargin wchargin Feb 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Let’s spelunk through the logic:

  • This method takes &mut self; call that elided lifetime '1, so
    it’s fn load_runs<'1>(&'1 mut self, ...).
  • We borrow self.runs.iter_mut() for lifetime '2, which must not
    outlive '1 so that it doesn’t outlive borrowed content.
  • Then we borrow &mut run_state.loader, for lifetime '3, which
    must not outlive '3 so that it doesn’t outlive that loan.
  • These are the lifetime upper bounds: '1: '2 and '2: '3, where
    :” is a subtyping relation that you can pronounce “outlives (or
    is equal to)”. You’ll often see T: 'static to indicate that T
    has only static references (“outlives everything”).
  • We have a Vec::<(&'v mut RunLoader, ...)> for some lifetime 'v.
    Since we push items with lifetime '3, we must have '3: 'v: the
    referenced content must outlive the vector.
  • Now, we move work_items into the closure arg to install. From
    here on out, there aren’t really any more lifetimes: we call
    into_par_iter and for_each, both of which are simply typed
    methods with by-value receivers. In particular, neither of them has
    a lifetime bound on its closure type, unlike std::thread::spawn.

You can similarly analyze the lifetime flow for the &RwLock<...>
lifetime. You’ll get something like this:

Lifetime graph; for details, see https://gist.github.com/wchargin/e18bca00fbf7f2eb6ae77a193fc2048e

(Here, an arrow from a to b means a: b.)

Writing it out, the inference looks pretty unidirectional here: we
require '1: '2: '3: 'v, but there’s nothing concrete at the end of
that chain, so this is satisfiable by (e.g.) setting '1 = '2 = '3 = 'v
and the borrow checker is happy.

If we had called std::thread::spawn instead of install, we would
then require 'v: 'static, since spawn requires that F: 'static
(where F is the type of the closure), and we have 'v: F since the
vector must live as long as the closure itself. This transitively means
that the whole diagram must outlive 'static. But '1 was arbitrary:
it’s the lifetime to the receiver parameter. Thus, this isn’t
satisfiable in general and should be a borrowck error.

Let’s see what Rustc has to say. In this patch, I’ve dropped the
run_data arg so that the compiler is forced to complain about the left
branch of the graph above rather than the right branch (it’s the same
logic, just not the one that you asked about):

diff --git a/tensorboard/data/server/logdir.rs b/tensorboard/data/server/logdir.rs
--- a/tensorboard/data/server/logdir.rs
+++ b/tensorboard/data/server/logdir.rs
@@ -277,12 +277,13 @@ impl<'a> LogdirLoader<'a> {
                     run
                 )
             });
-            work_items.push((&mut run_state.loader, filenames, run_data));
+            work_items.push((&mut run_state.loader, filenames));
         }
-        self.thread_pool.install(|| {
-            work_items
-                .into_par_iter()
-                .for_each(|(loader, filenames, run_data)| loader.reload(filenames, run_data));
+        drop(commit_runs);
+        std::thread::spawn(move || {
+            for (loader, filenames) in work_items.into_iter() {
+                loader.reload(filenames, todo!("omitted for example"));
+            }
         });
     }
 }

Rustc says:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> logdir.rs:268:43
    |
259 |     fn load_runs(&mut self, discoveries: Discoveries) {
    |                  --------- this data with an anonymous lifetime `'_`...
...
268 |         for (run, run_state) in self.runs.iter_mut() {
    |                                 --------- ^^^^^^^^
    |                                 |
    |                                 ...is captured here...
...
283 |         std::thread::spawn(move || {
    |         ------------------ ...and is required to live as long as `'static` here

This is pointing out the transitive '1: '2: 'static chain and
complaining about its unsatisfiability.

So how can Rayon’s install or Crossbeam’s scope get away with this
while thread::spawn needs the 'static requirement? The key is that
the scoped thread implementations ensure that the callback finishes
before install/scope itself returns. Thus, the borrowed content only
needs to live as long as the call to install, which is on the caller’s
stack frame and is thus known to be outlived by '1. Under the hood,
both of these will call thread::spawn after using unsafe to
transmute your closure to 'static
. (If “unsafe” is surprising
here, remember that soundness is what we care about. “Sound” means “in
no real execution will it trigger undefined behavior”; “safe” means “the
compiler has statically proven that it is sound”; “unsafe” means “the
code author
has statically proven that it is sound based on properties
too advanced to be represented in the base type system”. We use sound
unsafe code to build safe abstractions.)

But presumably it's also somehow attached to the Vec itself now, right, so even if we were to empty the Vec entirely and then try to go refill it without dropping it in between, that would still prevent mutation of self.runs even though the actual loader references are all gone?

Yeah, definitely. You can get that kind of flexibility with RefCell,
which basically pushes the borrow checking to runtime. (Follows a Rust
pattern: you can have dynamic borrow checking, dynamic dispatch, dynamic
reference counting, dynamic typing (std::any), but you have to opt in,
and sometimes accept consequences like “your type is no longer Sync”.)
But a humble Vec<_> isn’t that smart by default.

Does this help?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, that helps! Thanks very much for the thorough explanation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to check my understanding, this is a Vec of the parameters to the closures we ultimately run, rather than the closures themselves, because the closures even though they are all identically structured here are in fact all different anonymous types, so we can't put them in a Vec directly (we'd have to used a boxed FnMut or something).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Assuming that you’re talking about the closure argument
to for_each (currently line 285): my understanding is that these are
all the same closure type. The type is anonymous (you can’t name it),
but you can capture it (much like how in Java if you have a Foo<?> you
can pass it to a <T>helper(Foo<T>) to give T a local name). So we
could also write this:

diff --git a/tensorboard/data/server/logdir.rs b/tensorboard/data/server/logdir.rs
index 23ff65893..cbab00442 100644
--- a/tensorboard/data/server/logdir.rs
+++ b/tensorboard/data/server/logdir.rs
@@ -277,12 +277,11 @@ impl<'a> LogdirLoader<'a> {
                     run
                 )
             });
-            work_items.push((&mut run_state.loader, filenames, run_data));
+            let loader = &mut run_state.loader;
+            work_items.push(move || loader.reload(filenames, run_data));
         }
         self.thread_pool.install(|| {
-            work_items
-                .into_par_iter()
-                .for_each(|(loader, filenames, run_data)| loader.reload(filenames, run_data));
+            work_items.into_par_iter().for_each(|f| f());
         });
     }
 }

As written, the closures desugar kind of like this:

struct MyClosure; // actually doesn't close over anything

impl for<'a> FnMut<(&'a mut RunLoader, Vec<PathBuf>, &RwLock<commit::RunData>)> for MyClosure {
    fn call_mut(&mut self, arg: (&'a mut RunLoader, ...)) {
        let (loader, filenames, run_data) = arg;
        loader.reload(filenames, run_data);
    }
}

(Since they don’t close over anything, this could be a free-standing
function caled as .for_each(call_reload_with_things), but defining it
inline is easier to read, hence the “closure”.)

If we put them in a Vec directly, they would close over things, so
would desugar like this:

struct YourClosure<'a> {
    loader: &'a mut RunLoader,
    filenames: Vec<PathBuf>,
    run_data: &RwLock<commit::RunData>,
}

impl for<'a> FnMut<()> for MyClosure { // now doesn't need any args
    fn call_mut(&mut self) {
        self.loader.reload(self.filenames, self.run_data);
    }
}

Both work, and I wouldn’t be surprised if they yielded exactly the same
machine code.

Does this help? Personally, I find the “collect the arguments” approach
a bit easier to think about here. I don’t have a strong preference. You?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, got it, thanks. I think my natural inclination would be to do a vector of argument-less closures since that seems more encapsulated (the work item execution code doesn't need to know anything task-specific about how to unpack the vec), but I don't have a strong preference.

}
self.thread_pool.install(|| {
work_items
.into_par_iter()
.for_each(|(loader, filenames, run_data)| loader.reload(filenames, run_data));
});
}
}

Expand Down Expand Up @@ -319,7 +341,7 @@ mod tests {
let test_relpath: PathBuf = ["mnist", "test"].iter().collect();

let commit = Commit::new();
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf());
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1);

// Check that we persist the right run states in the loader.
loader.reload();
Expand Down Expand Up @@ -379,7 +401,7 @@ mod tests {
)?;

let commit = Commit::new();
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf());
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1);

let get_run_names = || {
let runs_store = commit.runs.read().unwrap();
Expand Down Expand Up @@ -442,7 +464,7 @@ mod tests {
}

let commit = Commit::new();
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf());
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1);
loader.reload();

assert_eq!(loader.runs.keys().collect::<Vec<_>>(), vec![&run]);
Expand All @@ -467,7 +489,7 @@ mod tests {
File::create(train_dir.join(EVENT_FILE_BASENAME_INFIX))?;

let commit = Commit::new();
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf());
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1);
loader.reload();

assert_eq!(
Expand All @@ -489,7 +511,7 @@ mod tests {
std::os::unix::fs::symlink(&dir2, &dir1)?;

let commit = Commit::new();
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf());
let mut loader = LogdirLoader::new(&commit, logdir.path().to_path_buf(), 1);
loader.reload(); // should not hang
Ok(())
}
Expand Down