diff --git a/tensorboard/BUILD b/tensorboard/BUILD index 87c38d6101..94dd1b2cc7 100644 --- a/tensorboard/BUILD +++ b/tensorboard/BUILD @@ -216,11 +216,10 @@ py_library( ":version", "//tensorboard:expect_absl_flags_installed", "//tensorboard:expect_absl_logging_installed", - "//tensorboard:expect_grpc_installed", "//tensorboard/backend:application", "//tensorboard/backend/event_processing:data_ingester", "//tensorboard/backend/event_processing:event_file_inspector", - "//tensorboard/data:grpc_provider", + "//tensorboard/data:server_ingester", "//tensorboard/util:argparse_util", "@org_pocoo_werkzeug", "@org_pythonhosted_six", diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index 41143c99e3..b2faa47637 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -37,6 +37,7 @@ py_library( ":data_provider", ":event_multiplexer", ":tag_types", + "//tensorboard/data:ingester", "//tensorboard/plugins/audio:metadata", "//tensorboard/plugins/histogram:metadata", "//tensorboard/plugins/image:metadata", diff --git a/tensorboard/backend/event_processing/data_ingester.py b/tensorboard/backend/event_processing/data_ingester.py index f57e8078c6..d091448b2e 100644 --- a/tensorboard/backend/event_processing/data_ingester.py +++ b/tensorboard/backend/event_processing/data_ingester.py @@ -24,6 +24,7 @@ from tensorboard.backend.event_processing import data_provider from tensorboard.backend.event_processing import plugin_event_multiplexer from tensorboard.backend.event_processing import tag_types +from tensorboard.data import ingester from tensorboard.plugins.audio import metadata as audio_metadata from tensorboard.plugins.histogram import metadata as histogram_metadata from tensorboard.plugins.image import metadata as image_metadata @@ -48,7 +49,7 @@ logger = tb_logging.get_logger() -class LocalDataIngester(object): +class LocalDataIngester(ingester.DataIngester): """Data ingestion implementation to use when running locally.""" def __init__(self, flags): diff --git a/tensorboard/data/BUILD b/tensorboard/data/BUILD index 8d16bfe45f..5637760e84 100644 --- a/tensorboard/data/BUILD +++ b/tensorboard/data/BUILD @@ -64,6 +64,59 @@ py_test( ], ) +py_library( + name = "ingester", + srcs = ["ingester.py"], + srcs_version = "PY3", +) + +py_test( + name = "ingester_test", + size = "small", + srcs = ["ingester_test.py"], + srcs_version = "PY3", + tags = ["support_notf"], + deps = [ + ":ingester", + ], +) + +config_setting( + name = "link_data_server", + define_values = {"link_data_server": "true"}, +) + +py_library( + name = "server_ingester", + srcs = ["server_ingester.py"], + data = select({ + ":link_data_server": ["//tensorboard/data/server"], + "//conditions:default": [], + }), + srcs_version = "PY3", + deps = [ + ":grpc_provider", + ":ingester", + "//tensorboard:expect_grpc_installed", + "//tensorboard/util:tb_logging", + ], +) + +py_test( + name = "server_ingester_test", + size = "medium", # time.sleep + timeout = "short", + srcs = ["server_ingester_test.py"], + srcs_version = "PY3", + tags = ["support_notf"], + deps = [ + ":grpc_provider", + ":server_ingester", + "//tensorboard:expect_grpc_installed", + "//tensorboard:test", + ], +) + py_library( name = "grpc_provider", srcs = ["grpc_provider.py"], diff --git a/tensorboard/data/ingester.py b/tensorboard/data/ingester.py new file mode 100644 index 0000000000..230bd4b8a6 --- /dev/null +++ b/tensorboard/data/ingester.py @@ -0,0 +1,46 @@ +# Copyright 2020 The TensorFlow 
Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Abstraction for data ingestion logic."""
+
+import abc
+
+
+class DataIngester(metaclass=abc.ABCMeta):
+    """Link between a data source and a data provider.
+
+    A data ingester starts a reload operation in the background and
+    provides a data provider as a view of the ingested data.
+    """
+
+    @property
+    @abc.abstractmethod
+    def data_provider(self):
+        """Returns a `DataProvider`.
+
+        It may be an error to dereference this before `start` is called.
+        """
+        pass
+
+    @abc.abstractmethod
+    def start(self):
+        """Starts ingesting data.
+
+        This may start a background thread or process, and returns once
+        communication with that task is established; it does not block
+        until all data has been loaded.
+
+        Must only be called once.
+        """
+        pass
diff --git a/tensorboard/data/ingester_test.py b/tensorboard/data/ingester_test.py
new file mode 100644
index 0000000000..d9271ca55e
--- /dev/null
+++ b/tensorboard/data/ingester_test.py
@@ -0,0 +1,21 @@
+# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Unit tests for `tensorboard.data.ingester`."""
+
+from tensorboard.data import ingester
+
+# `DataIngester` is a pure abstract class. There's really nothing to test other
+# than that the module imports successfully.
+del ingester
diff --git a/tensorboard/data/server/DEVELOPMENT.md b/tensorboard/data/server/DEVELOPMENT.md
index 3433be9548..98a9e92ca1 100644
--- a/tensorboard/data/server/DEVELOPMENT.md
+++ b/tensorboard/data/server/DEVELOPMENT.md
@@ -36,6 +36,45 @@ Rust source files in your editor. For editor setup, consult
 
 [ra-docs]: https://rust-analyzer.github.io/
 
+## Running under TensorBoard
+
+You can point TensorBoard at a data server in two ways: start the server
+yourself and give TensorBoard an address, or tell TensorBoard to start the
+server as a subprocess.
+
+To connect to an existing server, pass `--grpc_data_provider ADDRESS`, where
+the address looks like `localhost:6806`. Thus:
+
+```
+bazel run -c opt //tensorboard -- --grpc_data_provider localhost:6806
+```
+
+You don’t have to pass a `--logdir` if you do this, but you do have to
+concurrently run `//tensorboard/data/server` (say, in the background, or in a
+separate shell).
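+
+For example, a data server invocation to pair with the command above might look
+like this (a sketch; `--logdir` and `--port` are the data server flags
+referenced elsewhere in this change):
+
+```
+bazel run -c opt //tensorboard/data/server -- \
+    --logdir ~/tensorboard_data/mnist/ --port 6806
+```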
+
+You can also swap out the data server whenever you want without restarting
+TensorBoard; new RPCs will transparently reconnect. The server doesn’t have to
+be running when TensorBoard starts.
+
+To tell TensorBoard to start the server as a subprocess, build with
+`--define=link_data_server=true` and pass `--load_fast` to TensorBoard along
+with a normal `--logdir`. Thus:
+
+```
+bazel run -c opt --define=link_data_server=true //tensorboard -- \
+    --load_fast --logdir ~/tensorboard_data/mnist/ --bind_all --verbosity 0
+```
+
+This is an easier one-shot solution, but requires a `--define` flag, offers
+less control over the flags passed to the data server, and requires restarting
+TensorBoard if you want to restart the data server (though that’s not usually a
+big deal). The data server will automatically shut down when TensorBoard exits
+for any reason.
+
+As an alternative to `--define=link_data_server=true`, you can set the
+`TENSORBOARD_DATA_SERVER_BINARY` environment variable to the path of a data
+server binary, and pass `--load_fast`. If running with `bazel run`, this should
+be an absolute path.
+
 ## Adding third-party dependencies
 
 Rust dependencies are usually hosted on [crates.io]. We use [`cargo-raze`][raze]
@@ -51,8 +90,8 @@ dependency:
    package on <https://crates.io/>.
 3. Change into the `tensorboard/data/server/` directory.
 4. Run `cargo fetch` to update `Cargo.lock`. Running this before `cargo raze`
-   ensures that the `http_archive` workspace rules in the generated build
-   files will have `sha256` checksums.
+   ensures that the `http_archive` workspace rules in the generated build files
+   will have `sha256` checksums.
 5. Run `cargo raze` to update `third_party/rust/...`.
 
    This will add a new target like `//third_party/rust:rand`. Manually build it
diff --git a/tensorboard/data/server/main.rs b/tensorboard/data/server/main.rs
index 4fc74e6274..fc4af5d968 100644
--- a/tensorboard/data/server/main.rs
+++ b/tensorboard/data/server/main.rs
@@ -14,10 +14,11 @@ limitations under the License.
 ==============================================================================*/
 
 use clap::Clap;
-use log::{debug, info, LevelFilter};
-use std::io::Read;
+use log::{debug, error, info, LevelFilter};
+use std::fs::File;
+use std::io::{Read, Write};
 use std::net::{IpAddr, SocketAddr};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::thread;
 use std::time::{Duration, Instant};
@@ -74,6 +75,15 @@ struct Opts {
     /// sense) but not killed.
     #[clap(long)]
     die_after_stdin: bool,
+
+    /// Write bound port to this file
+    ///
+    /// Once a server socket is opened, write the port on which it's listening to the file at this
+    /// path. Useful with `--port 0`. Port will be written as ASCII decimal followed by a newline
+    /// (e.g., "6806\n"). If the server fails to start, this file may not be written at all. If the
+    /// port file is specified but cannot be written, the server will die.
+    #[clap(long)]
+    port_file: Option<PathBuf>,
 }
 
 /// A duration in seconds.
@@ -113,21 +123,39 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let bound = listener.local_addr()?;
     eprintln!("listening on {:?}", bound);
 
+    if let Some(port_file) = opts.port_file {
+        let port = bound.port();
+        if let Err(e) = write_port_file(&port_file, port) {
+            error!(
+                "Failed to write port \"{}\" to {}: {}",
+                port,
+                port_file.display(),
+                e
+            );
+            std::process::exit(1);
+        }
+        info!("Wrote port \"{}\" to {}", port, port_file.display());
+    }
+
     // Leak the commit object, since the Tonic server must have only 'static references.
This only // leaks the outer commit structure (of constant size), not the pointers to the actual data. let commit: &'static Commit = Box::leak(Box::new(Commit::new())); thread::Builder::new() .name("Reloader".to_string()) - .spawn(move || { - let mut loader = LogdirLoader::new(commit, opts.logdir); - loop { - info!("Starting load cycle"); - let start = Instant::now(); - loader.reload(); - let end = Instant::now(); - info!("Finished load cycle ({:?})", end - start); - thread::sleep(opts.reload_interval.duration()); + .spawn({ + let logdir = opts.logdir; + let reload_interval = opts.reload_interval; + move || { + let mut loader = LogdirLoader::new(commit, logdir); + loop { + info!("Starting load cycle"); + let start = Instant::now(); + loader.reload(); + let end = Instant::now(); + info!("Finished load cycle ({:?})", end - start); + thread::sleep(reload_interval.duration()); + } } }) .expect("failed to spawn reloader thread"); @@ -156,3 +184,11 @@ fn die_after_stdin() { info!("Stdin closed; exiting"); std::process::exit(0); } + +/// Writes `port` to file `path` as an ASCII decimal followed by newline. +fn write_port_file(path: &Path, port: u16) -> std::io::Result<()> { + let mut f = File::create(path)?; + writeln!(f, "{}", port)?; + f.sync_all()?; + Ok(()) +} diff --git a/tensorboard/data/server_ingester.py b/tensorboard/data/server_ingester.py new file mode 100644 index 0000000000..147ee1c188 --- /dev/null +++ b/tensorboard/data/server_ingester.py @@ -0,0 +1,149 @@ +# Copyright 2020 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Provides data ingestion logic backed by a gRPC server.""" + +import errno +import logging +import os +import subprocess +import tempfile +import time + +import grpc + +from tensorboard.data import grpc_provider +from tensorboard.data import ingester +from tensorboard.util import tb_logging + + +logger = tb_logging.get_logger() + +# If this environment variable is non-empty, it will be used as the path to the +# data server binary rather than using a bundled version. 
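+# For example (a sketch; the path here is hypothetical):
+#
+#     TENSORBOARD_DATA_SERVER_BINARY=/absolute/path/to/server \
+#         tensorboard --load_fast --logdir /tmp/logs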
+_ENV_DATA_SERVER_BINARY = "TENSORBOARD_DATA_SERVER_BINARY" + + +class ExistingServerDataIngester(ingester.DataIngester): + """Connect to an already running gRPC server.""" + + def __init__(self, address): + self._data_provider = _make_provider(address) + + @property + def data_provider(self): + return self._data_provider + + def start(self): + pass + + +class SubprocessServerDataIngester(ingester.DataIngester): + """Start a new data server as a subprocess.""" + + def __init__(self, logdir): + self._data_provider = None + self._logdir = logdir + + @property + def data_provider(self): + if self._data_provider is None: + raise RuntimeError("Must call `start` first") + return self._data_provider + + def start(self): + if self._data_provider: + return + server_binary = os.environ.get(_ENV_DATA_SERVER_BINARY) + if not server_binary: + server_binary = os.path.join( + os.path.dirname(__file__), "server", "server" + ) + logger.info("Data server binary: %s", server_binary) + if not os.path.exists(server_binary): + raise RuntimeError( + "TensorBoard data server not found. This mode is experimental " + "and not supported in release builds. If building from source, " + "pass --define=link_data_server=true." + ) + + tmpdir = tempfile.TemporaryDirectory(prefix="tensorboard_data_server_") + port_file_path = os.path.join(tmpdir.name, "port") + + args = [ + server_binary, + "--logdir=%s" % (self._logdir,), + "--port=0", + "--port-file=%s" % (port_file_path,), + "--die-after-stdin", + ] + if logger.isEnabledFor(logging.INFO): + args.append("--verbose") + + logger.info("Spawning data server: %r", args) + popen = subprocess.Popen(args, stdin=subprocess.PIPE) + # Stash stdin to avoid calling its destructor: on Windows, this + # is a `subprocess.Handle` that closes itself in `__del__`, + # which would cause the data server to shut down. (This is not + # documented; you have to read CPython source to figure it out.) + # We want that to happen at end of process, but not before. + self._stdin_handle = popen.stdin # stash to avoid stdin being closed + + port = None + # The server only needs about 10 microseconds to spawn on my machine, + # but give a few orders of magnitude of padding, and then poll. + time.sleep(0.01) + for i in range(20): + if popen.poll() is not None: + raise RuntimeError( + "Data server exited with %d; check stderr for details" + % popen.poll() + ) + logger.info("Polling for data server port (attempt %d)", i) + port_file_contents = None + try: + with open(port_file_path) as infile: + port_file_contents = infile.read() + except OSError as e: + if e.errno != errno.ENOENT: + raise + logger.info("Port file contents: %r", port_file_contents) + if (port_file_contents or "").endswith("\n"): + port = int(port_file_contents) + break + # Else, not done writing yet. + time.sleep(0.5) + if port is None: + raise RuntimeError( + "Timed out while waiting for data server to start. " + "It may still be running as pid %d." 
% popen.pid + ) + + addr = "localhost:%d" % port + self._data_provider = _make_provider(addr) + logger.info( + "Established connection to data server at pid %d via %s", + popen.pid, + addr, + ) + + +def _make_provider(addr): + options = [ + ("grpc.max_receive_message_length", 1024 * 1024 * 256), + ] + creds = grpc.local_channel_credentials() + channel = grpc.secure_channel(addr, creds, options=options) + stub = grpc_provider.make_stub(channel) + return grpc_provider.GrpcDataProvider(addr, stub) diff --git a/tensorboard/data/server_ingester_test.py b/tensorboard/data/server_ingester_test.py new file mode 100644 index 0000000000..9e9b4fb773 --- /dev/null +++ b/tensorboard/data/server_ingester_test.py @@ -0,0 +1,102 @@ +# Copyright 2020 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Unit tests for `tensorboard.data.server_ingester`.""" + +import os +import subprocess +import threading +import time +from unittest import mock + +import grpc + +from tensorboard import test as tb_test +from tensorboard.data import grpc_provider +from tensorboard.data import server_ingester + + +class ExistingServerDataIngesterTest(tb_test.TestCase): + def test(self): + addr = "localhost:6806" + with mock.patch.object(grpc, "secure_channel", autospec=True): + ingester = server_ingester.ExistingServerDataIngester(addr) + ingester.start() + self.assertIsInstance( + ingester.data_provider, grpc_provider.GrpcDataProvider + ) + + +class SubprocessServerDataIngesterTest(tb_test.TestCase): + def test(self): + # Create a fake server binary so that the `os.path.exists` check + # passes. + fake_binary = os.path.join(self.get_temp_dir(), "server") + with open(fake_binary, "wb"): + pass + self.enter_context( + mock.patch.dict( + os.environ, + {server_ingester._ENV_DATA_SERVER_BINARY: fake_binary}, + ) + ) + + real_popen = subprocess.Popen + port_file_box = [None] # value of `--port-file` to be stashed here + + # Stub out `subprocess.Popen` to write the port file. 
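+        # The fake mimics the real server's `--port-file` contract: after a
+        # short delay it writes a port number followed by a newline, which is
+        # the condition that `SubprocessServerDataIngester.start` polls for.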
+ def fake_popen(subprocess_args, *args, **kwargs): + def target(): + time.sleep(0.2) # wait one cycle + for arg in subprocess_args: + port_file_prefix = "--port-file=" + if not arg.startswith(port_file_prefix): + continue + port_file = arg[len(port_file_prefix) :] + port_file_box[0] = port_file + with open(port_file, "w") as outfile: + outfile.write("23456\n") + + result = mock.create_autospec(real_popen, instance=True) + result.stdin = mock.Mock() + result.poll = lambda: None + result.pid = 789 + threading.Thread(target=target).start() + return result + + with mock.patch.object(subprocess, "Popen", wraps=fake_popen) as popen: + with mock.patch.object(grpc, "secure_channel", autospec=True) as sc: + logdir = "/tmp/logs" + ingester = server_ingester.SubprocessServerDataIngester(logdir) + ingester.start() + self.assertIsInstance( + ingester.data_provider, grpc_provider.GrpcDataProvider + ) + + expected_args = [ + fake_binary, + "--logdir=/tmp/logs", + "--port=0", + "--port-file=%s" % port_file_box[0], + "--die-after-stdin", + "--verbose", # logging is enabled in tests + ] + popen.assert_called_once_with(expected_args, stdin=subprocess.PIPE) + sc.assert_called_once_with( + "localhost:23456", mock.ANY, options=mock.ANY + ) + + +if __name__ == "__main__": + tb_test.main() diff --git a/tensorboard/plugins/core/core_plugin.py b/tensorboard/plugins/core/core_plugin.py index 8199e6a1f4..eca74e39fd 100644 --- a/tensorboard/plugins/core/core_plugin.py +++ b/tensorboard/plugins/core/core_plugin.py @@ -293,6 +293,14 @@ def define_flags(self, parser): % DEFAULT_PORT, ) + parser.add_argument( + "--load_fast", + action="store_true", + help="""\ +Experimental. Use a data server to accelerate loading. +""", + ) + parser.add_argument( "--grpc_data_provider", metavar="PORT", diff --git a/tensorboard/program.py b/tensorboard/program.py index 9cf59855e7..7e2c0c4b65 100644 --- a/tensorboard/program.py +++ b/tensorboard/program.py @@ -48,7 +48,6 @@ from absl import flags as absl_flags from absl.flags import argparse_flags import absl.logging -import grpc import six from six.moves import urllib from six.moves import xrange # pylint: disable=redefined-builtin @@ -57,9 +56,9 @@ from tensorboard import manager from tensorboard import version from tensorboard.backend import application -from tensorboard.backend.event_processing import data_ingester +from tensorboard.backend.event_processing import data_ingester as local_ingester from tensorboard.backend.event_processing import event_file_inspector as efi -from tensorboard.data import grpc_provider +from tensorboard.data import server_ingester from tensorboard.plugins.core import core_plugin from tensorboard.util import argparse_util from tensorboard.util import tb_logging @@ -409,24 +408,29 @@ def _fix_mime_types(self): mimetypes.add_type("font/woff2", ".woff2") mimetypes.add_type("text/html", ".html") - def _make_grpc_provider(self, addr): - options = [ - ("grpc.max_receive_message_length", 1024 * 1024 * 256), - ] - channel = grpc.insecure_channel(addr, options=options) - stub = grpc_provider.make_stub(channel) - provider = grpc_provider.GrpcDataProvider(addr, stub) - return provider - def _make_data_provider(self): """Returns `(data_provider, deprecated_multiplexer)`.""" - grpc_addr = self.flags.grpc_data_provider - if grpc_addr: - return (self._make_grpc_provider(grpc_addr), None) + flags = self.flags + if flags.grpc_data_provider: + ingester = server_ingester.ExistingServerDataIngester( + flags.grpc_data_provider + ) + elif flags.load_fast: + ingester = 
server_ingester.SubprocessServerDataIngester(
+                flags.logdir
+            )
         else:
-            ingester = data_ingester.LocalDataIngester(self.flags)
-            ingester.start()
-            return (ingester.data_provider, ingester.deprecated_multiplexer)
+            ingester = local_ingester.LocalDataIngester(flags)
+
+        # Keep a reference to the ingester so that its stashed stdin handle
+        # is not garbage-collected and thereby closed on Windows.
+        # (See comment in `SubprocessServerDataIngester.start` for details.)
+        self._ingester = ingester
+
+        ingester.start()
+        deprecated_multiplexer = None
+        if isinstance(ingester, local_ingester.LocalDataIngester):
+            deprecated_multiplexer = ingester.deprecated_multiplexer
+        return (ingester.data_provider, deprecated_multiplexer)
 
     def _make_server(self):
         """Constructs the TensorBoard WSGI app and instantiates the server."""
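
For reference, the three branches above correspond to these invocations (a
sketch; the flags are the ones defined in `core_plugin.py`):

```
# Existing data server; no subprocess is spawned:
tensorboard --grpc_data_provider localhost:6806

# Spawn the data server as a subprocess (needs a server binary, e.g. via
# --define=link_data_server=true or TENSORBOARD_DATA_SERVER_BINARY):
tensorboard --load_fast --logdir /tmp/logs

# Default: in-process event-file loading via LocalDataIngester:
tensorboard --logdir /tmp/logs
```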