Add option to read multiple active event files per directory #1867

Merged 11 commits on Jul 29, 2019
21 changes: 21 additions & 0 deletions README.md
@@ -249,6 +249,13 @@ tensorboard in inspect mode to inspect the contents of your event files.

### TensorBoard is showing only some of my data, or isn't properly updating!

> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can
> now be used to poll all "active" files in a directory for new data, rather
> than only the most recent one as described below. A file is "active" as long
> as it has received new data within the last `--reload_multifile_inactive_secs`
> seconds (default: 4000). You may need to install our nightly build
> [`tb-nightly`][tb-nightly] for this option to be available.

This issue usually comes about because of how TensorBoard iterates through the
`tfevents` files: it progresses through the events file in timestamp order, and
only reads one file at a time. Let's suppose we have files with timestamps `a`
@@ -260,6 +267,12 @@ multiple summary writers, each one should be writing to a separate directory.

### Does TensorBoard support multiple or distributed summary writers?

> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can
> now be used to poll all "active" files in a directory for new data, where a
> file counts as "active" if it has received new data within the last
> `--reload_multifile_inactive_secs` seconds (default: 4000). You may need to
> install our nightly build [`tb-nightly`][tb-nightly] for this option to be
> available.

No. TensorBoard expects that only one events file will be written to at a time,
and multiple summary writers means multiple events files. If you are running a
distributed TensorFlow instance, we encourage you to designate a single worker
@@ -275,6 +288,12 @@ with itself, there are a few possible explanations.
* You may have multiple executions of TensorFlow that all wrote to the same log
  directory. Please have each TensorFlow run write to its own logdir.

> **Update:** the [experimental `--reload_multifile=true` option][pr-1867] can
> now be used to poll all "active" files in a directory for new data, where a
> file counts as "active" if it has received new data within the last
> `--reload_multifile_inactive_secs` seconds (default: 4000). You may need to
> install our nightly build [`tb-nightly`][tb-nightly] for this option to be
> available.

* You may have a bug in your code where the global_step variable (passed
to `FileWriter.add_summary`) is being maintained incorrectly.

@@ -372,3 +391,5 @@ information as you can provide (e.g. attaching events files, including the output
of `tensorboard --inspect`, etc.).

[stack-overflow]: https://stackoverflow.com/questions/tagged/tensorboard
[pr-1867]: https://github.com/tensorflow/tensorboard/pull/1867
[tb-nightly]: https://pypi.org/project/tb-nightly/
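
For reference, a typical invocation with the new option (assuming a standard install where the `tensorboard` command is on the PATH, and an illustrative log directory) would be `tensorboard --logdir /path/to/logs --reload_multifile=true`, optionally tuning the inactivity window with `--reload_multifile_inactive_secs`.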
21 changes: 20 additions & 1 deletion tensorboard/backend/application.py
@@ -106,11 +106,13 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
:type plugin_loaders: list[base_plugin.TBLoader]
:rtype: TensorBoardWSGI
"""
event_file_active_filter = _get_event_file_active_filter(flags)
multiplexer = event_multiplexer.EventMultiplexer(
size_guidance=DEFAULT_SIZE_GUIDANCE,
tensor_size_guidance=tensor_size_guidance_from_flags(flags),
purge_orphaned_data=flags.purge_orphaned_data,
max_reload_threads=flags.max_reload_threads)
max_reload_threads=flags.max_reload_threads,
event_file_active_filter=event_file_active_filter)
loading_multiplexer = multiplexer
reload_interval = flags.reload_interval
# For db import op mode, prefer reloading in a child process. See
@@ -530,3 +532,20 @@ def _clean_path(path, path_prefix=""):
if path != path_prefix + '/' and path.endswith('/'):
return path[:-1]
return path


def _get_event_file_active_filter(flags):
"""Returns a predicate for whether an event file load timestamp is active.

Returns:
A predicate function accepting a single UNIX timestamp float argument, or
None if multi-file loading is not enabled.
"""
if not flags.reload_multifile:
return None
inactive_secs = flags.reload_multifile_inactive_secs
if inactive_secs == 0:
return None
if inactive_secs < 0:
return lambda timestamp: True
return lambda timestamp: timestamp + inactive_secs >= time.time()
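
As a quick illustration of how the helper above turns the flags into a predicate, here is a minimal sketch; `SimpleNamespace` stands in for the real flags object and is an assumption for demonstration purposes only:

```python
import time
from types import SimpleNamespace

from tensorboard.backend import application

# Stand-in flags object; only the two relevant attributes are populated.
flags = SimpleNamespace(reload_multifile=True,
                        reload_multifile_inactive_secs=4000)
is_active = application._get_event_file_active_filter(flags)

print(is_active(time.time()))         # True: data written within the last 4000s
print(is_active(time.time() - 5000))  # False: last write is older than 4000s

# With the feature disabled (or inactive_secs == 0), no predicate is returned.
flags.reload_multifile = False
print(application._get_event_file_active_filter(flags))  # None
```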
39 changes: 38 additions & 1 deletion tensorboard/backend/application_test.py
@@ -27,6 +27,7 @@
import shutil
import socket
import tempfile
import time

import six

@@ -58,7 +59,9 @@ def __init__(
db_import=False,
db_import_use_op=False,
window_title='',
path_prefix=''):
path_prefix='',
reload_multifile=False,
reload_multifile_inactive_secs=4000):
self.logdir = logdir
self.purge_orphaned_data = purge_orphaned_data
self.reload_interval = reload_interval
@@ -70,6 +73,8 @@ def __init__(
self.db_import_use_op = db_import_use_op
self.window_title = window_title
self.path_prefix = path_prefix
self.reload_multifile = reload_multifile
self.reload_multifile_inactive_secs = reload_multifile_inactive_secs


class FakePlugin(base_plugin.TBPlugin):
@@ -366,6 +371,38 @@ def testSlashlessRoute(self):
self._test('runaway', False)


class GetEventFileActiveFilterTest(tb_test.TestCase):

def testDisabled(self):
flags = FakeFlags('logdir', reload_multifile=False)
self.assertIsNone(application._get_event_file_active_filter(flags))

def testInactiveSecsZero(self):
flags = FakeFlags('logdir', reload_multifile=True,
reload_multifile_inactive_secs=0)
self.assertIsNone(application._get_event_file_active_filter(flags))

def testInactiveSecsNegative(self):
flags = FakeFlags('logdir', reload_multifile=True,
reload_multifile_inactive_secs=-1)
filter_fn = application._get_event_file_active_filter(flags)
self.assertTrue(filter_fn(0))
self.assertTrue(filter_fn(time.time()))
self.assertTrue(filter_fn(float("inf")))

def testInactiveSecs(self):
flags = FakeFlags('logdir', reload_multifile=True,
reload_multifile_inactive_secs=10)
filter_fn = application._get_event_file_active_filter(flags)
with mock.patch.object(time, 'time') as mock_time:
mock_time.return_value = 100
self.assertFalse(filter_fn(0))
self.assertFalse(filter_fn(time.time() - 11))
self.assertTrue(filter_fn(time.time() - 10))
self.assertTrue(filter_fn(time.time()))
self.assertTrue(filter_fn(float("inf")))


class ParseEventFilesSpecTest(tb_test.TestCase):

def assertPlatformSpecificLogdirParsing(self, pathObj, logdir, expected):
29 changes: 29 additions & 0 deletions tensorboard/backend/event_processing/BUILD
@@ -30,6 +30,33 @@ py_test(
],
)

py_library(
name = "directory_loader",
srcs = ["directory_loader.py"],
srcs_version = "PY2AND3",
deps = [
":directory_watcher",
":io_wrapper",
"//tensorboard/compat:tensorflow",
"//tensorboard/util:tb_logging",
],
)

py_test(
name = "directory_loader_test",
size = "small",
srcs = ["directory_loader_test.py"],
srcs_version = "PY2AND3",
deps = [
":directory_loader",
":directory_watcher",
":event_file_loader",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard/util:test_util",
"@org_pythonhosted_mock",
],
)

py_library(
name = "directory_watcher",
srcs = ["directory_watcher.py"],
@@ -101,6 +128,7 @@ py_library(
srcs_version = "PY2AND3",
visibility = ["//visibility:public"],
deps = [
":directory_loader",
":directory_watcher",
":event_file_loader",
":io_wrapper",
@@ -190,6 +218,7 @@ py_test(
":event_accumulator",
":event_multiplexer",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard/util:test_util",
],
)

136 changes: 136 additions & 0 deletions tensorboard/backend/event_processing/directory_loader.py
@@ -0,0 +1,136 @@
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Implementation for a multi-file directory loader."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorboard.backend.event_processing import directory_watcher
from tensorboard.backend.event_processing import io_wrapper
from tensorboard.compat import tf
from tensorboard.util import tb_logging


logger = tb_logging.get_logger()


# Sentinel object for an inactive path.
_INACTIVE = object()


class DirectoryLoader(object):
"""Loader for an entire directory, maintaining multiple active file loaders.

This class takes a directory, a factory for loaders, and optionally a
path filter and watches all the paths inside that directory for new data.
Each file loader created by the factory must read a path and produce an
iterator of (timestamp, value) pairs.

Unlike DirectoryWatcher, this class does not assume that only one file
receives new data at a time; there can be arbitrarily many active files.
However, any file whose maximum load timestamp fails an "active" predicate
will be marked as inactive and no longer checked for new data.
"""

def __init__(self, directory, loader_factory, path_filter=lambda x: True,
active_filter=lambda timestamp: True):
"""Constructs a new MultiFileDirectoryLoader.

Args:
directory: The directory to load files from.
loader_factory: A factory for creating loaders. The factory should take a
path and return an object that has a Load method returning an iterator
yielding (unix timestamp as float, value) pairs for any new data
path_filter: If specified, only paths matching this filter are loaded.
active_filter: If specified, any loader whose maximum load timestamp does
not pass this filter will be marked as inactive and no longer read.

Raises:
ValueError: If directory or loader_factory are None.
"""
if directory is None:
raise ValueError('A directory is required')
if loader_factory is None:
raise ValueError('A loader factory is required')
self._directory = directory
self._loader_factory = loader_factory
self._path_filter = path_filter
self._active_filter = active_filter
self._loaders = {}
self._max_timestamps = {}

def Load(self):
"""Loads new values from all active files.

Yields:
All values that have not been yielded yet.

Raises:
DirectoryDeletedError: If the directory has been permanently deleted
(as opposed to being temporarily unavailable).
"""
try:
all_paths = io_wrapper.ListDirectoryAbsolute(self._directory)
paths = sorted(p for p in all_paths if self._path_filter(p))
for path in paths:
for value in self._LoadPath(path):
Contributor:
Bit of a race condition here: if listdir returns an event file that is
subsequently deleted before we invoke _LoadPath on it, the exception
will break us out of the whole Load loop.

The race window may be arbitrarily large, because it spans a coroutine
boundary.

  def testFileDeletedBeforeLoaderOpened(self):
    self._WriteToFile('a', 'a')
    self._WriteToFile('b', 'b')
    load_generator = self._loader.Load()
    self.assertEqual('a', next(load_generator))
    os.unlink(os.path.join(self._directory, 'b'))
    self.assertEqual(list(load_generator), [])  # fails: IOError(ENOENT)

Is this the desired behavior? (There might be a case for that.) I could also
see silently skipping the file, or maybe opening all loaders eagerly
(`for loader in [self._LoadPath(path) for path in paths]:`), though maybe
we don't like the performance characteristics of that.

Contributor Author:
Done. Note that this race across the coroutine boundary existed in DirectoryWatcher too; I think the exception was being swallowed by the except clause below. I went with skipping the file, since opening all loaders eagerly is worse for performance (right now, we close inactive ones when we're done, so when reading an old logdir we only ever have one loader open at a time, which is a nice property to retain) and there's still the non-coroutine race in that case anyway.

Contributor:
SGTM; skipping made the most sense to me, too. Thanks!

yield value
except tf.errors.OpError as e:
if not tf.io.gfile.exists(self._directory):
raise directory_watcher.DirectoryDeletedError(
'Directory %s has been permanently deleted' % self._directory)
Contributor:
Do we want `else: raise` here? Or are there OpErrors that we want to
ignore? (Which ones?)

If this is intended to be `else: pass`, can we briefly comment why?

Contributor Author:
The intent was to avoid a behavior change here relative to the existing logic in DirectoryWatcher. I'm a little reluctant to change to `else: raise` without some understanding of which errors out in the wild are now going to kill the backend reload thread, but I've changed it to at least log at INFO so we could potentially collect that data - WDYT?

Contributor:
SGTM.

else:
logger.info('Ignoring error during file loading: %s' % e)

def _LoadPath(self, path):
"""Generator for values from a single path's loader.

Args:
path: the path to load from

Yields:
All values from this path's loader that have not been yielded yet.
"""
max_timestamp = self._max_timestamps.get(path, None)
if max_timestamp is _INACTIVE or self._MarkIfInactive(path, max_timestamp):
logger.debug('Skipping inactive path %s', path)
return
loader = self._loaders.get(path, None)
if loader is None:
try:
loader = self._loader_factory(path)
except tf.errors.NotFoundError:
# Happens if a file was removed after we listed the directory.
logger.debug('Skipping nonexistent path %s', path)
return
self._loaders[path] = loader
logger.info('Loading data from path %s', path)
for timestamp, value in loader.Load():
if max_timestamp is None or timestamp > max_timestamp:
max_timestamp = timestamp
yield value
if not self._MarkIfInactive(path, max_timestamp):
self._max_timestamps[path] = max_timestamp

def _MarkIfInactive(self, path, max_timestamp):
"""If max_timestamp is inactive, returns True and marks the path as such."""
logger.debug('Checking active status of %s at %s', path, max_timestamp)
if max_timestamp is not None and not self._active_filter(max_timestamp):
self._max_timestamps[path] = _INACTIVE
del self._loaders[path]
return True
return False
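
To make the `loader_factory` contract from the class docstring concrete, here is a minimal usage sketch. The `LineLoader` class, the `.txt` path filter, and the use of file mtimes as timestamps are illustrative assumptions only; real callers plug in an event-file loader factory instead.

```python
import os
import time

from tensorboard.backend.event_processing import directory_loader


class LineLoader(object):
  """Toy loader: yields (timestamp, line) pairs for lines not yet seen."""

  def __init__(self, path):
    self._path = path
    self._offset = 0

  def Load(self):
    with open(self._path) as f:
      f.seek(self._offset)
      while True:
        line = f.readline()
        if not line:
          break
        # For this sketch, treat the file's mtime as the data timestamp.
        yield (os.path.getmtime(self._path), line.rstrip('\n'))
        self._offset = f.tell()


# Treat a file as inactive once it has gone 4000 seconds without new data.
loader = directory_loader.DirectoryLoader(
    '/tmp/logs',
    LineLoader,
    path_filter=lambda path: path.endswith('.txt'),
    active_filter=lambda timestamp: timestamp + 4000 >= time.time())

for value in loader.Load():  # Repeated Load() calls yield only new data.
  print(value)
```

Inactive files have their loaders dropped and are skipped on later `Load()` calls, which (as noted in the review thread above) keeps only a small number of loaders open when reading through an old logdir.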