12 changes: 12 additions & 0 deletions tensorboard/BUILD
@@ -181,6 +181,14 @@ py_library(
visibility = ["//visibility:public"],
)

py_library(
name = "expect_absl_logging_installed",
# This is a dummy rule used as an absl-py dependency in open-source.
# We expect absl-py to already be installed on the system, e.g. via
# `pip install absl-py`.
visibility = ["//visibility:public"],
)

py_library(
name = "tf_contrib_ffmpeg",
# This is a dummy rule for the open source world, which indicates
@@ -267,9 +275,11 @@ py_binary(
srcs = ["encode_png_benchmark.py"],
srcs_version = "PY2AND3",
deps = [
":expect_absl_logging_installed",
"//tensorboard:expect_numpy_installed",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard/util",
"//tensorboard/util:tb_logging",
"@org_pythonhosted_six",
],
)
@@ -304,10 +314,12 @@ py_library(
srcs_version = "PY2AND3",
visibility = ["//visibility:public"],
deps = [
":expect_absl_logging_installed",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard/compat/proto:protos_all_py_pb2",
"//tensorboard/util",
"//tensorboard/util:platform_util",
"//tensorboard/util:tb_logging",
"@org_pythonhosted_six",
],
)
2 changes: 1 addition & 1 deletion tensorboard/backend/BUILD
@@ -66,12 +66,12 @@ py_library(
"//tensorboard/backend/event_processing:db_import_multiplexer",
"//tensorboard/backend/event_processing:event_accumulator",
"//tensorboard/backend/event_processing:event_multiplexer",
"//tensorboard/compat:tensorflow",
"//tensorboard/plugins/core:core_plugin",
"//tensorboard/plugins/histogram:metadata",
"//tensorboard/plugins/image:metadata",
"//tensorboard/plugins/pr_curve:metadata",
"//tensorboard/plugins/scalar:metadata",
"//tensorboard/util:tb_logging",
"@org_pocoo_werkzeug",
"@org_pythonhosted_six",
],
22 changes: 12 additions & 10 deletions tensorboard/backend/application.py
@@ -49,7 +49,7 @@
from tensorboard.plugins.image import metadata as image_metadata
from tensorboard.plugins.pr_curve import metadata as pr_curve_metadata
from tensorboard.plugins.scalar import metadata as scalar_metadata
from tensorboard.compat import tf
from tensorboard.util import tb_logging


DEFAULT_SIZE_GUIDANCE = {
@@ -75,6 +75,8 @@
# names as follows.
_VALID_PLUGIN_RE = re.compile(r'^[A-Za-z0-9_.-]+$')

logger = tb_logging.get_logger()


def tensor_size_guidance_from_flags(flags):
"""Apply user per-summary size guidance overrides."""
@@ -127,7 +129,7 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
# DB import mode.
if db_module != sqlite3:
raise ValueError('--db_import is only compatible with sqlite DBs')
tf.logging.info('Importing logdir into DB at %s', db_uri)
logger.info('Importing logdir into DB at %s', db_uri)
loading_multiplexer = db_import_multiplexer.DbImportMultiplexer(
db_connection_provider=db_connection_provider,
purge_orphaned_data=flags.purge_orphaned_data,
@@ -248,7 +250,7 @@ def __init__(self, plugins, path_prefix=''):
except Exception as e: # pylint: disable=broad-except
if type(plugin) is core_plugin.CorePlugin: # pylint: disable=unidiomatic-typecheck
raise
tf.logging.warning('Plugin %s failed. Exception: %s',
logger.warn('Plugin %s failed. Exception: %s',
plugin.plugin_name, str(e))
continue
for route, app in plugin_apps.items():
@@ -278,7 +280,7 @@ def _serve_plugins_listing(self, request):
start = time.time()
response[plugin.plugin_name] = plugin.is_active()
elapsed = time.time() - start
tf.logging.info(
logger.info(
'Plugin listing: is_active() for %s took %0.3f seconds',
plugin.plugin_name, elapsed)
return http_util.Respond(request, response, 'application/json')
@@ -307,7 +309,7 @@ class are WSGI applications.
if clean_path in self.data_applications:
return self.data_applications[clean_path](environ, start_response)
else:
tf.logging.warning('path %s not found, sending 404', clean_path)
logger.warn('path %s not found, sending 404', clean_path)
return http_util.Respond(request, 'Not found', 'text/plain', code=404)(
environ, start_response)
# pylint: enable=too-many-function-args
@@ -382,28 +384,28 @@ def start_reloading_multiplexer(multiplexer, path_to_run, load_interval,
def _reload():
while True:
start = time.time()
tf.logging.info('TensorBoard reload process beginning')
logger.info('TensorBoard reload process beginning')
for path, name in six.iteritems(path_to_run):
multiplexer.AddRunsFromDirectory(path, name)
tf.logging.info('TensorBoard reload process: Reload the whole Multiplexer')
logger.info('TensorBoard reload process: Reload the whole Multiplexer')
multiplexer.Reload()
duration = time.time() - start
tf.logging.info('TensorBoard done reloading. Load took %0.3f secs', duration)
logger.info('TensorBoard done reloading. Load took %0.3f secs', duration)
if load_interval == 0:
# Only load the multiplexer once. Do not continuously reload.
break
time.sleep(load_interval)

if reload_task == 'process':
tf.logging.info('Launching reload in a child process')
logger.info('Launching reload in a child process')
import multiprocessing
process = multiprocessing.Process(target=_reload, name='Reloader')
# Best-effort cleanup; on exit, the main TB parent process will attempt to
# kill all its daemonic children.
process.daemon = True
process.start()
elif reload_task in ('thread', 'auto'):
tf.logging.info('Launching reload in a daemon thread')
logger.info('Launching reload in a daemon thread')
thread = threading.Thread(target=_reload, name='Reloader')
# Make this a daemon thread, which won't block TB from exiting.
thread.daemon = True
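Note: the `tb_logging` module imported above comes from `//tensorboard/util:tb_logging`, which is not part of this diff, so its implementation is not shown here. A rough sketch of what a `get_logger()` helper like the one used above could look like (the `'tensorboard'` logger name and the reliance on the standard `logging` module are assumptions, not taken from this PR):

```python
# Hypothetical sketch of tensorboard/util/tb_logging.py; not the actual file.
import logging


def get_logger():
  """Returns the logger shared by TensorBoard modules.

  Callers cache the result at module scope (logger = tb_logging.get_logger())
  and call logger.info/warn/error in place of tf.logging, as in the diff above.
  """
  return logging.getLogger('tensorboard')
```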
8 changes: 7 additions & 1 deletion tensorboard/backend/event_processing/BUILD
@@ -13,6 +13,7 @@ py_library(
srcs_version = "PY2AND3",
deps = [
"//tensorboard/compat:tensorflow",
"//tensorboard/util:tb_logging",
"@org_pythonhosted_six",
],
)
@@ -36,6 +37,7 @@ py_library(
deps = [
":io_wrapper",
"//tensorboard/compat:tensorflow",
"//tensorboard/util:tb_logging",
],
)

@@ -75,6 +77,7 @@ py_library(
"//tensorboard/compat:tensorflow",
"//tensorboard/compat/proto:protos_all_py_pb2",
"//tensorboard/util:platform_util",
"//tensorboard/util:tb_logging",
],
)

@@ -106,6 +109,7 @@ py_library(
"//tensorboard:data_compat",
"//tensorboard/compat:tensorflow",
"//tensorboard/plugins/distribution:compressor",
"//tensorboard/util:tb_logging",
],
)

@@ -158,7 +162,7 @@ py_library(
":directory_watcher",
":event_accumulator",
":io_wrapper",
"//tensorboard/compat:tensorflow",
"//tensorboard/util:tb_logging",
"@org_pythonhosted_six",
],
)
@@ -202,6 +206,7 @@ py_library(
"//tensorboard:data_compat",
"//tensorboard/compat:tensorflow",
"//tensorboard/compat/proto:protos_all_py_pb2",
"//tensorboard/util:tb_logging",
"@org_pythonhosted_six",
],
)
@@ -230,6 +235,7 @@ py_library(
visibility = ["//visibility:public"],
deps = [
"//tensorboard/compat:tensorflow",
"//tensorboard/util:tb_logging",
"//tensorboard/util:tensor_util",
"@org_pythonhosted_six",
],
37 changes: 20 additions & 17 deletions tensorboard/backend/event_processing/db_import_multiplexer.py
@@ -34,8 +34,11 @@
from tensorboard.backend.event_processing import sqlite_writer
from tensorboard.compat import tf
from tensorboard.compat.proto import event_pb2
from tensorboard.util import tb_logging


logger = tb_logging.get_logger()

class DbImportMultiplexer(object):
"""A loading-only `EventMultiplexer` that populates a SQLite DB.

@@ -59,7 +62,7 @@ def __init__(self,
use_import_op: If True, use TensorFlow's import_event() op for imports,
otherwise use TensorBoard's own sqlite ingestion logic.
"""
tf.logging.info('DbImportMultiplexer initializing')
logger.info('DbImportMultiplexer initializing')
self._db_connection_provider = db_connection_provider
self._purge_orphaned_data = purge_orphaned_data
self._max_reload_threads = max_reload_threads
@@ -68,20 +71,20 @@ def __init__(self,
self._run_loaders = {}

if self._purge_orphaned_data:
tf.logging.warning(
logger.warn(
'--db_import does not yet support purging orphaned data')

conn = self._db_connection_provider()
# Extract the file path of the DB from the DB connection.
rows = conn.execute('PRAGMA database_list').fetchall()
db_name_to_path = {row[1]: row[2] for row in rows}
self._db_path = db_name_to_path['main']
tf.logging.info('DbImportMultiplexer using db_path %s', self._db_path)
logger.info('DbImportMultiplexer using db_path %s', self._db_path)
# Set the DB in WAL mode so reads don't block writes.
conn.execute('PRAGMA journal_mode=wal')
conn.execute('PRAGMA synchronous=normal') # Recommended for WAL mode
sqlite_writer.initialize_schema(conn)
tf.logging.info('DbImportMultiplexer done initializing')
logger.info('DbImportMultiplexer done initializing')

def _CreateEventSink(self):
if self._use_import_op:
Expand All @@ -104,22 +107,22 @@ def AddRunsFromDirectory(self, path, name=None):
Raises:
ValueError: If the path exists and isn't a directory.
"""
tf.logging.info('Starting AddRunsFromDirectory: %s (as %s)', path, name)
logger.info('Starting AddRunsFromDirectory: %s (as %s)', path, name)
for subdir in io_wrapper.GetLogdirSubdirectories(path):
tf.logging.info('Processing directory %s', subdir)
logger.info('Processing directory %s', subdir)
if subdir not in self._run_loaders:
tf.logging.info('Creating DB loader for directory %s', subdir)
logger.info('Creating DB loader for directory %s', subdir)
names = self._get_exp_and_run_names(path, subdir, name)
experiment_name, run_name = names
self._run_loaders[subdir] = _RunLoader(
subdir=subdir,
experiment_name=experiment_name,
run_name=run_name)
tf.logging.info('Done with AddRunsFromDirectory: %s', path)
logger.info('Done with AddRunsFromDirectory: %s', path)

def Reload(self):
"""Load events from every detected run."""
tf.logging.info('Beginning DbImportMultiplexer.Reload()')
logger.info('Beginning DbImportMultiplexer.Reload()')
# Defer event sink creation until needed; this ensures it will only exist in
# the thread that calls Reload(), since DB connections must be thread-local.
if not self._event_sink:
@@ -141,11 +144,11 @@ def batch_generator():
except directory_watcher.DirectoryDeletedError:
loader_delete_queue.append(loader)
except (OSError, IOError) as e:
tf.logging.error('Unable to load run %r: %s', loader.subdir, e)
logger.error('Unable to load run %r: %s', loader.subdir, e)

num_threads = min(self._max_reload_threads, len(self._run_loaders))
if num_threads <= 1:
tf.logging.info('Importing runs serially on a single thread')
logger.info('Importing runs serially on a single thread')
for batch in batch_generator():
self._event_sink.write_batch(batch)
else:
@@ -157,7 +160,7 @@ def producer():
output_queue.put(batch)
finally:
output_queue.put(sentinel)
tf.logging.info('Starting %d threads to import runs', num_threads)
logger.info('Starting %d threads to import runs', num_threads)
for i in xrange(num_threads):
thread = threading.Thread(target=producer, name='Loader %d' % i)
thread.daemon = True
@@ -170,9 +173,9 @@ def producer():
continue
self._event_sink.write_batch(output)
for loader in loader_delete_queue:
tf.logging.warning('Deleting loader %r', loader.subdir)
logger.warn('Deleting loader %r', loader.subdir)
del self._run_loaders[loader.subdir]
tf.logging.info('Finished with DbImportMultiplexer.Reload()')
logger.info('Finished with DbImportMultiplexer.Reload()')

def _get_exp_and_run_names(self, path, subdir, experiment_name_override=None):
if experiment_name_override is not None:
@@ -228,7 +231,7 @@ def load_batches(self):
if len(events) >= self._BATCH_COUNT or event_bytes >= self._BATCH_BYTES:
break
elapsed = time.time() - start
tf.logging.debug('RunLoader.load_batch() yielded in %0.3f sec for %s',
logger.debug('RunLoader.load_batch() yielded in %0.3f sec for %s',
elapsed, self._subdir)
if not events:
return
@@ -291,7 +294,7 @@ def write_batch(self, event_batch):
for event_proto in event_batch.events:
writer_fn(event_proto)
elapsed = time.time() - start
tf.logging.debug(
logger.debug(
'ImportOpEventSink.WriteBatch() took %0.3f sec for %s events', elapsed,
len(event_batch.events))

@@ -319,7 +322,7 @@ def write_batch(self, event_batch):
experiment_name=event_batch.experiment_name,
run_name=event_batch.run_name)
elapsed = time.time() - start
tf.logging.debug(
logger.debug(
'SqliteWriterEventSink.WriteBatch() took %0.3f sec for %s events',
elapsed, len(event_batch.events))

15 changes: 9 additions & 6 deletions tensorboard/backend/event_processing/directory_watcher.py
@@ -22,8 +22,11 @@

from tensorboard.backend.event_processing import io_wrapper
from tensorboard.compat import tf
from tensorboard.util import tb_logging


logger = tb_logging.get_logger()

class DirectoryWatcher(object):
"""A DirectoryWatcher wraps a loader to load from a sequence of paths.

@@ -112,7 +115,7 @@ def _LoadInternal(self):

next_path = self._GetNextPath()
if not next_path:
tf.logging.info('No path found after %s', self._path)
logger.info('No path found after %s', self._path)
# Current path is empty and there are no new paths, so we're done.
return

@@ -135,7 +138,7 @@ def _LoadInternal(self):
for event in self._loader.Load():
yield event

tf.logging.info('Directory watcher advancing from %s to %s', self._path,
logger.info('Directory watcher advancing from %s to %s', self._path,
next_path)

# Advance to the next path and start over.
@@ -180,10 +183,10 @@ def _SetPath(self, path):
try:
# We're done with the path, so store its size.
size = tf.gfile.Stat(old_path).length
tf.logging.debug('Setting latest size of %s to %d', old_path, size)
logger.debug('Setting latest size of %s to %d', old_path, size)
self._finalized_sizes[old_path] = size
except tf.errors.OpError as e:
tf.logging.error('Unable to get size of %s: %s', old_path, e)
logger.error('Unable to get size of %s: %s', old_path, e)

self._path = path
self._loader = self._loader_factory(path)
@@ -232,10 +235,10 @@ def _HasOOOWrite(self, path):
old_size = self._finalized_sizes.get(path, None)
if size != old_size:
if old_size is None:
tf.logging.error('File %s created after file %s even though it\'s '
logger.error('File %s created after file %s even though it\'s '
'lexicographically earlier', path, self._path)
else:
tf.logging.error('File %s updated even though the current file is %s',
logger.error('File %s updated even though the current file is %s',
path, self._path)
return True
else:
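With these call sites moved off `tf.logging`, the verbosity of TensorBoard's own messages can be tuned independently of TensorFlow. A minimal usage sketch, assuming `get_logger()` returns a standard-library logger named `'tensorboard'` as in the sketch after the `application.py` diff above:

```python
import logging

# Send TensorBoard's log messages to stderr at DEBUG verbosity without
# touching TensorFlow's logging configuration. The 'tensorboard' logger name
# is an assumption carried over from the earlier sketch.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(levelname)s: %(name)s: %(message)s'))
tb_logger = logging.getLogger('tensorboard')
tb_logger.addHandler(handler)
tb_logger.setLevel(logging.DEBUG)
```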