Add option to read multiple active event files per directory #1867
Conversation
The time-based predicate and overall design look okay to me. A few questions:
- When can the timestamp from TimestampedEventFileLoader be None?
- It looks like once an event file hasn't been written to within inactive_secs, it will never have a chance to be visited again. That's unfortunate for the case where an epoch takes inactive_secs + 1s. Is there a way to discern whether a file is currently opened by another process?
/me submits an official “review” to stop GitHub from nagging me. :-)
…ti-file directory loading behavior
I've finally updated the PR with a different approach to the flags (rather than a single one that forces the user to set a threshold, I now have a flag to enable the behavior and a separate flag to customize the threshold). I also added tests :) And I moved the new code into a new file. PTAL :)
def testInactiveSecsNegative(self):
  flags = FakeFlags('logdir', reload_multifile=True,
                    reload_multifile_inactive_secs=-1)
  filter = application._get_eventfile_active_filter(flags)
Nit: can we call this filter_ to avoid shadowing the builtin? It makes the highlighting a bit confusing.
Renamed to filter_fn, if that's okay.
Certainly.
tensorboard/backend/application.py (outdated)
@@ -106,11 +106,13 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
  :type plugin_loaders: list[base_plugin.TBLoader]
  :rtype: TensorBoardWSGI
  """
  eventfile_active_filter = _get_eventfile_active_filter(flags)
Global nit: We’ve generally called them “event files” (two words)
throughout the codebase:
$ git grep -c 'events\?file'
tensorboard/plugins/profile/profile_demo.py:1
tensorboard/plugins/profile/profile_plugin_test.py:1
$ git grep 'events\?[ _]file' | wc -l
347
Shall we stick with that instead of adding “eventfile” as an alternate
term?
Fair enough, changed to two words.
tensorboard/backend/application.py (outdated)
"""Returns a predicate for whether an eventfile load timestamp is active. | ||
|
||
Returns: | ||
A predicate function accepting a single UNIX timestamp float argument. |
…or None if multi-file loading is not enabled.
Done.
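For concreteness, here is a minimal sketch of a factory with that contract. This is not the PR's actual implementation: the handling of zero and negative thresholds below is an assumption, and only the flag names come from this PR.

import time

def _get_eventfile_active_filter(flags):
  """Returns a predicate for whether an event file load timestamp is active.

  Returns:
    A predicate function accepting a single UNIX timestamp float argument,
    or None if multi-file loading is not enabled.
  """
  if not flags.reload_multifile:
    return None
  inactive_secs = flags.reload_multifile_inactive_secs
  if inactive_secs == 0:
    return None
  if inactive_secs < 0:
    # Assumption in this sketch: a negative threshold means "never inactive".
    return lambda timestamp: True
  return lambda timestamp: timestamp + inactive_secs >= time.time()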
except tf.errors.OpError:
  if not tf.io.gfile.exists(self._directory):
    raise directory_watcher.DirectoryDeletedError(
        'Directory %s has been permanently deleted' % self._directory)
Do we want else: raise here? Or are there OpErrors that we want to ignore? (Which ones?) If this is intended to be else: pass, can we briefly comment why?
The intent was to avoid a behavior change here relative to the existing logic in DirectoryWatcher. I'm a little reluctant to change to else: raise without some understanding of which errors in the wild would now kill the backend reload thread, but I've changed it to at least log at INFO so we could potentially collect that data - WDYT?
SGTM.
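To make the agreed-on behavior concrete, a self-contained sketch follows (not the PR's actual code): DirectoryDeletedError and load_fn are stand-ins for the real classes, and only the error handling mirrors the discussion above.

import logging

import tensorflow as tf

logger = logging.getLogger(__name__)


class DirectoryDeletedError(Exception):
  """Raised when the watched directory has been permanently deleted."""


def load_once(directory, load_fn):
  """Runs one load pass, tolerating transient filesystem OpErrors."""
  try:
    for value in load_fn():
      yield value
  except tf.errors.OpError as e:
    if not tf.io.gfile.exists(directory):
      raise DirectoryDeletedError(
          'Directory %s has been permanently deleted' % directory)
    # Swallow other OpErrors to match DirectoryWatcher's existing behavior,
    # but log at INFO so we can learn which errors occur in the wild before
    # deciding whether to make them fatal.
    logger.info('Ignoring error while loading %s: %s', directory, e)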
self._loaders[path] = loader
logger.info('Loading data from path %s', path)
for timestamp, value in loader.Load():
  if max_timestamp is None or (timestamp is not None
Isn’t loader.Load() always supposed to yield (float, value) pairs? In what case would timestamp be None?
Good point, removed the guard. Somehow I was thinking if omitted in the proto it could be None, but it's 0.0.
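A quick illustration of that point, using the TF 1.x-style constructor that appears in a one-liner later in this review (tf.compat.v1.Event under TF 2.x): a proto field that is never set reads back as its default value, so wall_time comes through as 0.0 rather than None.

import tensorflow as tf

event = tf.Event(file_version='brain.Event:2')
print(event.wall_time)  # 0.0, because wall_time was never set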
# d: empty file (should be considered active in absence of timestamps).
self._WriteToFile('a', ['A1', 'A2'], [1, 2])
self._WriteToFile('b', ['B1'], [1])
self._WriteToFile('c', ['C2', 'C1', 'C0'], [2, 1, 0])
Is this something that we see in practice? My understanding was that the
timestamps are added by the C++ writer itself (and users have no control
over them). I suppose third-party writers can do whatever they want, and
I appreciate the test case anyway; mostly just curious.
(I’m deliberately ignoring non-monotonic clocks around DST transitions…)
Regrettably, users can indeed control wall_time, because the TF 1.x FileWriter API supports writing arbitrary Events:
python -c 'import glob; import tensorflow as tf; w = tf.summary.FileWriter("/tmp/foo"); w.add_event(tf.Event(wall_time=42.0)); w.flush(); print(list(tf.train.summary_iterator(glob.glob("/tmp/foo/event*")[0])))'
[wall_time: 1564101194.0
file_version: "brain.Event:2"
, wall_time: 42.0
]
FWIW, I deliberately kept that ability out of the TF 2.0 API: tensorflow/tensorflow@1ea3483
I'm optimistic that not many people actually make use of this ability (and obviously if users are doing anything exotic with wall_time then the resource limiting approach here may backfire), but I thought it was at least worth confirming that we do the right thing in a case where e.g. the user takes an eventfile and re-sorts the events, then writes it back out.
Ah, good point. Agreed, then.
all_paths = io_wrapper.ListDirectoryAbsolute(self._directory)
paths = sorted(p for p in all_paths if self._path_filter(p))
for path in paths:
  for value in self._LoadPath(path):
Bit of a race condition here: if listdir returns an event file that is subsequently deleted before we invoke _LoadPath on it, the exception will break us out of the whole Load loop. The race window may be arbitrarily large, because it spans a coroutine boundary.

def testFileDeletedBeforeLoaderOpened(self):
  self._WriteToFile('a', 'a')
  self._WriteToFile('b', 'b')
  load_generator = self._loader.Load()
  self.assertEqual('a', next(load_generator))
  os.unlink(os.path.join(self._directory, 'b'))
  self.assertEqual(list(load_generator), [])  # fails: IOError(ENOENT)

Is this the desired behavior? (There might be a case for that.) I could also see silently skipping the file, or maybe opening all loaders eagerly (for loader in [self._LoadPath(path) for path in paths]:), though maybe we don't like the performance characteristics of that.
Done. Note that this race across the coroutine boundary existed in DirectoryWatcher too; I think the exception was being swallowed by the except clause below. I went with skipping the file, since opening all loaders eagerly is worse for performance (right now, we close inactive ones when we're done, so when reading an old logdir we only ever have one loader open at a time, which is a nice property to retain) and there's still the non-coroutine race in that case anyway.
SGTM; skipping made the most sense to me, too. Thanks!
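For reference, a rough sketch of the skip-on-deletion behavior settled on here (not the PR's actual code): path_filter and load_path_fn stand in for DirectoryLoader's path filter and per-file loading generator, and the exact exception raised for a vanished file is an assumption about the filesystem layer.

import logging
import os

import tensorflow as tf

logger = logging.getLogger(__name__)


def load_directory(directory, path_filter, load_path_fn):
  """Loads each matching file in sorted order, skipping deleted ones."""
  names = tf.io.gfile.listdir(directory)
  paths = sorted(os.path.join(directory, n) for n in names if path_filter(n))
  for path in paths:
    try:
      for value in load_path_fn(path):
        yield value
    except tf.errors.NotFoundError:
      # The file was deleted between the directory listing and the read
      # (possibly across a coroutine boundary); skip it rather than letting
      # the exception abort the whole load pass.
      logger.info('Skipping %s: deleted before it could be read', path)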
# Create two separate event files, using filename suffix to ensure a
# deterministic sort order, and then simulate a write to file A, then
# to file B, then another write to file A (with reloads after each).
with test_util.FileWriter(run_path, filename_suffix='.a') as writer_a:
Looks like this is failing in the TF 2.0 test environment.
Fixed by updating the test file writer to avoid the guard against use under eager mode.
Great!
Thanks for the review - PTAL
Two high-level questions:

- If I understand correctly, the new DirectoryLoader may mark a file as inactive even if it's the only file in the directory. Consider, for instance, a long-running job that writes summaries every two hours; previous versions of TensorBoard would continue updating forever, while under the new semantics we'll only see it update once. Is this correct? It seems at least a little unfortunate. Did we consider not marking a file inactive when it's the only active file?

  Test case, in case the above description is unclear:

  def testFilter_onlyOneLoader(self):
    """A sole loader can be marked as inactive."""
    loader_registry = []
    loader_factory = functools.partial(
        _TimestampedByteLoader, registry=loader_registry)
    threshold = 0
    active_filter = lambda timestamp: timestamp >= threshold
    self._loader = directory_loader.DirectoryLoader(
        self._directory, loader_factory, active_filter=active_filter)
    def assertLoadersForPaths(paths):
      paths = [os.path.join(self._directory, path) for path in paths]
      self.assertEqual(loader_registry, paths)
    #
    self._WriteToFile('a', ['A1', 'A2'], [1, 2])
    threshold = 2
    # First load pass should leave file C marked inactive.
    self.assertLoaderYields(['A1', 'A2'])
    assertLoadersForPaths(['a'])
    threshold = 3
    # Second load pass should mark file A as inactive (due to newly
    # increased threshold) and thus skip reading data from it.
    self.assertLoaderYields([])
    assertLoadersForPaths([])
    # Even when we get more data and it's the only file. :-(
    self._WriteToFile('a', ['A3', 'A4'], [3, 4])
    self.assertLoaderYields([])
    assertLoadersForPaths([])

- The two names DirectoryWatcher and DirectoryLoader tripped me up a bit in the course of this review. They both watch, and they both load; the primary difference is in their multi-file support, yes? Can we find clearer names? :-)
next(generator)  # Ignore the file_version event.
event = next(generator)
self.assertEqual('a', event.summary.value[0].tag)
os.remove(glob.glob(os.path.join(self._directory, '*.b'))[0])
Probably fine here, but I really wish glob.glob had a directory kwarg so that we could be sure that no globs are expanded in the first argument to os.path.join. :-( No action required.
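As a purely optional aside on that nit, one way to get the same effect without risking glob expansion in the directory component is to match basenames with fnmatch; this is a hedged workaround, not something this PR needs.

import fnmatch
import os

def glob_in_dir(directory, pattern):
  """Returns paths in `directory` whose basenames match `pattern`."""
  return [os.path.join(directory, name)
          for name in sorted(os.listdir(directory))
          if fnmatch.fnmatch(name, pattern)]

# e.g. glob_in_dir(self._directory, '*.b')[0] in place of the glob.glob call.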
Yes, the new loading algorithm will stop polling for new data if no files are active (it will still poll for new runs, to be clear). Honestly, I see this as a feature - it means loading up an old experiment will only read each file once and then only monitor for new files. To continue polling the last file in the directory even when it's inactive would prevent us from getting that nice property, unless we set a separate inactive threshold for that case, but I honestly think that would just be more confusing for the user. I'm open to changing the default age threshold to be longer (maybe even 24 hours), but I was thinking of starting off more aggressive and then walking it back if it's an issue, to be more ambitious about curtailing resource usage; I think it's overall easier to lengthen the threshold than to shorten it (the result being higher resource usage, but not unexpectedly absent data).

Re: Watcher and Loader, I originally named it MultiFileDirectoryWatcher, but it's a mouthful and I wasn't crazy about that being the long-term name (assuming we eventually migrate to only this implementation). Also, since the EventAccumulator logic can actually use these in place of an EventFileLoader, they essentially implement a "Loader" contract, so the "Loader" suffix seemed better (and people in the past complained that DirectoryWatcher suggested equivalence with an internal Watcher concept that doesn't have the same semantics). In an admittedly fuzzy sense, the old logic is more like just "watching" the latest file rather than truly loading the whole directory, whereas the new logic really does load the entire directory. That said, I am sympathetic to the similarity in the names being somewhat confusing. Do you have a suggestion in mind?
> Honestly, I see this as a feature […]

Yep, this is pretty reasonable. As long as we message the change clearly, it makes sense to me.

> I'm open to changing the default age threshold to be longer

I’ve got no problem with the proposed default (the “two hours” in my example was just “whatever the default is + ε”). Agreed that it’s easier to revise upward.

> Do you have a suggestion in mind?

Nope!
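To make the polling semantics from this exchange concrete, here is a rough sketch of the kind of bookkeeping described, with illustrative names only: loaders maps path to loader, max_timestamps tracks the newest wall time seen per path, and a Close() method on the loader is an assumption, not the PR's actual API.

def prune_inactive_loaders(loaders, max_timestamps, active_filter):
  """Drops loaders whose newest event wall time no longer passes the filter."""
  for path in list(loaders):
    newest = max_timestamps.get(path)
    # A file with no timestamped events yet stays active, matching the
    # "empty file" case discussed in the tests above.
    if newest is not None and not active_filter(newest):
      loaders.pop(path).Close()  # This file will no longer be polled.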
PTAL, I've updated the FAQ blurbs to be clearer that the option changes the polling criteria, rather than just expanding them (which the previous blurbs implied).
This adds an experimental option --reload_multifile=true that addresses #1063 by polling for new data from multiple active event files within a given run directory, rather than only polling the most recent event file. (See #1063 for why that behavior has been problematic.)

To avoid runaway resource usage, we don't unconditionally poll all event files, since in most cases many of them are no longer receiving new data. This PR instead introduces a notion of an "inactive age" (settable via --reload_multifile_inactive_secs) beyond which an event file with no event wall time more recent than that age will be considered "inactive" and no longer polled. This means, for example, that users who accumulate many runs over a period of weeks in the same logdir will only poll new data from runs that have had data written within a recent window. (In this respect, the new code can significantly improve resource usage against logdirs in which many runs are entirely static.) The default value of this flag is set to 4000 (somewhat over an hour) to accommodate hourly update frequencies while still representing a fairly aggressive cutoff. In theory we could compute a smarter threshold by taking into account the length of time taken by previous reloads as well as the --reload_interval value, but this seems reasonable for now.

Enabling the behavior may still result in a large increase in memory usage and disk reads / network requests for logdirs that have directories with many recently written files, since those will now all be polled. Possible further efficiency improvements include: