diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index 18973ffc05..6b37a914b0 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -13,6 +13,7 @@ py_library( srcs_version = "PY2AND3", deps = [ "//tensorboard:expect_tensorflow_installed", + "@org_pythonhosted_six", ], ) @@ -24,6 +25,7 @@ py_test( deps = [ ":io_wrapper", "//tensorboard:expect_tensorflow_installed", + "@org_pythonhosted_six", ], ) @@ -94,6 +96,7 @@ py_library( deps = [ ":directory_watcher", ":event_file_loader", + ":io_wrapper", ":plugin_asset_util", ":reservoir", "//tensorboard:data_compat", @@ -188,7 +191,7 @@ py_library( srcs_version = "PY2AND3", deps = [ ":event_accumulator", - ":event_multiplexer", + ":io_wrapper", "//tensorboard:expect_tensorflow_installed", ], ) diff --git a/tensorboard/backend/event_processing/directory_watcher_test.py b/tensorboard/backend/event_processing/directory_watcher_test.py index b0727b6624..c65a2e238a 100644 --- a/tensorboard/backend/event_processing/directory_watcher_test.py +++ b/tensorboard/backend/event_processing/directory_watcher_test.py @@ -193,7 +193,12 @@ def Fake(*args, **kwargs): FakeFactory.has_been_called = False - for stub_name in ['ListDirectoryAbsolute', 'ListRecursively']: + stub_names = [ + 'ListDirectoryAbsolute', + 'ListRecursivelyViaGlobbing', + 'ListRecursivelyViaWalking', + ] + for stub_name in stub_names: self.stubs.Set(io_wrapper, stub_name, FakeFactory(getattr(io_wrapper, stub_name))) for stub_name in ['IsDirectory', 'Exists', 'Stat']: diff --git a/tensorboard/backend/event_processing/event_accumulator.py b/tensorboard/backend/event_processing/event_accumulator.py index 81a79fbc32..742863b0dc 100644 --- a/tensorboard/backend/event_processing/event_accumulator.py +++ b/tensorboard/backend/event_processing/event_accumulator.py @@ -18,13 +18,13 @@ from __future__ import print_function import collections -import os import threading import 
tensorflow as tf from tensorboard.backend.event_processing import directory_watcher from tensorboard.backend.event_processing import event_file_loader +from tensorboard.backend.event_processing import io_wrapper from tensorboard.backend.event_processing import plugin_asset_util from tensorboard.backend.event_processing import reservoir from tensorboard.plugins.distribution import compressor @@ -98,23 +98,6 @@ } -def IsTensorFlowEventsFile(path): - """Check the path name to see if it is probably a TF Events file. - - Args: - path: A file path to check if it is an event file. - - Raises: - ValueError: If the path is an empty string. - - Returns: - If path is formatted like a TensorFlowEventsFile. - """ - if not path: - raise ValueError('Path must be a nonempty string') - return 'tfevents' in tf.compat.as_str_any(os.path.basename(path)) - - class EventAccumulator(object): """An `EventAccumulator` takes an event generator, and accumulates the values. @@ -747,11 +730,13 @@ def _GeneratorFromPath(path): """Create an event generator for file or directory at given path string.""" if not path: raise ValueError('path must be a valid string') - if IsTensorFlowEventsFile(path): + if io_wrapper.IsTensorFlowEventsFile(path): return event_file_loader.EventFileLoader(path) else: return directory_watcher.DirectoryWatcher( - path, event_file_loader.EventFileLoader, IsTensorFlowEventsFile) + path, + event_file_loader.EventFileLoader, + io_wrapper.IsTensorFlowEventsFile) def _ParseFileVersion(file_version): diff --git a/tensorboard/backend/event_processing/event_file_inspector.py b/tensorboard/backend/event_processing/event_file_inspector.py index dab4196376..c7852bba34 100644 --- a/tensorboard/backend/event_processing/event_file_inspector.py +++ b/tensorboard/backend/event_processing/event_file_inspector.py @@ -120,7 +120,7 @@ from tensorboard.backend.event_processing import event_accumulator from tensorboard.backend.event_processing import event_file_loader -from 
tensorboard.backend.event_processing import event_multiplexer +from tensorboard.backend.event_processing import io_wrapper FLAGS = tf.flags.FLAGS @@ -323,12 +323,12 @@ def generators_from_logdir(logdir): Returns: List of event generators for each subdirectory with event files. """ - subdirs = event_multiplexer.GetLogdirSubdirectories(logdir) + subdirs = io_wrapper.GetLogdirSubdirectories(logdir) generators = [ itertools.chain(*[ generator_from_event_file(os.path.join(subdir, f)) for f in tf.gfile.ListDirectory(subdir) - if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f)) + if io_wrapper.IsTensorFlowEventsFile(os.path.join(subdir, f)) ]) for subdir in subdirs ] return generators @@ -356,13 +356,13 @@ def get_inspection_units(logdir='', event_file='', tag=''): A list of InspectionUnit objects. """ if logdir: - subdirs = event_multiplexer.GetLogdirSubdirectories(logdir) + subdirs = io_wrapper.GetLogdirSubdirectories(logdir) inspection_units = [] for subdir in subdirs: generator = itertools.chain(*[ generator_from_event_file(os.path.join(subdir, f)) for f in tf.gfile.ListDirectory(subdir) - if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f)) + if io_wrapper.IsTensorFlowEventsFile(os.path.join(subdir, f)) ]) inspection_units.append(InspectionUnit( name=subdir, @@ -371,7 +371,7 @@ def get_inspection_units(logdir='', event_file='', tag=''): if inspection_units: print('Found event files in:\n{}\n'.format('\n'.join( [u.name for u in inspection_units]))) - elif event_accumulator.IsTensorFlowEventsFile(logdir): + elif io_wrapper.IsTensorFlowEventsFile(logdir): print( 'It seems that {} may be an event file instead of a logdir. 
If this ' 'is the case, use --event_file instead of --logdir to pass ' diff --git a/tensorboard/backend/event_processing/event_multiplexer.py b/tensorboard/backend/event_processing/event_multiplexer.py index 37884188c3..cdc498d4f9 100644 --- a/tensorboard/backend/event_processing/event_multiplexer.py +++ b/tensorboard/backend/event_processing/event_multiplexer.py @@ -166,7 +166,7 @@ def AddRunsFromDirectory(self, path, name=None): The `EventMultiplexer`. """ tf.logging.info('Starting AddRunsFromDirectory: %s', path) - for subdir in GetLogdirSubdirectories(path): + for subdir in io_wrapper.GetLogdirSubdirectories(path): tf.logging.info('Adding events from directory %s', subdir) rpath = os.path.relpath(subdir, path) subname = os.path.join(name, rpath) if name else rpath @@ -480,17 +480,3 @@ def GetAccumulator(self, run): """ with self._accumulators_mutex: return self._accumulators[run] - - -def GetLogdirSubdirectories(path): - """Returns subdirectories with event files on path.""" - if tf.gfile.Exists(path) and not tf.gfile.IsDirectory(path): - raise ValueError('GetLogdirSubdirectories: path exists and is not a ' - 'directory, %s' % path) - - # ListRecursively just yields nothing if the path doesn't exist. - return ( - subdir - for (subdir, files) in io_wrapper.ListRecursively(path) - if list(filter(event_accumulator.IsTensorFlowEventsFile, files)) - ) diff --git a/tensorboard/backend/event_processing/io_wrapper.py b/tensorboard/backend/event_processing/io_wrapper.py index c185f26a4f..a2635e3388 100644 --- a/tensorboard/backend/event_processing/io_wrapper.py +++ b/tensorboard/backend/event_processing/io_wrapper.py @@ -17,36 +17,182 @@ from __future__ import division from __future__ import print_function +import collections import os +import re +import six import tensorflow as tf +_ESCAPE_GLOB_CHARACTERS_REGEX = re.compile('([*?[])') + +# TODO(chihuahua): Rename this method to use camel-case for GCS (Gcs). 
def IsGCSPath(path): return path.startswith("gs://") +def IsCnsPath(path): + return path.startswith("/cns/") + + +def IsTensorFlowEventsFile(path): + """Check the path name to see if it is probably a TF Events file. + + Args: + path: A file path to check if it is an event file. + + Raises: + ValueError: If the path is an empty string. + + Returns: + If path is formatted like a TensorFlowEventsFile. + """ + if not path: + raise ValueError('Path must be a nonempty string') + return 'tfevents' in tf.compat.as_str_any(os.path.basename(path)) + + def ListDirectoryAbsolute(directory): """Yields all files in the given directory. The paths are absolute.""" return (os.path.join(directory, path) for path in tf.gfile.ListDirectory(directory)) -def ListRecursively(top): +def _EscapeGlobCharacters(path): + """Escapes the glob characters in a path. + + Python 3 has a glob.escape method, but python 2 lacks it, so we manually + implement this method. + + Args: + path: The absolute path to escape. + + Returns: + The escaped path string. + """ + drive, path = os.path.splitdrive(path) + return '%s%s' % (drive, _ESCAPE_GLOB_CHARACTERS_REGEX.sub(r'[\1]', path)) + + +def ListRecursivelyViaGlobbing(top): + """Recursively lists all files within the directory. + + This method lists subdirectories in addition to regular files, and the file + paths are all absolute. If the directory does not exist, this yields + nothing. + + This method does so by glob-ing deeper and deeper directories, i.e. + foo/*, foo/*/*, foo/*/*/* and so on until all files are listed. All file + paths are absolute, and this method lists subdirectories too. + + For certain file systems, Globbing via this method may prove + significantly faster than recursively walking a directory. + Specifically, file systems that implement analogs to TensorFlow's + FileSystem.GetMatchingPaths method could save costly disk reads by using + this method. 
However, for other file systems, this method might prove slower + because the file system performs a walk per call to glob (in which case it + might as well just perform 1 walk). + + Args: + top: A path to a directory. + + Yields: + A (dir_path, file_paths) tuple for each directory/subdirectory. + """ + current_glob_string = os.path.join(_EscapeGlobCharacters(top), '*') + level = 0 + + while True: + tf.logging.info('GlobAndListFiles: Starting to glob level %d', level) + glob = tf.gfile.Glob(current_glob_string) + tf.logging.info( + 'GlobAndListFiles: %d files glob-ed at level %d', len(glob), level) + + if not glob: + # This subdirectory level lacks files. Terminate. + return + + # Map subdirectory to a list of files. + pairs = collections.defaultdict(list) + for file_path in glob: + pairs[os.path.dirname(file_path)].append(file_path) + for dir_name, file_paths in six.iteritems(pairs): + yield (dir_name, tuple(file_paths)) + + if len(pairs) == 1: + # If at any point the glob returns files that are all in a single + # directory, replace the current globbing path with that directory as the + # literal prefix. This should improve efficiency in cases where a single + # subdir is significantly deeper than the rest of the subdirs. + current_glob_string = os.path.join(list(pairs.keys())[0], '*') + + # Iterate to the next level of subdirectories. + current_glob_string = os.path.join(current_glob_string, '*') + level += 1 + + +def ListRecursivelyViaWalking(top): """Walks a directory tree, yielding (dir_path, file_paths) tuples. For each of `top` and its subdirectories, yields a tuple containing the path to the directory and the path to each of the contained files. Note that - unlike os.Walk()/tf.gfile.Walk(), this does not list subdirectories and the - file paths are all absolute. + unlike os.Walk()/tf.gfile.Walk()/ListRecursivelyViaGlobbing, this does not + list subdirectories. The file paths are all absolute. If the directory does + not exist, this yields nothing. 
- If the directory does not exist, this yields nothing. + Walking may be incredibly slow on certain file systems. Args: - top: A path to a directory.. + top: A path to a directory. + Yields: - A list of (dir_path, file_paths) tuples. + A (dir_path, file_paths) tuple for each directory/subdirectory. """ for dir_path, _, filenames in tf.gfile.Walk(top): yield (dir_path, (os.path.join(dir_path, filename) for filename in filenames)) + + +def GetLogdirSubdirectories(path): + """Obtains all subdirectories with events files. + + The order of the subdirectories returned is unspecified. The internal logic + that determines order varies by scenario. + + Args: + path: The path to a directory under which to find subdirectories. + + Returns: + An iterable of absolute paths of all subdirectories each with at least 1 + events file directly within the subdirectory. + + Raises: + ValueError: If the path passed to the method exists and is not a directory. + """ + if not tf.gfile.Exists(path): + # No directory to traverse. + return () + + if not tf.gfile.IsDirectory(path): + raise ValueError('GetLogdirSubdirectories: path exists and is not a ' + 'directory, %s' % path) + + if IsGCSPath(path) or IsCnsPath(path): + # Glob-ing for files can be significantly faster than recursively + # walking through directories for some file systems. + tf.logging.info( + 'GetLogdirSubdirectories: Starting to list directories via glob-ing.') + traversal_method = ListRecursivelyViaGlobbing + else: + # For other file systems, the glob-ing based method might be slower because + # each call to glob could involve performing a recursive walk. 
+ tf.logging.info( + 'GetLogdirSubdirectories: Starting to list directories via walking.') + traversal_method = ListRecursivelyViaWalking + + return ( + subdir + for (subdir, files) in traversal_method(path) + if any(IsTensorFlowEventsFile(f) for f in files) + ) diff --git a/tensorboard/backend/event_processing/io_wrapper_test.py b/tensorboard/backend/event_processing/io_wrapper_test.py index a6df9e346c..0621df09e8 100644 --- a/tensorboard/backend/event_processing/io_wrapper_test.py +++ b/tensorboard/backend/event_processing/io_wrapper_test.py @@ -20,6 +20,7 @@ import os import tempfile +import six import tensorflow as tf from tensorboard.backend.event_processing import io_wrapper @@ -32,113 +33,277 @@ def testIsGcsPathIsTrue(self): def testIsGcsPathIsFalse(self): self.assertFalse(io_wrapper.IsGCSPath('/tmp/foo')) + def testIsCnsPathTrue(self): + self.assertTrue(io_wrapper.IsCnsPath('/cns/foo/bar')) + + def testIsCnsPathFalse(self): + self.assertFalse(io_wrapper.IsCnsPath('/tmp/foo')) + + def testIsIsTensorFlowEventsFileTrue(self): + self.assertTrue( + io_wrapper.IsTensorFlowEventsFile( + '/logdir/events.out.tfevents.1473720042.com')) + + def testIsIsTensorFlowEventsFileFalse(self): + self.assertFalse( + io_wrapper.IsTensorFlowEventsFile('/logdir/model.ckpt')) + + def testIsIsTensorFlowEventsFileWithEmptyInput(self): + with six.assertRaisesRegex(self, + ValueError, + r'Path must be a nonempty string'): + io_wrapper.IsTensorFlowEventsFile('') + def testListDirectoryAbsolute(self): temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir()) + self._CreateDeepDirectoryStructure(temp_dir) - # Add a few subdirectories. 
- directory_names = ( + expected_files = ( 'foo', 'bar', - 'we/must/go/deeper' + 'quuz', + 'a.tfevents.1', + 'model.ckpt', + 'waldo', + ) + self.assertItemsEqual( + (os.path.join(temp_dir, f) for f in expected_files), + io_wrapper.ListDirectoryAbsolute(temp_dir)) + + def testListRecursivelyViaGlobbing(self): + temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir()) + self._CreateDeepDirectoryStructure(temp_dir) + expected = [ + ['', [ + 'foo', + 'bar', + 'a.tfevents.1', + 'model.ckpt', + 'quuz', + 'waldo', + ]], + ['bar', [ + 'b.tfevents.1', + 'red_herring.txt', + 'baz', + 'quux', + ]], + ['bar/baz', [ + 'c.tfevents.1', + 'd.tfevents.1', + ]], + ['bar/quux', [ + 'some_flume_output.txt', + 'some_more_flume_output.txt', + ]], + ['quuz', [ + 'e.tfevents.1', + 'garply', + ]], + ['quuz/garply', [ + 'f.tfevents.1', + 'corge', + 'grault', + ]], + ['quuz/garply/corge', [ + 'g.tfevents.1' + ]], + ['quuz/garply/grault', [ + 'h.tfevents.1', + ]], + ['waldo', [ + 'fred', + ]], + ['waldo/fred', [ + 'i.tfevents.1', + ]], + ] + for pair in expected: + # If this is not the top-level directory, prepend the high-level + # directory. + pair[0] = os.path.join(temp_dir, pair[0]) if pair[0] else temp_dir + pair[1] = [os.path.join(pair[0], f) for f in pair[1]] + self._CompareFilesPerSubdirectory( + expected, io_wrapper.ListRecursivelyViaGlobbing(temp_dir)) + + def testListRecursivelyViaGlobbingForPathWithGlobCharacters(self): + temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir()) + directory_names = ( + 'ba*', + 'ba*/subdirectory', + 'bar', ) for directory_name in directory_names: os.makedirs(os.path.join(temp_dir, directory_name)) - # Add a few files to the directory. 
file_names = ( - 'events.out.tfevents.1473720381.foo.com', - 'model.ckpt', - 'we/must_not_include_this_file_in_the_listing.txt' + 'ba*/a.tfevents.1', + 'ba*/subdirectory/b.tfevents.1', + 'bar/c.tfevents.1', ) for file_name in file_names: open(os.path.join(temp_dir, file_name), 'w').close() - expected_files = ( - 'foo', + expected = [ + ['', [ + 'a.tfevents.1', + 'subdirectory', + ]], + ['subdirectory', [ + 'b.tfevents.1', + ]], + # The contents of the bar subdirectory should be excluded from + # this listing because the * character should have been escaped. + ] + top = os.path.join(temp_dir, 'ba*') + for pair in expected: + # If this is not the top-level directory, prepend the high-level + # directory. + pair[0] = os.path.join(top, pair[0]) if pair[0] else top + pair[1] = [os.path.join(pair[0], f) for f in pair[1]] + self._CompareFilesPerSubdirectory( + expected, io_wrapper.ListRecursivelyViaGlobbing(top)) + + def testListRecursivelyViaWalking(self): + temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir()) + self._CreateDeepDirectoryStructure(temp_dir) + expected = [ + ['', [ + 'a.tfevents.1', + 'model.ckpt', + ]], + ['foo', []], + ['bar', [ + 'b.tfevents.1', + 'red_herring.txt', + ]], + ['bar/baz', [ + 'c.tfevents.1', + 'd.tfevents.1', + ]], + ['bar/quux', [ + 'some_flume_output.txt', + 'some_more_flume_output.txt', + ]], + ['quuz', [ + 'e.tfevents.1', + ]], + ['quuz/garply', [ + 'f.tfevents.1', + ]], + ['quuz/garply/corge', [ + 'g.tfevents.1', + ]], + ['quuz/garply/grault', [ + 'h.tfevents.1', + ]], + ['waldo', []], + ['waldo/fred', [ + 'i.tfevents.1', + ]], + ] + for pair in expected: + # If this is not the top-level directory, prepend the high-level + # directory. 
+ pair[0] = os.path.join(temp_dir, pair[0]) if pair[0] else temp_dir + pair[1] = [os.path.join(pair[0], f) for f in pair[1]] + self._CompareFilesPerSubdirectory( + expected, io_wrapper.ListRecursivelyViaWalking(temp_dir)) + + def testGetLogdirSubdirectories(self): + temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir()) + self._CreateDeepDirectoryStructure(temp_dir) + # Only subdirectories that immediately contain at least 1 events + # file should be listed. + expected = [ + '', 'bar', - 'we', - 'events.out.tfevents.1473720381.foo.com', - 'model.ckpt', - ) + 'bar/baz', + 'quuz', + 'quuz/garply', + 'quuz/garply/corge', + 'quuz/garply/grault', + 'waldo/fred', + ] self.assertItemsEqual( - (os.path.join(temp_dir, f) for f in expected_files), - io_wrapper.ListDirectoryAbsolute(temp_dir)) + [(os.path.join(temp_dir, subdir) if subdir else temp_dir) + for subdir in expected], + io_wrapper.GetLogdirSubdirectories(temp_dir)) - def testListRecursivelyForNestedFiles(self): - temp_dir = tempfile.mkdtemp(prefix=self.get_temp_dir()) + def _CreateDeepDirectoryStructure(self, top_directory): + """Creates a reasonably deep structure of subdirectories with files. + Args: + top_directory: The absolute path of the top level directory in + which to create the directory structure. + """ # Add a few subdirectories. directory_names = ( + # An empty directory. 'foo', + # A directory with an events file (and a text file). 'bar', + # A deeper directory with events files. 'bar/baz', + # A non-empty subdirectory that lacks event files (should be ignored). + 'bar/quux', + # This 3-level deep set of subdirectories tests logic that replaces the + # full glob string with an absolute path prefix if there is only 1 + # subdirectory in the final mapping. + 'quuz/garply', + 'quuz/garply/corge', + 'quuz/garply/grault', + # A directory that lacks events files, but contains a subdirectory + # with events files (first level should be ignored, second level should + # be included). 
+ 'waldo', + 'waldo/fred', ) for directory_name in directory_names: - os.makedirs(os.path.join(temp_dir, directory_name)) + os.makedirs(os.path.join(top_directory, directory_name)) # Add a few files to the directory. file_names = ( - 'events.out.tfevents.1473720381.meep.com', + 'a.tfevents.1', 'model.ckpt', - 'bar/events.out.tfevents.1473720382.bar.com', + 'bar/b.tfevents.1', 'bar/red_herring.txt', - 'bar/baz/events.out.tfevents.1473720383.baz.com', - 'bar/baz/events.out.tfevents.1473720384.baz.com', + 'bar/baz/c.tfevents.1', + 'bar/baz/d.tfevents.1', + 'bar/quux/some_flume_output.txt', + 'bar/quux/some_more_flume_output.txt', + 'quuz/e.tfevents.1', + 'quuz/garply/f.tfevents.1', + 'quuz/garply/corge/g.tfevents.1', + 'quuz/garply/grault/h.tfevents.1', + 'waldo/fred/i.tfevents.1', ) for file_name in file_names: - open(os.path.join(temp_dir, file_name), 'w').close() - - # There were 4 subdirectories in total. - listing = io_wrapper.ListRecursively(temp_dir) - directory_to_listing = { - dir: list(generator) for (dir, generator) in listing} - expected = ( - 'foo', - 'bar', - 'bar/baz' - ) - self.assertItemsEqual( - [os.path.join(temp_dir, f) for f in expected] + [temp_dir], - directory_to_listing.keys()) - - # Test for the listings of individual directories. 
- expected = ( - 'events.out.tfevents.1473720381.meep.com', - 'model.ckpt', - ) - self.assertItemsEqual( - (os.path.join(temp_dir, f) for f in expected), - directory_to_listing[temp_dir]) - - expected = () - self.assertItemsEqual( - (os.path.join(temp_dir, 'foo', f) for f in expected), - directory_to_listing[os.path.join(temp_dir, 'foo')]) + open(os.path.join(top_directory, file_name), 'w').close() - expected = ( - 'events.out.tfevents.1473720382.bar.com', - 'red_herring.txt', - ) - self.assertItemsEqual( - (os.path.join(temp_dir, 'bar', f) for f in expected), - directory_to_listing[os.path.join(temp_dir, 'bar')]) + def _CompareFilesPerSubdirectory(self, expected, gotten): + """Compares iterables of (subdirectory path, list of absolute paths) - expected = ( - 'events.out.tfevents.1473720383.baz.com', - 'events.out.tfevents.1473720384.baz.com', - ) + Args: + expected: The expected iterable of 2-tuples. + gotten: The gotten iterable of 2-tuples. + """ + expected_directory_to_listing = { + result[0]: list(result[1]) for result in expected} + gotten_directory_to_listing = { + result[0]: list(result[1]) for result in gotten} self.assertItemsEqual( - (os.path.join(temp_dir, 'bar/baz', f) for f in expected), - directory_to_listing[os.path.join(temp_dir, 'bar/baz')]) + expected_directory_to_listing.keys(), + gotten_directory_to_listing.keys()) - def testListRecursivelyForEmptyDirectory(self): - empty_dir = tempfile.mkdtemp(prefix=self.get_temp_dir()) - subdirectory_entries = list(io_wrapper.ListRecursively(empty_dir)) - self.assertEqual(1, len(subdirectory_entries)) + for subdirectory, expected_listing in expected_directory_to_listing.items(): + gotten_listing = gotten_directory_to_listing[subdirectory] + self.assertItemsEqual( + expected_listing, + gotten_listing, + 'Files for subdirectory %r must match. Expected %r. Got %r.' 
% ( + subdirectory, expected_listing, gotten_listing)) - entry = subdirectory_entries[0] - self.assertEqual(empty_dir, entry[0]) - self.assertItemsEqual((), entry[1]) if __name__ == '__main__': diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator.py b/tensorboard/backend/event_processing/plugin_event_accumulator.py index 23aba29814..9d8675a92a 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator.py @@ -18,7 +18,6 @@ from __future__ import print_function import collections -import os import threading import six @@ -27,6 +26,7 @@ from tensorboard import data_compat from tensorboard.backend.event_processing import directory_watcher from tensorboard.backend.event_processing import event_file_loader +from tensorboard.backend.event_processing import io_wrapper from tensorboard.backend.event_processing import plugin_asset_util from tensorboard.backend.event_processing import reservoir @@ -57,23 +57,6 @@ _TENSOR_RESERVOIR_KEY = "." # arbitrary -def IsTensorFlowEventsFile(path): - """Check the path name to see if it is probably a TF Events file. - - Args: - path: A file path to check if it is an event file. - - Raises: - ValueError: If the path is an empty string. - - Returns: - If path is formatted like a TensorFlowEventsFile. - """ - if not path: - raise ValueError('Path must be a nonempty string') - return 'tfevents' in tf.compat.as_str_any(os.path.basename(path)) - - class EventAccumulator(object): """An `EventAccumulator` takes an event generator, and accumulates the values. 
@@ -577,11 +560,13 @@ def _GeneratorFromPath(path): """Create an event generator for file or directory at given path string.""" if not path: raise ValueError('path must be a valid string') - if IsTensorFlowEventsFile(path): + if io_wrapper.IsTensorFlowEventsFile(path): return event_file_loader.EventFileLoader(path) else: return directory_watcher.DirectoryWatcher( - path, event_file_loader.EventFileLoader, IsTensorFlowEventsFile) + path, + event_file_loader.EventFileLoader, + io_wrapper.IsTensorFlowEventsFile) def _ParseFileVersion(file_version): diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer.py b/tensorboard/backend/event_processing/plugin_event_multiplexer.py index 621fe146f9..67a5e9e5b5 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer.py @@ -172,7 +172,7 @@ def AddRunsFromDirectory(self, path, name=None): The `EventMultiplexer`. """ tf.logging.info('Starting AddRunsFromDirectory: %s', path) - for subdir in GetLogdirSubdirectories(path): + for subdir in io_wrapper.GetLogdirSubdirectories(path): tf.logging.info('Adding run from directory %s', subdir) rpath = os.path.relpath(subdir, path) subname = os.path.join(name, rpath) if name else rpath @@ -432,17 +432,3 @@ def GetAccumulator(self, run): """ with self._accumulators_mutex: return self._accumulators[run] - - -def GetLogdirSubdirectories(path): - """Returns subdirectories with event files on path.""" - if tf.gfile.Exists(path) and not tf.gfile.IsDirectory(path): - raise ValueError('GetLogdirSubdirectories: path exists and is not a ' - 'directory, %s' % path) - - # ListRecursively just yields nothing if the path doesn't exist. - return ( - subdir - for (subdir, files) in io_wrapper.ListRecursively(path) - if list(filter(event_accumulator.IsTensorFlowEventsFile, files)) - )