def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
  """Construct a TensorBoard WSGI app using the standard loading machinery.

  Selects exactly one data-loading backend based on flags — DB import mode,
  DB read-only mode, or regular logdir loading — starts reloading where
  applicable, and delegates app construction to `TensorBoardWSGIApp`.

  NOTE(review): reconstructed from a patch hunk; the original docstring
  summary and Args section are not visible in the hunk — confirm against the
  pre-image before relying on this wording.

  :type plugin_loaders: list[base_plugin.TBLoader]
  :rtype: TensorBoardWSGI
  """
  data_provider = None
  multiplexer = None
  reload_interval = flags.reload_interval
  if flags.db_import:
    # DB import mode.
    db_uri = flags.db
    # Create a temporary DB file if we weren't given one.
    if not db_uri:
      tmpdir = tempfile.mkdtemp(prefix='tbimport')
      # Best-effort cleanup of the temporary DB directory at process exit.
      atexit.register(shutil.rmtree, tmpdir)
      db_uri = 'sqlite:%s/tmp.sqlite' % tmpdir
    db_connection_provider = create_sqlite_connection_provider(db_uri)
    logger.info('Importing logdir into DB at %s', db_uri)
    multiplexer = db_import_multiplexer.DbImportMultiplexer(
        db_uri=db_uri,
        db_connection_provider=db_connection_provider,
        purge_orphaned_data=flags.purge_orphaned_data,
        max_reload_threads=flags.max_reload_threads)
  elif flags.db:
    # DB read-only mode, never load event logs.
    reload_interval = -1
    db_connection_provider = create_sqlite_connection_provider(flags.db)
    # Shim multiplexer that only carries the DB parameters; see
    # _DbModeMultiplexer.
    multiplexer = _DbModeMultiplexer(flags.db, db_connection_provider)
  else:
    # Regular logdir loading mode.
    multiplexer = event_multiplexer.EventMultiplexer(
        size_guidance=DEFAULT_SIZE_GUIDANCE,
        tensor_size_guidance=tensor_size_guidance_from_flags(flags),
        purge_orphaned_data=flags.purge_orphaned_data,
        max_reload_threads=flags.max_reload_threads,
        event_file_active_filter=_get_event_file_active_filter(flags))
    # The generic-data provider is only wired up for the regular multiplexer;
    # it stays None in both DB modes.
    if flags.generic_data != 'false':
      data_provider = event_data_provider.MultiplexerDataProvider(multiplexer)

  if reload_interval >= 0:
    # We either reload the multiplexer once when TensorBoard starts up, or we
    # continuously reload the multiplexer.
    path_to_run = parse_event_files_spec(flags.logdir)
    start_reloading_multiplexer(
        multiplexer, path_to_run, reload_interval, flags.reload_task)
  return TensorBoardWSGIApp(
      flags, plugin_loaders, data_provider, assets_zip_provider, multiplexer)
def TensorBoardWSGIApp(
    flags,
    plugins,
    data_provider=None,
    assets_zip_provider=None,
    deprecated_multiplexer=None):
  """Constructs a TensorBoard WSGI app from plugins and data providers.

  Args:
    flags: An argparse.Namespace containing TensorBoard CLI flags.
    plugins: A list of TBLoader subclasses for the plugins to load.
    data_provider: Instance of `tensorboard.data.provider.DataProvider`. May
      be `None` if `flags.generic_data` is set to `"false"` in which case
      `deprecated_multiplexer` must be passed instead.
    assets_zip_provider: See TBContext documentation for more information.
    deprecated_multiplexer: Optional `plugin_event_multiplexer.EventMultiplexer`
      to use for any plugins not yet enabled for the DataProvider API.
      Required if the data_provider argument is not passed.

  Returns:
    A WSGI application that implements the TensorBoard backend.
  """
  db_uri = None
  db_connection_provider = None
  # Both DB-backed multiplexer shims expose `db_uri` and
  # `db_connection_provider` attributes; plain EventMultiplexers do not, so
  # the DB fields of TBContext stay None for regular logdir loading.
  if isinstance(
      deprecated_multiplexer,
      (db_import_multiplexer.DbImportMultiplexer, _DbModeMultiplexer)):
    db_uri = deprecated_multiplexer.db_uri
    db_connection_provider = deprecated_multiplexer.db_connection_provider

  plugin_name_to_instance = {}
  context = base_plugin.TBContext(
      data_provider=data_provider,
      db_connection_provider=db_connection_provider,
      db_uri=db_uri,
      flags=flags,
      logdir=flags.logdir,
      multiplexer=deprecated_multiplexer,
      assets_zip_provider=assets_zip_provider,
      plugin_name_to_instance=plugin_name_to_instance,
      window_title=flags.window_title)
  tbplugins = []
  for loader in plugins:
    plugin = loader.load(context)
    # Loaders may decline to produce a plugin (e.g. unmet preconditions).
    if plugin is None:
      continue
    tbplugins.append(plugin)
    # Populated incrementally so later plugins can find earlier ones through
    # the shared context mapping.
    plugin_name_to_instance[plugin.plugin_name] = plugin
  return TensorBoardWSGI(tbplugins, flags.path_prefix)
class _DbModeMultiplexer(event_multiplexer.EventMultiplexer):
  """Shim EventMultiplexer to use when in read-only DB mode.

  In read-only DB mode, the EventMultiplexer is nonfunctional - there is no
  logdir to reload, and the data is all exposed via SQL. This class represents
  the do-nothing EventMultiplexer for that purpose, which serves only as a
  conduit for DB-related parameters.

  The load APIs raise exceptions if called, and the read APIs always
  return empty results.
  """
  def __init__(self, db_uri, db_connection_provider):
    """Constructor for `_DbModeMultiplexer`.

    Args:
      db_uri: A URI to the database file in use.
      db_connection_provider: Provider function for creating a DB connection.
    """
    logger.info('_DbModeMultiplexer initializing for %s', db_uri)
    super(_DbModeMultiplexer, self).__init__()
    # Public attributes: read by TensorBoardWSGIApp to populate the DB fields
    # of TBContext.
    self.db_uri = db_uri
    self.db_connection_provider = db_connection_provider
    logger.info('_DbModeMultiplexer done initializing')

  def AddRun(self, path, name=None):
    """Unsupported."""
    raise NotImplementedError()

  def AddRunsFromDirectory(self, path, name=None):
    """Unsupported."""
    raise NotImplementedError()

  def Reload(self):
    """Unsupported."""
    raise NotImplementedError()
  def AddRun(self, path, name=None):
    """Unsupported; instead use AddRunsFromDirectory.

    Raises:
      NotImplementedError: Always. Individual runs cannot be added directly;
        use `AddRunsFromDirectory` instead.
    """
    raise NotImplementedError("Unsupported; use AddRunsFromDirectory()")
  def test_empty_read_apis(self):
    """Read APIs return empty results even after importing a logdir."""
    path = self.get_temp_dir()
    add_event(path)
    # Nothing has been loaded yet.
    self.assertEmpty(self.multiplexer.Runs())
    self.multiplexer.AddRunsFromDirectory(path)
    self.multiplexer.Reload()
    # Data is routed into the DB; the in-memory read APIs stay empty, per the
    # DbImportMultiplexer contract.
    self.assertEmpty(self.multiplexer.Runs())