From 28eb8a39f440b0b9ecb87cd1d4d657fa163aea2c Mon Sep 17 00:00:00 2001 From: William Chargin Date: Mon, 30 Nov 2020 17:27:22 -0800 Subject: [PATCH 1/3] rust: add option to kill server at stdin EOF MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: If `tensorboard` is to launch RustBoard as a subprocess rather than relying on the user to launch it concurrently, then we should try hard not to leave around a zombie server. An easy solution is an `atexit` handler in Python TensorBoard, but this doesn’t handle the case when TensorBoard crashes or gets SIGTERMed. On Linux, we can be notified when our parent dies by setting the parent-death signal (`man 2 prctl`), but this isn’t portable. On portable Unix, the child can poll its PPID to see when it changes to `1` or the PID of some subreaper, but this isn’t portable to Windows, which doesn’t update PPID when the parent dies. Windows is not my strong suit, but some web searching didn’t turn up an easy and clean solution portable to both Windows and Unix (any Windows internals like “job objects” are a non-starter, since I can’t test them). The one thing that I would expect to work everywhere is “just wait until stdin closes and then exit”. If even that doesn’t work on Windows, well, we can burn that bridge when we come to it. Test Plan: Build `bazel build //tensorboard/data/server`, then write a simple Python driver: ```python import subprocess import time server = "bazel-bin/tensorboard/data/server/server" p = subprocess.Popen( [server, "--logdir", "/tmp/nonexistent", "-v", "--die-after-stdin"], stdin=subprocess.PIPE, ) print(p.pid) time.sleep(2) ``` Run it with `python test.py`, and note that after 2 seconds the server prints “Stdin closed; exiting”. Run it again, and suspend (`^Z`) the process before it finishes sleeping. Run `kill -SIGCONT CHILD_PID` with the PID printed by the Python script to resume server execution; this is just needed because your `^Z` propagates to all processes in the group. Note that the server continues to print logs as it starts and finishes new load cycles. Then, run `fg` and wait for the sleep to complete, and note that the server again exits, as desired. wchargin-branch: rust-die-after-stdin wchargin-source: ad925e07a2106ee37d268e32c1b997606326ae99 --- tensorboard/data/server/main.rs | 53 ++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/tensorboard/data/server/main.rs b/tensorboard/data/server/main.rs index 772147d0c7..4fc74e6274 100644 --- a/tensorboard/data/server/main.rs +++ b/tensorboard/data/server/main.rs @@ -15,9 +15,11 @@ limitations under the License. use clap::Clap; use log::{debug, info, LevelFilter}; +use std::io::Read; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::str::FromStr; +use std::thread; use std::time::{Duration, Instant}; use tokio::net::TcpListener; use tonic::transport::Server; @@ -62,6 +64,16 @@ struct Opts { /// Use verbose output (-vv for very verbose output) #[clap(long = "verbose", short, parse(from_occurrences))] verbosity: u32, + + /// Kill this server once stdin is closed + /// + /// While this server is running, read stdin to end of file and then kill the server. Used to + /// portably ensure that the server exits when the parent process dies, even due to a crash. 
+ /// Don't set this if stdin is connected to a tty and the process will be backgrounded, since + /// then the server will receive `SIGTTIN` and its process will be stopped (in the `SIGSTOP` + /// sense) but not killed. + #[clap(long)] + die_after_stdin: bool, } /// A duration in seconds. @@ -89,6 +101,13 @@ async fn main() -> Result<(), Box> { }); debug!("Parsed options: {:?}", opts); + if opts.die_after_stdin { + thread::Builder::new() + .name("StdinWatcher".to_string()) + .spawn(die_after_stdin) + .expect("failed to spawn stdin watcher thread"); + } + let addr = SocketAddr::new(opts.host, opts.port); let listener = TcpListener::bind(addr).await?; let bound = listener.local_addr()?; @@ -98,17 +117,20 @@ async fn main() -> Result<(), Box> { // leaks the outer commit structure (of constant size), not the pointers to the actual data. let commit: &'static Commit = Box::leak(Box::new(Commit::new())); - std::thread::spawn(move || { - let mut loader = LogdirLoader::new(commit, opts.logdir); - loop { - info!("Starting load cycle"); - let start = Instant::now(); - loader.reload(); - let end = Instant::now(); - info!("Finished load cycle ({:?})", end - start); - std::thread::sleep(opts.reload_interval.duration()); - } - }); + thread::Builder::new() + .name("Reloader".to_string()) + .spawn(move || { + let mut loader = LogdirLoader::new(commit, opts.logdir); + loop { + info!("Starting load cycle"); + let start = Instant::now(); + loader.reload(); + let end = Instant::now(); + info!("Finished load cycle ({:?})", end - start); + thread::sleep(opts.reload_interval.duration()); + } + }) + .expect("failed to spawn reloader thread"); let handler = DataProviderHandler { commit }; Server::builder() @@ -125,3 +147,12 @@ fn init_logging(default_log_level: LevelFilter) { use env_logger::{Builder, Env}; Builder::from_env(Env::default().default_filter_or(default_log_level.to_string())).init(); } + +/// Locks stdin and reads it to EOF, then exits the process. +fn die_after_stdin() { + let stdin = std::io::stdin(); + let stdin_lock = stdin.lock(); + for _ in stdin_lock.bytes() {} + info!("Stdin closed; exiting"); + std::process::exit(0); +} From 8c2baa795964b80ab25581c18b9dceda125bd7ea Mon Sep 17 00:00:00 2001 From: William Chargin Date: Mon, 30 Nov 2020 23:18:31 -0800 Subject: [PATCH 2/3] program: add gRPC data ingester MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: TensorBoard can now start RustBoard if `--load_fast` is given at runtime and `--define=link_data_server=true` was given at build time. The default configuration still has no Rust code, so our Pip packages are still portable. When we actually deploy this, we can distribute the Rust binary in a separate Pip package that TensorBoard talks to via [entry points], but the UX can stay the same: “just add `--load_fast`.” [entry points]: https://packaging.python.org/specifications/entry-points/ Test Plan: Test in the following configurations: - with just a `--logdir`, everything works as normal; - with `--define=link_data_server=true -- --logdir ... --load_fast`, the subprocess is spawned and cleaned up on a successful exit or a SIGKILL to TensorBoard, and INFO logs are shown iff `--verbosity 0` is specified; - with `--logdir ... --load_fast` but no `--define`, TensorBoard fails to start the server and prints a message before exiting; and - with `--grpc_data_provider localhost:6806`, TensorBoard connects to an existing server without needing `--logdir` or `--define`. 
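For the last configuration, a quick way to sanity-check that the data server is
actually reachable before pointing TensorBoard at it is to wait on the channel
from a Python shell (a throwaway sketch, assuming the server from the first
patch is already listening on `localhost:6806`):

```python
import grpc

# Block until the channel is ready; raises `grpc.FutureTimeoutError` if the
# data server is not reachable within five seconds.
channel = grpc.insecure_channel("localhost:6806")
grpc.channel_ready_future(channel).result(timeout=5)
print("data server is reachable")
```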
To test the “data server died” case, comment out the `--logdir=%s` flag, which will cause the server to fail with a usage message. That message should appear in the logs. This also works after syncing into Google, in all relevant configurations; see and . wchargin-branch: grpc-ingester wchargin-source: 57ed441a34d496055b4b75e6cd935f63b8ae2e09 --- tensorboard/BUILD | 21 ++- tensorboard/backend/event_processing/BUILD | 2 + .../backend/event_processing/data_ingester.py | 16 +- .../event_processing/data_ingester_test.py | 8 + tensorboard/data/BUILD | 37 +++++ tensorboard/data/server/main.rs | 3 +- tensorboard/data/server_ingester.py | 137 ++++++++++++++++++ tensorboard/data/server_ingester_test.py | 65 +++++++++ tensorboard/ingester.py | 73 ++++++++++ tensorboard/ingester_test.py | 21 +++ tensorboard/plugins/core/core_plugin.py | 8 + tensorboard/program.py | 49 ++++--- 12 files changed, 408 insertions(+), 32 deletions(-) create mode 100644 tensorboard/data/server_ingester.py create mode 100644 tensorboard/data/server_ingester_test.py create mode 100644 tensorboard/ingester.py create mode 100644 tensorboard/ingester_test.py diff --git a/tensorboard/BUILD b/tensorboard/BUILD index bcfe1848ab..a3d5d289cd 100644 --- a/tensorboard/BUILD +++ b/tensorboard/BUILD @@ -130,6 +130,23 @@ py_test( ], ) +py_library( + name = "ingester", + srcs = ["ingester.py"], + srcs_version = "PY3", +) + +py_test( + name = "ingester_test", + size = "small", + srcs = ["ingester_test.py"], + srcs_version = "PY3", + tags = ["support_notf"], + deps = [ + ":ingester", + ], +) + py_library( name = "errors", srcs = ["errors.py"], @@ -212,15 +229,15 @@ py_library( srcs = ["program.py"], srcs_version = "PY3", deps = [ + ":ingester", ":manager", ":version", "//tensorboard:expect_absl_flags_installed", "//tensorboard:expect_absl_logging_installed", - "//tensorboard:expect_grpc_installed", "//tensorboard/backend:application", "//tensorboard/backend/event_processing:data_ingester", "//tensorboard/backend/event_processing:event_file_inspector", - "//tensorboard/data:grpc_provider", + "//tensorboard/data:server_ingester", "//tensorboard/util:argparse_util", "@org_pocoo_werkzeug", "@org_pythonhosted_six", diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index 41143c99e3..a7af0a11a5 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -37,6 +37,7 @@ py_library( ":data_provider", ":event_multiplexer", ":tag_types", + "//tensorboard:ingester", "//tensorboard/plugins/audio:metadata", "//tensorboard/plugins/histogram:metadata", "//tensorboard/plugins/image:metadata", @@ -53,6 +54,7 @@ py_test( srcs_version = "PY3", deps = [ ":data_ingester", + "//tensorboard:ingester", "//tensorboard:test", "@org_pythonhosted_mock", ], diff --git a/tensorboard/backend/event_processing/data_ingester.py b/tensorboard/backend/event_processing/data_ingester.py index f57e8078c6..124f3c2837 100644 --- a/tensorboard/backend/event_processing/data_ingester.py +++ b/tensorboard/backend/event_processing/data_ingester.py @@ -21,6 +21,7 @@ import six +from tensorboard import ingester from tensorboard.backend.event_processing import data_provider from tensorboard.backend.event_processing import plugin_event_multiplexer from tensorboard.backend.event_processing import tag_types @@ -48,18 +49,13 @@ logger = tb_logging.get_logger() -class LocalDataIngester(object): +class LocalDataIngester(ingester.DataIngester): """Data ingestion implementation to use when running 
locally.""" def __init__(self, flags): - """Initializes a `LocalDataIngester` from `flags`. - - Args: - flags: An argparse.Namespace containing TensorBoard CLI flags. - - Returns: - The new `LocalDataIngester`. - """ + logdir = flags.logdir or flags.logdir_spec + if not logdir: + raise ingester.NotApplicableError("No logdir given") tensor_size_guidance = dict(DEFAULT_TENSOR_SIZE_GUIDANCE) tensor_size_guidance.update(flags.samples_per_plugin) self._multiplexer = plugin_event_multiplexer.EventMultiplexer( @@ -70,7 +66,7 @@ def __init__(self, flags): event_file_active_filter=_get_event_file_active_filter(flags), ) self._data_provider = data_provider.MultiplexerDataProvider( - self._multiplexer, flags.logdir or flags.logdir_spec + self._multiplexer, logdir ) self._reload_interval = flags.reload_interval self._reload_task = flags.reload_task diff --git a/tensorboard/backend/event_processing/data_ingester_test.py b/tensorboard/backend/event_processing/data_ingester_test.py index 13a34cd6f1..3347f65fe9 100644 --- a/tensorboard/backend/event_processing/data_ingester_test.py +++ b/tensorboard/backend/event_processing/data_ingester_test.py @@ -25,6 +25,7 @@ except ImportError: import mock # pylint: disable=unused-import +from tensorboard import ingester as ingester_lib from tensorboard import test as tb_test from tensorboard.backend.event_processing import data_ingester @@ -59,6 +60,13 @@ def __init__( self.generic_data = generic_data +class NoLogdirTest(tb_test.TestCase): + def test(self): + flags = FakeFlags(logdir="", logdir_spec="") + with self.assertRaises(ingester_lib.NotApplicableError): + data_ingester.LocalDataIngester(flags) + + class GetEventFileActiveFilterTest(tb_test.TestCase): def testDisabled(self): flags = FakeFlags("logdir", reload_multifile=False) diff --git a/tensorboard/data/BUILD b/tensorboard/data/BUILD index 8d16bfe45f..dd818d9547 100644 --- a/tensorboard/data/BUILD +++ b/tensorboard/data/BUILD @@ -64,6 +64,43 @@ py_test( ], ) +config_setting( + name = "link_data_server", + define_values = {"link_data_server": "true"}, +) + +py_library( + name = "server_ingester", + srcs = ["server_ingester.py"], + data = select({ + ":link_data_server": ["//tensorboard/data/server"], + "//conditions:default": [], + }), + srcs_version = "PY3", + deps = [ + ":grpc_provider", + "//tensorboard:expect_grpc_installed", + "//tensorboard:ingester", + "//tensorboard/util:tb_logging", + ], +) + +py_test( + name = "server_ingester_test", + size = "small", + srcs = ["server_ingester_test.py"], + srcs_version = "PY3", + tags = ["support_notf"], + deps = [ + ":grpc_provider", + ":server_ingester", + "//tensorboard:context", + "//tensorboard:expect_grpc_installed", + "//tensorboard:ingester", + "//tensorboard:test", + ], +) + py_library( name = "grpc_provider", srcs = ["grpc_provider.py"], diff --git a/tensorboard/data/server/main.rs b/tensorboard/data/server/main.rs index 4fc74e6274..de801efc2c 100644 --- a/tensorboard/data/server/main.rs +++ b/tensorboard/data/server/main.rs @@ -111,7 +111,8 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::new(opts.host, opts.port); let listener = TcpListener::bind(addr).await?; let bound = listener.local_addr()?; - eprintln!("listening on {:?}", bound); + // This magic string is read by the TensorBoard data ingester to determine the port. + println!("listening on port {} ({})", bound.port(), bound); // Leak the commit object, since the Tonic server must have only 'static references. 
This only // leaks the outer commit structure (of constant size), not the pointers to the actual data. diff --git a/tensorboard/data/server_ingester.py b/tensorboard/data/server_ingester.py new file mode 100644 index 0000000000..ebea19168c --- /dev/null +++ b/tensorboard/data/server_ingester.py @@ -0,0 +1,137 @@ +# Copyright 2020 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Provides data ingestion logic backed by a gRPC server.""" + +import logging +import os +import re +import select +import subprocess +import time + +import grpc + +from tensorboard import ingester +from tensorboard.data import grpc_provider +from tensorboard.util import tb_logging + + +logger = tb_logging.get_logger() + +_RE_PORT = re.compile("listening on port ([0-9]+)") + + +class ServerDataIngester(ingester.DataIngester): + """Data ingestion implementation to use when against a gRPC server.""" + + def __init__(self, flags): + address = flags.grpc_data_provider + if address: + # Connect to an existing server at the given address. + self._data_provider = _make_provider(address) + return + if not flags.load_fast: + raise ingester.NotApplicableError( + "Neither `--load_fast` nor `--grpc_data_provider` given" + ) + self._flags = flags + self._data_provider = None + + @property + def data_provider(self): + if self._data_provider is None: + raise RuntimeError("Must call `start` first") + return self._data_provider + + @property + def deprecated_multiplexer(self): + return None + + def start(self): + if self._data_provider: + return + server_binary = os.path.join( + os.path.dirname(__file__), "server", "server" + ) + logger.info("Data server binary: %s", server_binary) + if not os.path.exists(server_binary): + raise RuntimeError( + "TensorBoard data server not found. This mode is experimental " + "and not supported in release builds. If building from source, " + "pass --define=link_data_server=true." + ) + args = [ + server_binary, + "--logdir=%s" % (self._flags.logdir,), + "--port=0", + "--die-after-stdin", + ] + if logger.isEnabledFor(logging.INFO): + args.append("--verbose") + + logger.warn("Spawning data server: %r", args) + popen = subprocess.Popen( + args, stdin=subprocess.PIPE, stdout=subprocess.PIPE + ) + # Stash stdin to avoid calling its destructor: on Windows, this + # is a `subprocess.Handle` that closes itself in `__del__`, + # which would cause the data server to shut down. (This is not + # documented; you have to read CPython source to figure it out.) + # We want that to happen at end of process, but not before. + self._stdin_handle = popen.stdin # stash to avoid stdin being closed + + port = None + # The server only needs about 10 microseconds to spawn on my machine, + # but give a few orders of magnitude of padding, and then poll. 
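+        # In total, the loop below polls for up to about ten seconds
+        # (20 attempts, 0.5 s apart) before giving up and reporting a timeout.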
+ time.sleep(0.01) + poller = select.poll() + poller.register(popen.stdout, select.POLLIN) + for i in range(20): + if popen.poll() is not None: + raise RuntimeError( + "Data server exited with %d; check stderr for details" + % popen.poll() + ) + logger.info("Polling for data server port (attempt %d)", i) + if poller.poll(0): + line = next(popen.stdout) + logger.info("Data server stdout line: %r", line) + match = _RE_PORT.match(line.decode(errors="replace")) + if match: + port = int(match.group(1)) + break + time.sleep(0.5) + if port is None: + raise RuntimeError( + "Timed out while waiting for data server to start. " + "It may still be running as pid %d." % popen.pid + ) + + addr = "localhost:%d" % port + self._data_provider = _make_provider(addr) + logger.info( + "Established connection to data server at pid %d via %s", + popen.pid, + addr, + ) + + +def _make_provider(addr): + options = [ + ("grpc.max_receive_message_length", 1024 * 1024 * 256), + ] + channel = grpc.insecure_channel(addr, options=options) + stub = grpc_provider.make_stub(channel) + return grpc_provider.GrpcDataProvider(addr, stub) diff --git a/tensorboard/data/server_ingester_test.py b/tensorboard/data/server_ingester_test.py new file mode 100644 index 0000000000..9195a51a2d --- /dev/null +++ b/tensorboard/data/server_ingester_test.py @@ -0,0 +1,65 @@ +# Copyright 2020 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Unit tests for `tensorboard.data.server_ingester`.""" + +import argparse +from unittest import mock + +import grpc + +from tensorboard import context +from tensorboard import ingester as ingester_lib +from tensorboard import test as tb_test +from tensorboard.data import server_ingester +from tensorboard.data import grpc_provider + + +def make_flags(**kwargs): + kwargs.setdefault("grpc_data_provider", "") + kwargs.setdefault("logdir", "") + kwargs.setdefault("logdir_spec", "") + kwargs.setdefault("load_fast", False) + return argparse.Namespace(**kwargs) + + +class ServerDataIngesterTest(tb_test.TestCase): + def setUp(self): + super().setUp() + self.enter_context( + mock.patch.object(grpc, "insecure_channel", autospec=True) + ) + + def test_fixed_address(self): + flags = make_flags(grpc_data_provider="localhost:6806") + ingester = server_ingester.ServerDataIngester(flags) + ingester.start() + ctx = context.RequestContext() + self.assertIsInstance( + ingester.data_provider, grpc_provider.GrpcDataProvider + ) + + def test_empty_flags(self): + flags = make_flags(logdir="/some/logdir") + with self.assertRaises(ingester_lib.NotApplicableError): + server_ingester.ServerDataIngester(flags) + + def test_load_fast(self): + flags = make_flags(logdir="/some/logdir", load_fast=True) + ingester = server_ingester.ServerDataIngester(flags) + # Not much more that we can easily test here. 
+ + +if __name__ == "__main__": + tb_test.main() diff --git a/tensorboard/ingester.py b/tensorboard/ingester.py new file mode 100644 index 0000000000..e526376b36 --- /dev/null +++ b/tensorboard/ingester.py @@ -0,0 +1,73 @@ +# Copyright 2020 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Abstraction for data ingestion logic.""" + +import abc + + +class NotApplicableError(ValueError): + """This kind of ingester cannot be constructed from the given flags.""" + + +class DataIngester(metaclass=abc.ABCMeta): + """Link between a data source and a data provider. + + A data ingester starts a reload operation in the background and + provides a data provider as a view. + """ + + @abc.abstractmethod + def __init__(self, flags): + """Creates a data ingester from flags. + + Args: + flags: An argparse.Namespace containing TensorBoard CLI flags. + + Returns: + The new data ingester. + + Raises: + NotApplicableError: If this kind of data ingester is not + applicable for an invocation with the given flags. This is + not necessarily a fatal error; it indicates that the next + ingester kind should be tried. + """ + pass + + @property + @abc.abstractmethod + def data_provider(self): + """Returns a `DataProvider`. + + It may be an error to dereference this before `start` is called. + """ + pass + + @property + @abc.abstractmethod + def deprecated_multiplexer(self): + """Returns a `PluginEventMultiplexer`, or `None` if not applicable. + + It may be an error to dereference this before `start` is called. + """ + pass + + @abc.abstractmethod + def start(self): + """Starts ingesting data based on the ingester flag configuration. + + Must only be called once. + """ + pass diff --git a/tensorboard/ingester_test.py b/tensorboard/ingester_test.py new file mode 100644 index 0000000000..6396240dd3 --- /dev/null +++ b/tensorboard/ingester_test.py @@ -0,0 +1,21 @@ +# Copyright 2020 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Unit tests for `tensorboard.ingester`.""" + +from tensorboard import ingester + +# This is a pure abstract class. There's really nothing to test other than that +# it executes successfully. 
+del ingester diff --git a/tensorboard/plugins/core/core_plugin.py b/tensorboard/plugins/core/core_plugin.py index 8199e6a1f4..eca74e39fd 100644 --- a/tensorboard/plugins/core/core_plugin.py +++ b/tensorboard/plugins/core/core_plugin.py @@ -293,6 +293,14 @@ def define_flags(self, parser): % DEFAULT_PORT, ) + parser.add_argument( + "--load_fast", + action="store_true", + help="""\ +Experimental. Use a data server to accelerate loading. +""", + ) + parser.add_argument( "--grpc_data_provider", metavar="PORT", diff --git a/tensorboard/program.py b/tensorboard/program.py index 9cf59855e7..e38310ec72 100644 --- a/tensorboard/program.py +++ b/tensorboard/program.py @@ -48,18 +48,18 @@ from absl import flags as absl_flags from absl.flags import argparse_flags import absl.logging -import grpc import six from six.moves import urllib from six.moves import xrange # pylint: disable=redefined-builtin from werkzeug import serving +from tensorboard import ingester as ingester_lib from tensorboard import manager from tensorboard import version from tensorboard.backend import application -from tensorboard.backend.event_processing import data_ingester +from tensorboard.backend.event_processing import data_ingester as local_ingester from tensorboard.backend.event_processing import event_file_inspector as efi -from tensorboard.data import grpc_provider +from tensorboard.data import server_ingester from tensorboard.plugins.core import core_plugin from tensorboard.util import argparse_util from tensorboard.util import tb_logging @@ -409,24 +409,35 @@ def _fix_mime_types(self): mimetypes.add_type("font/woff2", ".woff2") mimetypes.add_type("text/html", ".html") - def _make_grpc_provider(self, addr): - options = [ - ("grpc.max_receive_message_length", 1024 * 1024 * 256), - ] - channel = grpc.insecure_channel(addr, options=options) - stub = grpc_provider.make_stub(channel) - provider = grpc_provider.GrpcDataProvider(addr, stub) - return provider - def _make_data_provider(self): """Returns `(data_provider, deprecated_multiplexer)`.""" - grpc_addr = self.flags.grpc_data_provider - if grpc_addr: - return (self._make_grpc_provider(grpc_addr), None) - else: - ingester = data_ingester.LocalDataIngester(self.flags) - ingester.start() - return (ingester.data_provider, ingester.deprecated_multiplexer) + # Ingesters to try. Order matters: `LocalDataIngester` will + # succeed whenever `--logdir` is set, but other ingesters may + # also be able to handle such cases. + ingester_types = [ + server_ingester.ServerDataIngester, + local_ingester.LocalDataIngester, + ] + errors = [] + ingester = None + for ty in ingester_types: + try: + ingester = ty(self.flags) + break + except ingester_lib.NotApplicableError as e: + errors.append(e) + if ingester is None: + # This shouldn't happen: the flag validation is intended to + # ensure that one of the stock ingesters can succeed. Still, + # handle it nicely. + raise RuntimeError( + "Failed to load data:\n%s" + % "\n".join(" - %s" % e for e in errors) + ) + # Stash ingester so that it can avoid GCing Windows file handles. 
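+        # (The subprocess-backed ingester keeps the data server's stdin handle
+        # alive; if the ingester were garbage-collected, that handle would be
+        # closed and the `--die-after-stdin` server would exit prematurely.)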
+ self._ingester = ingester + ingester.start() + return (ingester.data_provider, ingester.deprecated_multiplexer) def _make_server(self): """Constructs the TensorBoard WSGI app and instantiates the server.""" From 90bfc1d3d1688bda563f47c0b7e63fabbca501e5 Mon Sep 17 00:00:00 2001 From: William Chargin Date: Wed, 2 Dec 2020 17:05:38 -0800 Subject: [PATCH 3/3] [update patch: grpc-ingester] wchargin-branch: grpc-ingester wchargin-source: fba72285f3ec0868039033a74bc326a80ad45de6 --- tensorboard/BUILD | 18 ---- tensorboard/backend/event_processing/BUILD | 3 +- .../backend/event_processing/data_ingester.py | 15 ++- .../event_processing/data_ingester_test.py | 8 -- tensorboard/data/BUILD | 24 ++++- tensorboard/{ => data}/ingester.py | 37 +------- tensorboard/{ => data}/ingester_test.py | 2 +- tensorboard/data/server/DEVELOPMENT.md | 43 ++++++++- tensorboard/data/server/main.rs | 63 ++++++++++--- tensorboard/data/server_ingester.py | 90 ++++++++++-------- tensorboard/data/server_ingester_test.py | 93 +++++++++++++------ tensorboard/program.py | 41 ++++---- 12 files changed, 260 insertions(+), 177 deletions(-) rename tensorboard/{ => data}/ingester.py (56%) rename tensorboard/{ => data}/ingester_test.py (95%) diff --git a/tensorboard/BUILD b/tensorboard/BUILD index a3d5d289cd..088508413e 100644 --- a/tensorboard/BUILD +++ b/tensorboard/BUILD @@ -130,23 +130,6 @@ py_test( ], ) -py_library( - name = "ingester", - srcs = ["ingester.py"], - srcs_version = "PY3", -) - -py_test( - name = "ingester_test", - size = "small", - srcs = ["ingester_test.py"], - srcs_version = "PY3", - tags = ["support_notf"], - deps = [ - ":ingester", - ], -) - py_library( name = "errors", srcs = ["errors.py"], @@ -229,7 +212,6 @@ py_library( srcs = ["program.py"], srcs_version = "PY3", deps = [ - ":ingester", ":manager", ":version", "//tensorboard:expect_absl_flags_installed", diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index a7af0a11a5..b2faa47637 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -37,7 +37,7 @@ py_library( ":data_provider", ":event_multiplexer", ":tag_types", - "//tensorboard:ingester", + "//tensorboard/data:ingester", "//tensorboard/plugins/audio:metadata", "//tensorboard/plugins/histogram:metadata", "//tensorboard/plugins/image:metadata", @@ -54,7 +54,6 @@ py_test( srcs_version = "PY3", deps = [ ":data_ingester", - "//tensorboard:ingester", "//tensorboard:test", "@org_pythonhosted_mock", ], diff --git a/tensorboard/backend/event_processing/data_ingester.py b/tensorboard/backend/event_processing/data_ingester.py index 124f3c2837..d091448b2e 100644 --- a/tensorboard/backend/event_processing/data_ingester.py +++ b/tensorboard/backend/event_processing/data_ingester.py @@ -21,10 +21,10 @@ import six -from tensorboard import ingester from tensorboard.backend.event_processing import data_provider from tensorboard.backend.event_processing import plugin_event_multiplexer from tensorboard.backend.event_processing import tag_types +from tensorboard.data import ingester from tensorboard.plugins.audio import metadata as audio_metadata from tensorboard.plugins.histogram import metadata as histogram_metadata from tensorboard.plugins.image import metadata as image_metadata @@ -53,9 +53,14 @@ class LocalDataIngester(ingester.DataIngester): """Data ingestion implementation to use when running locally.""" def __init__(self, flags): - logdir = flags.logdir or flags.logdir_spec - if not logdir: - raise 
ingester.NotApplicableError("No logdir given") + """Initializes a `LocalDataIngester` from `flags`. + + Args: + flags: An argparse.Namespace containing TensorBoard CLI flags. + + Returns: + The new `LocalDataIngester`. + """ tensor_size_guidance = dict(DEFAULT_TENSOR_SIZE_GUIDANCE) tensor_size_guidance.update(flags.samples_per_plugin) self._multiplexer = plugin_event_multiplexer.EventMultiplexer( @@ -66,7 +71,7 @@ def __init__(self, flags): event_file_active_filter=_get_event_file_active_filter(flags), ) self._data_provider = data_provider.MultiplexerDataProvider( - self._multiplexer, logdir + self._multiplexer, flags.logdir or flags.logdir_spec ) self._reload_interval = flags.reload_interval self._reload_task = flags.reload_task diff --git a/tensorboard/backend/event_processing/data_ingester_test.py b/tensorboard/backend/event_processing/data_ingester_test.py index 3347f65fe9..13a34cd6f1 100644 --- a/tensorboard/backend/event_processing/data_ingester_test.py +++ b/tensorboard/backend/event_processing/data_ingester_test.py @@ -25,7 +25,6 @@ except ImportError: import mock # pylint: disable=unused-import -from tensorboard import ingester as ingester_lib from tensorboard import test as tb_test from tensorboard.backend.event_processing import data_ingester @@ -60,13 +59,6 @@ def __init__( self.generic_data = generic_data -class NoLogdirTest(tb_test.TestCase): - def test(self): - flags = FakeFlags(logdir="", logdir_spec="") - with self.assertRaises(ingester_lib.NotApplicableError): - data_ingester.LocalDataIngester(flags) - - class GetEventFileActiveFilterTest(tb_test.TestCase): def testDisabled(self): flags = FakeFlags("logdir", reload_multifile=False) diff --git a/tensorboard/data/BUILD b/tensorboard/data/BUILD index dd818d9547..5637760e84 100644 --- a/tensorboard/data/BUILD +++ b/tensorboard/data/BUILD @@ -64,6 +64,23 @@ py_test( ], ) +py_library( + name = "ingester", + srcs = ["ingester.py"], + srcs_version = "PY3", +) + +py_test( + name = "ingester_test", + size = "small", + srcs = ["ingester_test.py"], + srcs_version = "PY3", + tags = ["support_notf"], + deps = [ + ":ingester", + ], +) + config_setting( name = "link_data_server", define_values = {"link_data_server": "true"}, @@ -79,24 +96,23 @@ py_library( srcs_version = "PY3", deps = [ ":grpc_provider", + ":ingester", "//tensorboard:expect_grpc_installed", - "//tensorboard:ingester", "//tensorboard/util:tb_logging", ], ) py_test( name = "server_ingester_test", - size = "small", + size = "medium", # time.sleep + timeout = "short", srcs = ["server_ingester_test.py"], srcs_version = "PY3", tags = ["support_notf"], deps = [ ":grpc_provider", ":server_ingester", - "//tensorboard:context", "//tensorboard:expect_grpc_installed", - "//tensorboard:ingester", "//tensorboard:test", ], ) diff --git a/tensorboard/ingester.py b/tensorboard/data/ingester.py similarity index 56% rename from tensorboard/ingester.py rename to tensorboard/data/ingester.py index e526376b36..230bd4b8a6 100644 --- a/tensorboard/ingester.py +++ b/tensorboard/data/ingester.py @@ -17,10 +17,6 @@ import abc -class NotApplicableError(ValueError): - """This kind of ingester cannot be constructed from the given flags.""" - - class DataIngester(metaclass=abc.ABCMeta): """Link between a data source and a data provider. @@ -28,24 +24,6 @@ class DataIngester(metaclass=abc.ABCMeta): provides a data provider as a view. """ - @abc.abstractmethod - def __init__(self, flags): - """Creates a data ingester from flags. - - Args: - flags: An argparse.Namespace containing TensorBoard CLI flags. 
- - Returns: - The new data ingester. - - Raises: - NotApplicableError: If this kind of data ingester is not - applicable for an invocation with the given flags. This is - not necessarily a fatal error; it indicates that the next - ingester kind should be tried. - """ - pass - @property @abc.abstractmethod def data_provider(self): @@ -55,18 +33,13 @@ def data_provider(self): """ pass - @property - @abc.abstractmethod - def deprecated_multiplexer(self): - """Returns a `PluginEventMultiplexer`, or `None` if not applicable. - - It may be an error to dereference this before `start` is called. - """ - pass - @abc.abstractmethod def start(self): - """Starts ingesting data based on the ingester flag configuration. + """Starts ingesting data. + + This may start a background thread or process, and will return + once communication with that task is established. It won't block + forever as data is reloaded. Must only be called once. """ diff --git a/tensorboard/ingester_test.py b/tensorboard/data/ingester_test.py similarity index 95% rename from tensorboard/ingester_test.py rename to tensorboard/data/ingester_test.py index 6396240dd3..d9271ca55e 100644 --- a/tensorboard/ingester_test.py +++ b/tensorboard/data/ingester_test.py @@ -14,7 +14,7 @@ # ============================================================================== """Unit tests for `tensorboard.ingester`.""" -from tensorboard import ingester +from tensorboard.data import ingester # This is a pure abstract class. There's really nothing to test other than that # it executes successfully. diff --git a/tensorboard/data/server/DEVELOPMENT.md b/tensorboard/data/server/DEVELOPMENT.md index 3433be9548..98a9e92ca1 100644 --- a/tensorboard/data/server/DEVELOPMENT.md +++ b/tensorboard/data/server/DEVELOPMENT.md @@ -36,6 +36,45 @@ Rust source files in your editor. For editor setup, consult [ra-docs]: https://rust-analyzer.github.io/ +## Running under TensorBoard + +You can point TensorBoard at a data server in two ways: start the server +yourself and give TensorBoard an address, or tell TensorBoard to start the +server as a subprocess. + +To connect to an existing server, pass `--grpc_data_provider ADDRESS`, where the +address is like `localhost:6806`. Thus: + +``` +bazel run -c opt //tensorboard -- --grpc_data_provider localhost:6806 +``` + +You don’t have to pass a `--logdir` if you do this, but you do have to +concurrently run `//tensorboard/data/server` (say, in the background, or in a +separate shell). You can also swap out the data server whenever you want without +restarting TensorBoard; new RPCs will transparently reconnect. The server +doesn’t have to be running when TensorBoard starts. + +To tell TensorBoard to start the server as a subprocess, build with +`--define=link_data_server=true` and pass `--load_fast` to TensorBoard along +with a normal `--logdir`. Thus: + +``` +bazel run -c opt --define=link_data_server=true //tensorboard -- \ + --load_fast --logdir ~/tensorboard_data/mnist/ --bind_all --verbosity 0 +``` + +This is an easier one-shot solution, but requires a `--define` flag, offers less +flexibility over the flags to the data server, and requires restarting +TensorBoard if you want to restart the data server (though that’s not usually a +big deal). The data server will automatically shut down when TensorBoard exits +for any reason. + +As an alternative to `--define=link_data_server=true`, you can set the +`TENSORBOARD_DATA_SERVER_BINARY` environment variable to the path to a data +server binary, and pass `--load_fast`. 
If running with `bazel run`, this should +be an absolute path. + ## Adding third-party dependencies Rust dependencies are usually hosted on [crates.io]. We use [`cargo-raze`][raze] @@ -51,8 +90,8 @@ dependency: package on . 3. Change into the `tensorboard/data/server/` directory. 4. Run `cargo fetch` to update `Cargo.lock`. Running this before `cargo raze` - ensures that the `http_archive` workspace rules in the generated build - files will have `sha256` checksums. + ensures that the `http_archive` workspace rules in the generated build files + will have `sha256` checksums. 5. Run `cargo raze` to update `third_party/rust/...`. This will add a new target like `//third_party/rust:rand`. Manually build it diff --git a/tensorboard/data/server/main.rs b/tensorboard/data/server/main.rs index de801efc2c..fc4af5d968 100644 --- a/tensorboard/data/server/main.rs +++ b/tensorboard/data/server/main.rs @@ -14,10 +14,11 @@ limitations under the License. ==============================================================================*/ use clap::Clap; -use log::{debug, info, LevelFilter}; -use std::io::Read; +use log::{debug, error, info, LevelFilter}; +use std::fs::File; +use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::thread; use std::time::{Duration, Instant}; @@ -74,6 +75,15 @@ struct Opts { /// sense) but not killed. #[clap(long)] die_after_stdin: bool, + + /// Write bound port to this file + /// + /// Once a server socket is opened, write the port on which it's listening to the file at this + /// path. Useful with `--port 0`. Port will be written as ASCII decimal followed by a newline + /// (e.g., "6806\n"). If the server fails to start, this file may not be written at all. If the + /// port file is specified but cannot be written, the server will die. + #[clap(long)] + port_file: Option, } /// A duration in seconds. @@ -111,8 +121,21 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::new(opts.host, opts.port); let listener = TcpListener::bind(addr).await?; let bound = listener.local_addr()?; - // This magic string is read by the TensorBoard data ingester to determine the port. - println!("listening on port {} ({})", bound.port(), bound); + eprintln!("listening on {:?}", bound); + + if let Some(port_file) = opts.port_file { + let port = bound.port(); + if let Err(e) = write_port_file(&port_file, port) { + error!( + "Failed to write port \"{}\" to {}: {}", + port, + port_file.display(), + e + ); + std::process::exit(1); + } + info!("Wrote port \"{}\" to {}", port, port_file.display()); + } // Leak the commit object, since the Tonic server must have only 'static references. This only // leaks the outer commit structure (of constant size), not the pointers to the actual data. 
@@ -120,15 +143,19 @@ async fn main() -> Result<(), Box> { thread::Builder::new() .name("Reloader".to_string()) - .spawn(move || { - let mut loader = LogdirLoader::new(commit, opts.logdir); - loop { - info!("Starting load cycle"); - let start = Instant::now(); - loader.reload(); - let end = Instant::now(); - info!("Finished load cycle ({:?})", end - start); - thread::sleep(opts.reload_interval.duration()); + .spawn({ + let logdir = opts.logdir; + let reload_interval = opts.reload_interval; + move || { + let mut loader = LogdirLoader::new(commit, logdir); + loop { + info!("Starting load cycle"); + let start = Instant::now(); + loader.reload(); + let end = Instant::now(); + info!("Finished load cycle ({:?})", end - start); + thread::sleep(reload_interval.duration()); + } } }) .expect("failed to spawn reloader thread"); @@ -157,3 +184,11 @@ fn die_after_stdin() { info!("Stdin closed; exiting"); std::process::exit(0); } + +/// Writes `port` to file `path` as an ASCII decimal followed by newline. +fn write_port_file(path: &Path, port: u16) -> std::io::Result<()> { + let mut f = File::create(path)?; + writeln!(f, "{}", port)?; + f.sync_all()?; + Ok(()) +} diff --git a/tensorboard/data/server_ingester.py b/tensorboard/data/server_ingester.py index ebea19168c..147ee1c188 100644 --- a/tensorboard/data/server_ingester.py +++ b/tensorboard/data/server_ingester.py @@ -14,40 +14,47 @@ # ============================================================================== """Provides data ingestion logic backed by a gRPC server.""" +import errno import logging import os -import re -import select import subprocess +import tempfile import time import grpc -from tensorboard import ingester from tensorboard.data import grpc_provider +from tensorboard.data import ingester from tensorboard.util import tb_logging logger = tb_logging.get_logger() -_RE_PORT = re.compile("listening on port ([0-9]+)") +# If this environment variable is non-empty, it will be used as the path to the +# data server binary rather than using a bundled version. +_ENV_DATA_SERVER_BINARY = "TENSORBOARD_DATA_SERVER_BINARY" -class ServerDataIngester(ingester.DataIngester): - """Data ingestion implementation to use when against a gRPC server.""" +class ExistingServerDataIngester(ingester.DataIngester): + """Connect to an already running gRPC server.""" - def __init__(self, flags): - address = flags.grpc_data_provider - if address: - # Connect to an existing server at the given address. 
- self._data_provider = _make_provider(address) - return - if not flags.load_fast: - raise ingester.NotApplicableError( - "Neither `--load_fast` nor `--grpc_data_provider` given" - ) - self._flags = flags + def __init__(self, address): + self._data_provider = _make_provider(address) + + @property + def data_provider(self): + return self._data_provider + + def start(self): + pass + + +class SubprocessServerDataIngester(ingester.DataIngester): + """Start a new data server as a subprocess.""" + + def __init__(self, logdir): self._data_provider = None + self._logdir = logdir @property def data_provider(self): @@ -55,16 +62,14 @@ def data_provider(self): raise RuntimeError("Must call `start` first") return self._data_provider - @property - def deprecated_multiplexer(self): - return None - def start(self): if self._data_provider: return - server_binary = os.path.join( - os.path.dirname(__file__), "server", "server" - ) + server_binary = os.environ.get(_ENV_DATA_SERVER_BINARY) + if not server_binary: + server_binary = os.path.join( + os.path.dirname(__file__), "server", "server" + ) logger.info("Data server binary: %s", server_binary) if not os.path.exists(server_binary): raise RuntimeError( @@ -72,19 +77,22 @@ def start(self): "and not supported in release builds. If building from source, " "pass --define=link_data_server=true." ) + + tmpdir = tempfile.TemporaryDirectory(prefix="tensorboard_data_server_") + port_file_path = os.path.join(tmpdir.name, "port") + args = [ server_binary, - "--logdir=%s" % (self._flags.logdir,), + "--logdir=%s" % (self._logdir,), "--port=0", + "--port-file=%s" % (port_file_path,), "--die-after-stdin", ] if logger.isEnabledFor(logging.INFO): args.append("--verbose") - logger.warn("Spawning data server: %r", args) - popen = subprocess.Popen( - args, stdin=subprocess.PIPE, stdout=subprocess.PIPE - ) + logger.info("Spawning data server: %r", args) + popen = subprocess.Popen(args, stdin=subprocess.PIPE) # Stash stdin to avoid calling its destructor: on Windows, this # is a `subprocess.Handle` that closes itself in `__del__`, # which would cause the data server to shut down. (This is not @@ -96,8 +104,6 @@ def start(self): # The server only needs about 10 microseconds to spawn on my machine, # but give a few orders of magnitude of padding, and then poll. time.sleep(0.01) - poller = select.poll() - poller.register(popen.stdout, select.POLLIN) for i in range(20): if popen.poll() is not None: raise RuntimeError( @@ -105,13 +111,18 @@ def start(self): % popen.poll() ) logger.info("Polling for data server port (attempt %d)", i) - if poller.poll(0): - line = next(popen.stdout) - logger.info("Data server stdout line: %r", line) - match = _RE_PORT.match(line.decode(errors="replace")) - if match: - port = int(match.group(1)) - break + port_file_contents = None + try: + with open(port_file_path) as infile: + port_file_contents = infile.read() + except OSError as e: + if e.errno != errno.ENOENT: + raise + logger.info("Port file contents: %r", port_file_contents) + if (port_file_contents or "").endswith("\n"): + port = int(port_file_contents) + break + # Else, not done writing yet. 
time.sleep(0.5) if port is None: raise RuntimeError( @@ -132,6 +143,7 @@ def _make_provider(addr): options = [ ("grpc.max_receive_message_length", 1024 * 1024 * 256), ] - channel = grpc.insecure_channel(addr, options=options) + creds = grpc.local_channel_credentials() + channel = grpc.secure_channel(addr, creds, options=options) stub = grpc_provider.make_stub(channel) return grpc_provider.GrpcDataProvider(addr, stub) diff --git a/tensorboard/data/server_ingester_test.py b/tensorboard/data/server_ingester_test.py index 9195a51a2d..9e9b4fb773 100644 --- a/tensorboard/data/server_ingester_test.py +++ b/tensorboard/data/server_ingester_test.py @@ -14,51 +14,88 @@ # ============================================================================== """Unit tests for `tensorboard.data.server_ingester`.""" -import argparse +import os +import subprocess +import threading +import time from unittest import mock import grpc -from tensorboard import context -from tensorboard import ingester as ingester_lib from tensorboard import test as tb_test -from tensorboard.data import server_ingester from tensorboard.data import grpc_provider +from tensorboard.data import server_ingester -def make_flags(**kwargs): - kwargs.setdefault("grpc_data_provider", "") - kwargs.setdefault("logdir", "") - kwargs.setdefault("logdir_spec", "") - kwargs.setdefault("load_fast", False) - return argparse.Namespace(**kwargs) +class ExistingServerDataIngesterTest(tb_test.TestCase): + def test(self): + addr = "localhost:6806" + with mock.patch.object(grpc, "secure_channel", autospec=True): + ingester = server_ingester.ExistingServerDataIngester(addr) + ingester.start() + self.assertIsInstance( + ingester.data_provider, grpc_provider.GrpcDataProvider + ) -class ServerDataIngesterTest(tb_test.TestCase): - def setUp(self): - super().setUp() +class SubprocessServerDataIngesterTest(tb_test.TestCase): + def test(self): + # Create a fake server binary so that the `os.path.exists` check + # passes. + fake_binary = os.path.join(self.get_temp_dir(), "server") + with open(fake_binary, "wb"): + pass self.enter_context( - mock.patch.object(grpc, "insecure_channel", autospec=True) + mock.patch.dict( + os.environ, + {server_ingester._ENV_DATA_SERVER_BINARY: fake_binary}, + ) ) - def test_fixed_address(self): - flags = make_flags(grpc_data_provider="localhost:6806") - ingester = server_ingester.ServerDataIngester(flags) - ingester.start() - ctx = context.RequestContext() + real_popen = subprocess.Popen + port_file_box = [None] # value of `--port-file` to be stashed here + + # Stub out `subprocess.Popen` to write the port file. 
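+        # The fake records the `--port-file=` argument and, on a background
+        # thread after a short delay, writes "23456\n" to that file to
+        # simulate the real data server's startup handshake.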
+ def fake_popen(subprocess_args, *args, **kwargs): + def target(): + time.sleep(0.2) # wait one cycle + for arg in subprocess_args: + port_file_prefix = "--port-file=" + if not arg.startswith(port_file_prefix): + continue + port_file = arg[len(port_file_prefix) :] + port_file_box[0] = port_file + with open(port_file, "w") as outfile: + outfile.write("23456\n") + + result = mock.create_autospec(real_popen, instance=True) + result.stdin = mock.Mock() + result.poll = lambda: None + result.pid = 789 + threading.Thread(target=target).start() + return result + + with mock.patch.object(subprocess, "Popen", wraps=fake_popen) as popen: + with mock.patch.object(grpc, "secure_channel", autospec=True) as sc: + logdir = "/tmp/logs" + ingester = server_ingester.SubprocessServerDataIngester(logdir) + ingester.start() self.assertIsInstance( ingester.data_provider, grpc_provider.GrpcDataProvider ) - def test_empty_flags(self): - flags = make_flags(logdir="/some/logdir") - with self.assertRaises(ingester_lib.NotApplicableError): - server_ingester.ServerDataIngester(flags) - - def test_load_fast(self): - flags = make_flags(logdir="/some/logdir", load_fast=True) - ingester = server_ingester.ServerDataIngester(flags) - # Not much more that we can easily test here. + expected_args = [ + fake_binary, + "--logdir=/tmp/logs", + "--port=0", + "--port-file=%s" % port_file_box[0], + "--die-after-stdin", + "--verbose", # logging is enabled in tests + ] + popen.assert_called_once_with(expected_args, stdin=subprocess.PIPE) + sc.assert_called_once_with( + "localhost:23456", mock.ANY, options=mock.ANY + ) if __name__ == "__main__": diff --git a/tensorboard/program.py b/tensorboard/program.py index e38310ec72..7e2c0c4b65 100644 --- a/tensorboard/program.py +++ b/tensorboard/program.py @@ -53,7 +53,6 @@ from six.moves import xrange # pylint: disable=redefined-builtin from werkzeug import serving -from tensorboard import ingester as ingester_lib from tensorboard import manager from tensorboard import version from tensorboard.backend import application @@ -411,33 +410,27 @@ def _fix_mime_types(self): def _make_data_provider(self): """Returns `(data_provider, deprecated_multiplexer)`.""" - # Ingesters to try. Order matters: `LocalDataIngester` will - # succeed whenever `--logdir` is set, but other ingesters may - # also be able to handle such cases. - ingester_types = [ - server_ingester.ServerDataIngester, - local_ingester.LocalDataIngester, - ] - errors = [] - ingester = None - for ty in ingester_types: - try: - ingester = ty(self.flags) - break - except ingester_lib.NotApplicableError as e: - errors.append(e) - if ingester is None: - # This shouldn't happen: the flag validation is intended to - # ensure that one of the stock ingesters can succeed. Still, - # handle it nicely. - raise RuntimeError( - "Failed to load data:\n%s" - % "\n".join(" - %s" % e for e in errors) + flags = self.flags + if flags.grpc_data_provider: + ingester = server_ingester.ExistingServerDataIngester( + flags.grpc_data_provider + ) + elif flags.load_fast: + ingester = server_ingester.SubprocessServerDataIngester( + flags.logdir ) + else: + ingester = local_ingester.LocalDataIngester(flags) + # Stash ingester so that it can avoid GCing Windows file handles. + # (See comment in `SubprocessServerDataIngester.start` for details.) 
self._ingester = ingester + ingester.start() - return (ingester.data_provider, ingester.deprecated_multiplexer) + deprecated_multiplexer = None + if isinstance(ingester, local_ingester.LocalDataIngester): + deprecated_multiplexer = ingester.deprecated_multiplexer + return (ingester.data_provider, deprecated_multiplexer) def _make_server(self): """Constructs the TensorBoard WSGI app and instantiates the server."""
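With this change, data provider selection in `program.py` follows a fixed
precedence: an explicit `--grpc_data_provider` address wins, then `--load_fast`
spawns the bundled data server, and otherwise the local multiplexer-backed
ingester is used. A minimal standalone sketch of that order (a hypothetical
`choose_ingester` helper for illustration only; not part of this patch):

```python
from tensorboard.backend.event_processing import data_ingester as local_ingester
from tensorboard.data import server_ingester


def choose_ingester(flags):
    """Mirrors the selection order in `TensorBoard._make_data_provider`."""
    if flags.grpc_data_provider:
        return server_ingester.ExistingServerDataIngester(flags.grpc_data_provider)
    if flags.load_fast:
        return server_ingester.SubprocessServerDataIngester(flags.logdir)
    return local_ingester.LocalDataIngester(flags)
```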