Merged
21 changes: 21 additions & 0 deletions tensorboard/BUILD
@@ -96,6 +96,27 @@ py_test(
    ],
)

py_test(
    name = "manager_e2e_test",
    size = "large",  # spawns subprocesses, sleeps, makes requests to localhost
    timeout = "short",  # about 15 seconds on my machine
    # On Python 2, this test fails about 0.5% of the time when run with
    # high parallelism; TensorBoard subprocesses time out instead of
    # launching successfully.
    flaky = True,
    srcs = ["manager_e2e_test.py"],
    srcs_version = "PY2AND3",
    visibility = ["//tensorboard:internal"],
    deps = [
        ":manager",
        "//tensorboard:expect_tensorflow_installed",
        "@org_pythonhosted_six",
    ],
    data = [
        ":tensorboard",

Contributor

Is this required? Is it because manager does not declare a dependency on :tensorboard or something?

Contributor Author

Yes, it’s required. This isn’t a Python dependency; it’s a dependency on
the built tensorboard(1) binary, which we execute as a subprocess.

Contributor

I was assuming that it was indirectly part of :manager's data. I don't see the test directly invoking the binary, so it seemed a bit odd.

Contributor Author

:manager invokes whatever tensorboard(1) is on the system path.
:manager can’t depend on :tensorboard, because :tensorboard
depends on it! But the test can depend on :tensorboard and add that to
the system path so that :manager picks up the binary:

    # Add our Bazel-provided `tensorboard` to the system path.
    tensorboard_binary_dir = os.path.realpath("./tensorboard/")
    path_environ = {
        "PATH": os.pathsep.join((tensorboard_binary_dir, os.environ["PATH"])),
    }
    path_environ_patcher = mock.patch.dict(os.environ, path_environ)
    path_environ_patcher.start()

It doesn’t directly invoke it, but it does directly consume it (by
adding it to the path as listed above).

    ],
)

py_library(
    name = "program",
    srcs = ["program.py"],
134 changes: 134 additions & 0 deletions tensorboard/manager.py
@@ -24,7 +24,9 @@
import errno
import json
import os
import subprocess
import tempfile
import time

import six

@@ -304,3 +306,135 @@ def get_all():
    else:
      results.append(info)
  return results


# The following four types enumerate the possible return values of the
# `start` function.

# Indicates that a call to `start` was compatible with an existing
# TensorBoard process, which can be reused according to the provided
# info.
StartReused = collections.namedtuple("StartReused", ("info",))

# Indicates that a call to `start` successfully launched a new
# TensorBoard process, which is available with the provided info.
StartLaunched = collections.namedtuple("StartLaunched", ("info",))

# Indicates that a call to `start` tried to launch a new TensorBoard
# instance, but the subprocess exited with the given exit code and
# output streams. (If the contents of the output streams are no longer
# available---e.g., because the user has emptied /tmp/---then the
# corresponding values will be `None`.)
StartFailed = collections.namedtuple(
    "StartFailed",
    (
        "exit_code",  # int, as `Popen.returncode` (negative for signal)
        "stdout",  # str, or `None` if the stream could not be read
        "stderr",  # str, or `None` if the stream could not be read
    ),
)

# Indicates that a call to `start` launched a TensorBoard process, but
# that process neither exited nor wrote its info file within the allowed
# timeout period. The process may still be running under the included
# PID.
StartTimedOut = collections.namedtuple("StartTimedOut", ("pid",))


def start(arguments, timeout=datetime.timedelta(seconds=10)):
  """Start a new TensorBoard instance, or reuse a compatible one.

  If the cache key determined by the provided arguments and the current
  working directory (see `cache_key`) matches the cache key of a running
  TensorBoard process (see `get_all`), that process will be reused.

  Otherwise, a new TensorBoard process will be spawned with the provided
  arguments, using the `tensorboard` binary from the system path.

  Args:
    arguments: List of strings to be passed as arguments to
      `tensorboard`. (If you have a raw command-line string, see
      `shlex.split`.)
    timeout: `datetime.timedelta` object describing how long to wait for
      the subprocess to initialize a TensorBoard server and write its
      `TensorboardInfo` file. If the info file is not written within
      this time period, `start` will assume that the subprocess is stuck
      in a bad state, and will give up on waiting for it and return a
      `StartTimedOut` result. Note that in such a case the subprocess
      will not be killed. Default value is 10 seconds.

  Returns:
    A `StartReused`, `StartLaunched`, `StartFailed`, or `StartTimedOut`
    object.
  """
  match = _find_matching_instance(
      cache_key(
          working_directory=os.getcwd(),
          arguments=arguments,
          configure_kwargs={},
      ),
  )
  if match:
    return StartReused(info=match)

  (stdout_fd, stdout_path) = tempfile.mkstemp(prefix=".tensorboard-stdout-")
  (stderr_fd, stderr_path) = tempfile.mkstemp(prefix=".tensorboard-stderr-")
  start_time = datetime.datetime.now()
  try:
    p = subprocess.Popen(
        ["tensorboard"] + arguments,
        stdout=stdout_fd,
        stderr=stderr_fd,
    )
  finally:
    os.close(stdout_fd)
    os.close(stderr_fd)

  poll_interval_seconds = 0.5
  end_time = start_time + timeout
  while datetime.datetime.now() < end_time:
    time.sleep(poll_interval_seconds)
    subprocess_result = p.poll()
    if subprocess_result is not None:
      return StartFailed(
          exit_code=subprocess_result,
          stdout=_maybe_read_file(stdout_path),
          stderr=_maybe_read_file(stderr_path),
      )
    for info in get_all():
      if info.pid == p.pid and info.start_time >= start_time:
        return StartLaunched(info=info)
  else:
    return StartTimedOut(pid=p.pid)


def _find_matching_instance(cache_key):
  """Find a running TensorBoard instance compatible with the cache key.

  Returns:
    A `TensorboardInfo` object, or `None` if none matches the cache key.
  """
  infos = get_all()
  candidates = [info for info in infos if info.cache_key == cache_key]
  for candidate in sorted(candidates, key=lambda x: x.port):
    # TODO(@wchargin): Check here that the provided port is still live.

Contributor

No action required: what do you mean here? The port can be occupied by another program. Do you mean check whether TensorBoard is alive?

Contributor Author

Yes, check whether TensorBoard is alive. If the TensorBoard process is
killed ungracefully (e.g., with SIGKILL or SIGQUIT), then it won’t get a
chance to clean up its info file, so the files in this directory may
be out of date. We can easily check whether the server is alive by just
sending an HTTP request to its /data/logdir.
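
Roughly, such a check might look like the following (a sketch only, not code from this PR; the one-second timeout and the `_is_alive` helper name are illustrative):

    # Hypothetical liveness probe: any HTTP response from the server's
    # /data/logdir endpoint means the process behind this info file is alive.
    import socket

    from six.moves import urllib

    def _is_alive(info, timeout_seconds=1):
      url = "http://localhost:%d/data/logdir" % info.port
      try:
        urllib.request.urlopen(url, timeout=timeout_seconds)
      except urllib.error.HTTPError:
        return True  # got a response, even if it was an error status
      except (urllib.error.URLError, socket.error):
        return False  # connection refused or timed out; info file likely stale
      return True

A probe like this would let `_find_matching_instance` skip over info files left behind by processes that died without cleaning up.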

    return candidate
  return None


def _maybe_read_file(filename):
  """Read the given file, if it exists.

  Args:
    filename: A path to a file.

  Returns:
    A string containing the file contents, or `None` if the file does
    not exist.
  """
  try:
    with open(filename) as infile:
      return infile.read()
  except IOError as e:
    if e.errno == errno.ENOENT:
      return None
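
For reference, a caller of `start` might dispatch on its four result types roughly as follows (an illustrative sketch, not part of this diff; the logdir and the printed messages are made up):

    from tensorboard import manager

    result = manager.start(["--logdir", "/tmp/logs"])
    if isinstance(result, manager.StartReused):
      print("Reusing TensorBoard on port %d" % result.info.port)
    elif isinstance(result, manager.StartLaunched):
      print("Launched TensorBoard on port %d" % result.info.port)
    elif isinstance(result, manager.StartFailed):
      print("TensorBoard exited with code %d:\n%s"
            % (result.exit_code, result.stderr or "<stderr unavailable>"))
    elif isinstance(result, manager.StartTimedOut):
      print("TensorBoard (pid %d) did not write its info file in time" % result.pid)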